Краткая заметка по теме business process mining в контексте роста интереса к концепции "digital twin". В силу периодического выплывания этой темы считаю целесообразным поделиться подходами к решению.
Постановка задачи
Ситуация предельно проста.
- Есть компания X (Y, Z, ...).
- В компании есть бизнес-процессы, автоматизированные различными ИТ системами.
- Есть бизнес-аналитики, которые нарисовали bpmn диаграммы по этим процессам. Если говорить точнее, их собственное "bpmn представление" о том, как эти процессы должны были бы выглядеть.
- Бизнес пользователи хотят иметь какое-то представление (KPI) по этим процессам.
Как докопаться до истины и посчитать эти метрики?
Является продолжением предыдущих публикаций.
Формулируем задачу в понятных для компьютера требованиях
Базисные постулаты:
- Есть временнОй лог событий (это могут разнообразные логи ИТ систем, cdr\xdr, просто записи событий в БД) разной степени чистоты, полноты и согласованности.
- ИТ системы действуют как конечный автомат и "гуляют" между различными состояниями в соответствии с действиями пользователей и бизнес-логикой, заложенной программистами в них.
- Взаимодействие пользователей осуществляется в транзакционном виде.
Корректировки физического мира:
- Количество внесенных изменений в ИТ системы таково, что bpmn диаграммы бизнес-аналитиков не имеют почти ничего общего с реальностью.
- Данные могут быть сильно неструктурированными (например, логи приложений).
- "Транзакционность" является логическим понятием. Сами записи событий содержат только атрибуты, присущие этому состоянию и нет никакого сквозного идентификатора транзакции.
- Число записей в сутки составляет десятки-сотни-тысячи миллионов штук.
Решение "поставил-посчитал"
Для решения подобных задач необходимо:
- реконструировать транзакции;
- реконструировать реальные бизнес-процессы;
- провести расчеты;
- сформировать результаты в human-readable формате.
Можно начать искать вендорские решения и платить миллионы. Но у нас же в руках есть R. Он вполне прекрасно позволяет решить эту задачу. Краткие соображения ниже.
Вроде все просто и в R есть хороший согласованный набор пакетов bupaR. Но ложка дегтя присутствует и она отравляет все. Этот набор за приемлемое время справляется только с небольшим количеством событий (сотни тысяч — несколько миллионов).
Для больших объемов надо использовать иные подходы.
Добавляем скорости!
Эмулируем входной набор данных
Для демонстрации идей необходимо сформировать некий тестовый набор данных. Возьмем в качестве физического исходника мат.модели пример федеральной сети магазинов, благо это понятно абсолютно всем. Хотя с таким же успехом это могут быть банкоматы, колл-центры, общественный транспорт, водоснабжение и т.д.
- Есть магазины разного масштаба (маленькие, средние и большие).
- В магазинах есть кассы (pos терминалы).
- Номера магазинов могут быть цифробуквенными, номера терминалов — цифровые.
- В магазины ходят покупатели и делают покупки чего-нибудь и при этом оплачивают картой.
- Взаимодействие pos терминала с картой и банком описывается определенным набором состояний и правилами перехода между ними.
- Операции бывают успешные, неуспешные, отложенные и незавершенные (банк недоступен, например).
- Транзакции обладают таймаутами.
Берем следующий набор паттернов бизнес-операций:
"INIT-REQUEST-RESPONSE-SUCCESS"
"INIT-REQUEST-RESPONSE-ERROR"
"INIT-REQUEST-RESPONSE-DEFFERED"
"INIT-REQUEST"
"INIT"
Для демонстрации подхода создадим малый семпл, но это все прекрасно работает и на миллиардах записей (для такого объема без суперглубокой оптимизации характерное время измеряется всего сотнями секунд на одном сервере весьма посредственной производительности).
Сразу спойлеры для больших объемов:
- во многих местах средствами
tidyverse
можно ответа и не дождаться; - оптимизация даже микрошагов полезна и может дать значимый вклад.
library(tidyverse)
library(datapasta)
library(tictoc)
library(data.table)
library(stringi)
library(anytime)
library(rTRNG)
data.table::setDTthreads(0) # отдаем все ядра в распоряжение data.table
data.table::getDTthreads() # проверим доступное количество потоков
set.seed(46572)
RcppParallel::setThreadOptions(numThreads = parallel::detectCores() - 1)
# Важное допущение -- нет параллельных бизнес-транзакций, все выполняется строго последовательно
# Есть 5 типа паттернов бизнес-операций, 2 последних -- глобальные сбои
bo_pattern <- tibble::tribble(
# маркеры паттерна, частота в пуле операций, средняя длительность транзакции
~pattern, ~prob, ~mean_duration,
"INIT-REQUEST-RESPONSE-SUCCESS", 0.7, 5,
"INIT-REQUEST-RESPONSE-ERROR", 0.15, 5,
"INIT-REQUEST-RESPONSE-DEFFERED", 0.07, 8,
"INIT-REQUEST", 0.05, 2,
"INIT", 0.03, 0.5
)
# Проверка корректности данных + расчет абсолютных частот событий
checkmate::assertTRUE(sum(bo_pattern$prob) == 1)
df <- bo_pattern %>%
separate_rows(pattern) %>%
# нормировочный коэффициент
mutate(coeff = sum(prob)) %>%
group_by(pattern) %>%
# переведем в проценты
summarise(event_prob = sum(prob/coeff)*100) %>%
ungroup()
checkmate::assertTRUE(sum(df$event_prob) == 100)
# Пусть есть 3 типа магазинов: маленькие (4 кассы), средние (12 касс), большие (30 касс)
df1 <- tribble(
~type, ~n_pos, ~n_store,
"small", 4, 10,
"medium", 12, 5,
"large", 30, 2
) %>%
# генерируем номера магазинов для каждой группы
mutate(store = map2(row_number(), n_store,
~sample(x = .x * 1000 + 1:.y, size = .y, replace = FALSE))) %>%
unnest(store) %>%
# генерируем номера терминалов для каждого магазина
mutate(pos = map(n_pos, ~sample(x = .x, size = .x, replace = FALSE))) %>%
unnest(pos) %>%
mutate(pattern = sample(bo_pattern$pattern, n(), replace = TRUE, prob = bo_pattern$prob))
tic("Generate transactions")
# транзакции идут одна за другой, в параллель ничего не исполняется
# для упрощения задачи мы сгенерируем количество записей с запасом, а потом отсечем по временнЫм границам
df2 <- df1 %>%
# для каждой транзакции случайным образом сгенерируем общую продолжительность
select(-matches("duration")) %>%
left_join(bo_pattern, by = "pattern") %>%
# раздуем сэмплы
sample_frac(size = 200, replace = TRUE) %>%
mutate(duration = rnorm(n(), mean = mean_duration, sd = mean_duration * .25)) %>%
select(-prob, -mean_duration) %>%
# отбрасываем все транзакции, которые имеют отрицательную длительность или > таймаута
# таймаут устанавливаем равным 30 секунд
filter(duration > 0.5 & duration < 30) %>%
# теперь для каждого POS сгенерим поток событий на основе паттернов
mutate(session_id = row_number()) %>%
# расщепляем транзакции на отдельные состояния, при этом порядок следования состояний сохраняется
separate_rows(pattern) %>%
rename(event = pattern)
toc()
tic("Generate time markers, data.table way")
samples_tbl <- data.table::as.data.table(df2) %>%
# setkey(session_id, duration, physical = FALSE) %>%
# на каждую транзакцию надо навесить времена между каждой отдельным состоянием
# 1-ая операция тоже имеет дельту от конца предыдущей, полагаем, что она будет не менее 5 секунд
# .[, ticks := base::sort(runif(.N, 5, 5 + duration)), by = .(session_id, duration)] %>%
# при малом объеме данных очень большие накладные расходы на match.arg у функции base::order!!
# делаем в два захода
# сначала просто генерируем случайные числа от 0 до 1 для каждой записи отдельно
# и масштабируем одним вектором
# .[, tshift := runif(.N, 0, 1)] %>%
# в таком расладе trng быстрее в несколько раз (может даже на порядок получиться)
# фактически, здесь мы считаем случайные соотношения времен между операциями внутри транзакции
.[, trand := runif_trng(.N, 0, 1, parallelGrain = 100L) * duration] %>%
# делаем сортировку вектора внутри каждой сессии, простая сортировка будет ОЧЕНЬ долгой
# .[, ticks := sort(tshift), by = .(session_id)] %>%
# делаем трюк, формируем составной индекс из session_id, который является монотонным, и смещением по времени
.[, t_idx := session_id + trand / max(trand)/10] %>%
# подтягиваем в колонку сортированное значение вектора с реконструкцией
# session_id в целой части гарантирует сортировку пропорций в рамках каждой транзакции без доп. группировок
.[, tshift := (sort(t_idx) - session_id) * 10 * max(trand)] %>%
# добавим дополнительной реалистичности, между транзакциями на одном POS должны быть большие паузы (60 сек)
.[event == "INIT", tshift := tshift + runif_trng(.N, 0, 60, parallelGrain = 100L)] %>%
# удаляем весь промежуточный мусор
.[, `:=`(duration = NULL, trand = NULL, t_idx = NULL,
n_store = NULL, n_pos = NULL,
timestamp = as.numeric(anytime("2019-03-11 08:00:00 MSK")))] %>%
# переводим все в физическое время, начиная от 01.03.2019 для каждой кассы отдельно
.[, timestamp := timestamp + cumsum(tshift), by = .(store, pos)] %>%
# фильтруем по текущему рабочему дню
.[timestamp <= as.numeric(anytime("2019-04-11 23:00:00 MSK")), ] %>%
# и обратно одним действием преобразуем числа во время для вектора
.[, timestamp := anytime(timestamp, tz = "Europe/Moscow")] %>%
as_tibble() %>%
select(store, pos, event, timestamp, session_id)
toc()
Для чистоты эксперимента оставим только значащие параметры и все перемешаем. В реальной жизни надо еще случайным образом выкинуть часть фрагментов (возможно отдельными временными блоками), эмулируя тем самым потери в получении данных.
# перемешиваем данные
log_tbl <- samples_tbl %>%
select(store, pos, state = event, timestamp_msk = timestamp) %>%
sample_n(n())
# посмотрим графически
log_tbl %>%
mutate(timegroup = lubridate::ceiling_date(timestamp_msk, unit = "10 mins")) %>%
ggplot(aes(timegroup)) +
# geom_bar(width = 0.7*600) +
geom_bar(colour = "white", size = 1.3) +
theme_bw()
Проиллюстрируем картинкой схему процесса
и распределение по состояниям
Незначительные флуктуации обусловлены тем, что таблица считается вначале (есть во включенном коде), а bupaR::process_map
работал в конце, когда часть случайно сгенерированных данных, не подходящих под интегральные ограничения, была срезана элементами фильтрации.
Реконструкция транзакций
Первое, что обычно предлагают, когда приходится собирать\разбирать\сопоставлять временные ряды, это группировки и циклы сравнений. В демонстрационных примерах на 100 записей этот поход сработает, но миллионных списках — нет. Чтобы справиться с этой задачей надо локализовать точки потери времени (внутренние циклы, промежуточные выделения памяти и копирование) и постараться устранить их до минимума.
В итоге эту задачу можно свести к десятку строчек.
clean_dt <- as.data.table(log_tbl) %>%
# все транзакции начинаются с INIT
.[, start := (state == "INIT")] %>%
# синтетический session_id будем строить в рамках одного дня, поэтому
# для кардинального сокращения времени преобразования перевод даты в строку сделаем по группам
.[, event_date := lubridate::as_date(timestamp_msk)] %>%
.[, date_str := format(.BY[[1]], "%y%m%d"), by = event_date] %>%
# временнУю сортировку от прошлого к будущему сделаем один раз для всех
# timestamp_msk уже содержит миллисекунды
setorder(store, pos, timestamp_msk) %>%
# есть еще одна беда -- разрыв в поступлении данных может приводить к искажению правильного шаблона путем приклейки огрызков
.[, session_id := paste(date_str, store, pos, cumsum(start), sep = "_")] %>%
# попробуем подкорректировать путем введения разрешенного транзакционного окна (возьмем 30 сек)
# .[, time_shift := timestamp_msk - shift(timestamp_msk), by = .(store, pos)] %>%
# в отсортированном списке событий протягиваем куммулятивный максимум времени, ориентируясь на INIT
.[, time_locf := cummax(as.numeric(timestamp_msk) * as.numeric(start)), by = .(store, pos)] %>%
.[, time_shift := as.numeric(timestamp_msk) - time_locf] %>%
# маркируем транзакционное окно, при генерации мы его установили равным 30 секунд
.[, lost_chain := time_shift > 30] %>%
# .[, time_shift := as.numeric(!start) * as.numeric(timestamp_msk - shift(timestamp_msk, fill = 0))] %>%
# INIT зануляет куммулятивную дельту
# .[, time_accu := cumsum(time_shift)] %>%
.[, date_str := NULL]
# агрегируем и потом считаем объемные показатели на уровне транзакций
# для больших объемов tidyverse не годится, дождаться конца исполнения не получилось
dt <- as.data.table(clean_dt) %>%
# исключаем огрызки из транзакционного анализа!!!
.[lost_chain != TRUE] %>%
# делаем глобальную сортировку для прямой отсылки к 1-му и последнему элементу
.[order(timestamp_msk, store, pos)] %>%
.[, bp_pattern := stri_join(state, collapse = "-"), by = session_id]
# проверим статистику по паттернам
as_tibble(dt) %>%
distinct(session_id, bp_pattern) %>%
count(session_id, sort = TRUE)
Через несколько секунд имеем реконструированную картину бизнес-процессов.
И (кто бы мог подумать!!!) по факту оказывается, что автоматизированные в ИТ системах бизнес-процессы работают несколько не так (или совсем не так), как всех убеждали бизнес-аналитики. Удивления и споры "владельцев процесса" будут сопровождать изучение финальной картинки.
Активно применяем трюки
Когда скорость вычисления становится важной величиной мало просто написать рабочий код. Необходимо обратить внимание на все уровни. Также есть ряд алгоритмических трюков, которые позволяют существенно сократить время выполнения.
В частности, в этой задаче можно упомянуть следующее:
- Для основного процессинга только
data.table
(скорость, работа по ссылкам), + учет внутренней оптимизации запросов. POSIXct
может содержать миллисекунды (хоть штатно и не отображается, но можно подправить с помощьюoptions(digits.secs=X)
), прячем их туда, проще будет сравнивать и сортировать.- Избегаем физической сортировки внутри групп!.. Однократная физическая сортировка всего вектора гарантирует сортировку данных в группировках.
- Избегаем вычислений внутри групп. Все что можно, стараемся выполнить на исходных данных (применяем векторизацию, снижаем накладные на вызовы функций).
- Используем таймаут на транзакции, чтобы бороться с временными разрывами.
- Методы locf (Last Observation Carried Forward) работают медленно. Для переноса свойств по шкале времени используем
cumsum
,cummax
. - Трудоемкие операции, такие как преобразование POSIX -> строка, поиск по регуляркам и т.п. делаем не поэлементно, а на свертках. Накладные на внтреннюю индексацию и группировку преобразуемого поля неспоставимо меньше.
- Активно используем многопоточность (в т.ч. внутрипакетную).
- Не пренебрегаем микрооптимизацией. Например,
stri_c
в несколько раз быстрееpaste0
.
# Тест 1
log <- getLog(fileName)
bench::mark(
paste0 = paste0(log$value, collapse = "\n"),
stringi = stri_c(log$value, collapse = "\n")
)
# # A tibble: 2 x 13
# expression min median `itr/sec` mem_alloc `gc/sec` n_itr n_gc total_time
# <bch:expr> <bch:> <bch:> <dbl> <bch:byt> <dbl> <int> <dbl> <bch:tm>
# 1 paste0 58ms 59.1ms 16.9 496KB 0 9 0 533ms
# 2 stringi 16.9ms 17.5ms 57.1 0B 0 29 0 508ms
Предыдущая публикация — Швейцарский нож для обработки json.
somurzakov
ETL в R выглядит очень сурово, как забивание гвоздей электронным микроскопом. не умаляя ни разу вашу проделнанную работу, мне кажется вы используете не тот инструмент для задачи.
Как мне кажется простой data warehouse, 1 простой ETL и пара SQL запросов могли бы резво и надежно подсчитать все это (на любых размерах данных).
алгоритм примерно такой:
1. вытаскиваем 1 день данных из OLTP базы в OLAP staging table в DWH используя SSIS
2. проставляем transaction_id согласно установленным правилам -> сначала для строк с законченной транзакцией, затем для выпавших на шаге N-1 и проставляем сразу статус транзакции
3. считаем аггрегации по измерениям и кладем их в куб
4. визуализируем куб в Qlik/PowerBI/Tableau
какие выгоды от этого процесса для компании:
1. ETL работает как часы по установленному графику и работает быстро т.к. использует ресурсы сервера
2. сам процесс может быть разбит на несколько частей (etl, sql, visualization) и распределен на нескольких людей для командной работы/масштабирования процесса
3. используются более старые и проверенные в ентерпрайзе технологии SQL, SSIS, DWH которые изобрел еще дядюшка Кимбалл в мохнатые 80-ые
4. специалисты по SQL дешевле чем спецы в R
i_shutov Автор
в целом конечно соглашусь, такие подходы тоже используются.
Кстати, в случае с анализом событий все несколько хуже. И данные неструктурированные (сплошной поток сознания) и события в совершенно разных базисах и не одним OLAP приходится довольствоваться. Конкретно здесь за кадром еще есть и потоковый препроцессинг и очереди и бэкенды и хадупы и прочий обвес.
Я просто из большого комплекса одну частную задачку выдернул, интересную саму по себе. Неряшливость и неразборчивость в применяемых пакетах, функциях и алгоритмах на малых данных выходит боком на уже на данных среднего размера. Что можно очень хорошо продемонстрировать и предложить обратить внимание на
data.table
, например. И поговорить про способы оптимизации и трюки, которые я указал в конце.Насчет хардкорности согласиться будет сложнее. Тут реально кода по анализу и восстановлению десяток строчек. Больше на судоку похоже, чем на квантовую механику. Навык эффективно использовать железо весьма полезен и для разработчиков и для аналитиков.
Сам же приведенный блок также используется в генерации Rmarkdown отчетов, в которых есть и картинки и таблички и масса различных алгоритмов по анализу данных и загрузка из других источников и выгрузка наружу (side-effect). Это несколько больше чем визуализация куба. Причем $0 за лицензии.
Я сознательно сфокусировался на одном малом шаге, потому как он хорош и для обучающих целей.
P.S. Кстати, а что значит "на любых размерах"? И сколько это стоит? И сколько будет исполняться.
Конкретно в таких задачах мы остановились на Clickhouse в качестве аналитического бэкенда. $0 за лицензии, несколько терабайт в хранилище (пока) при сжатии ~85% на грамотный запрос дают от долей секунд до сотен (уж что написать...).
А за развернутый комментарий отдельное спасибо.
somurzakov
я изложу тут свою пару мыслей, как менеджера по анализу данных в кровавом ентерпрайзе:
1. Как раз в вашем случае, когда разные базисы и полный зоопарк технологий и следует использовать Data Warehouse, как единый источник для аналитики. Именно в этом warehouse данные из сотен разных систем причесываются и приводятся к одному базису и проставляются внешние ключи к общим справочникам по всему ентерпрайзу (например бухгалтерия, учет кадров, производство, продажи и т.д.)
2. Почитайте любую книгу Ralph Kimball, это он патриарх и идейный лидер такого понятия как DWH и BI, там он затрагивает вопросы архитектуры — как в типичной огромной американской корпорации поставить на поток процесс анализа данных из зоопарка разных систем. Также он затрагивает вопросы контроля качества данных, когда данные нужно отбрасывать и возвращать отвественным людям на исправление, когда пропускать с пометкой, и т.д. — потому что любой анализ хорошо настолько, насколько точны и качественны данные. И тут важен не сам факт, что справились ли вы как аналитик с анализом и соединили точки воедино, а то, что должен быть выстроен процесс по контролю качества данных и улучшени. бизнес-процессов которые генерируют эти данные.
3. В ентерпрайзе обычно стоимость софта особого значения не имеет, и опенсорс не имеет преимуществ перед коммерческим софтом, зачастую наоборот. Потому, что в ентерпрайзе важно чтобы решение работало сразу из коробки, и сразу был доступ на основе Active Directory и интеграция с ITSM системой. Я например использую большой кластер Tableau Server, который стоит миллионы долларов, но зато там авторизация на основе Active Directory, движок данных Hyper который разработали PhD-шники из Стенфорда, готовый экспорт данных в PNG для презентации большим боссам и работу из iPad-а который у шефа который везде летает на корпоративном самолете и нету времени на встречу — вся коммуникация через комплекс дешбордов и мониторинг выставленных ключевых метрик по организации с 60+ тыс человек.
>> P.S. Кстати, а что значит «на любых размерах»? И сколько это стоит? И сколько будет исполняться.
бекенд обычный SQL Server 2008 на обычном сервере. Можно обойтись postgres при желании.
Под любыми размерами я имел в виду то, что терабайтные массивы данных аггрегируются в мегабайты аггрегаций.
Базовый принцип — давать человеку, принимающему решения, ровно столько данных, сколько нужно для принятия решения. Размер данных аггрегаций достаточных чтобы построить график изчисляется мегабайтами, и тут больше вопрос в качестве данных, нежели в технологии. Лучше потратить неделю на общение с бизнесом и с человеком, который будет смотреть на аналитику и принимать решения, чтобы итеративно построить то, что ему нужно — чем лопатить терабайты данных и забросать данными пользователя, чтобы у того голова вообще перестала работать от информационного шума.
i_shutov Автор
Понятно, наболело, видимо.
Но это все немного в стороне от исходной темы публикации, а именно, business process mining на R и трюки по сильной оптимизации. Это именно аналитическая задачка имеющая широкий спектр применения. Не было ни слова ни про саму глобальную задачу, DWH и ETL вообще не затрагивались и не обсуждались. Повторюсь, это всего-лишь маленький фрагмент из большого пазла. Кулисы я не планировал отдергивать.
В публикациях интересно делиться маленькими полезностями, чем бесконечными рассказами о том, как лопатами вагоны данных разгружали.
У меня тоже имеется ряд наблюдений.