Привет, Хабр, я Node.js разработчик, и я хочу поделиться с вами опытом по реализации business intelligence (BI) процесса. 

В какой-то момент бизнес вырос до размера, пусть и небольшого, когда считать различные цифры и проводить аналитику в excel таблицах уже сложно и медленно, да и количество людей работающих с данными значительно выросло. Тогда зашла речь об автоматизации этого процесса и визуализации различного рода аналитики. Так мы подошли к мысли, что пора реализовывать BI внутри компании.

BI - это просто набор механизмов, технологий для сбора, анализа и визуализации данных.

Немного о бизнесе

У нас есть заказы (orders), которые создаются в лабораториях. У этих заказов есть различные этапы (statuses) их обработки, бизнесу интересно понимать скорость обработки этих заказов, т.е. время перехода из одного статуса в другой, соотношение количества результатов заказов к общему количеству.

Выбор хранилища

В качестве главного хранилища мы используем не реляционную базу данных Cassandra. Для нашего BI она нам не подходит, т.к. она не предназначена для аналитики данных. У нее есть свой язык CQL (Cassandra Query Language), но он сильно урезан относительно SQL. к тому же мы используем такую схему данных, когда у нас в одной колонки хранится ID объекта, а в другой хранится JSON объекта. С такой схемой далеко в аналитику не уедешь. 

Т.к. у нас вся архитектура лежит в AWS, наш взор сразу пал на Redshift. Изучив ее особенности мы решили, что для наших задач ее функционала хватит более чем. Мы не занимались здесь глубокой аналитикой и сравнением Redshift с другими решениями, такими как ClickHouse, BigQuery, Azure.

ETL для Redshift

Разработчики заранее дают нам понять, что подход, когда мы в режиме реального времени пишем данные в БД с Redshift не прокатит. Лучший способ доставки данных в хранилище это копирование из S3 файлов в формате csv.

Этот способ невероятно быстрый за счет того, что позволяет обрабатывать файлы параллельно, но есть и некоторые ограничения. Файлов должно быть немного, желательно столько сколько у вас развернуто node Redshift, чтобы файлы обрабатывались параллельно и размер файлов должен быть от 1 - 125 МБ.

Подумав над всеми этими рекомендациями, мы поняли, что нам нужна реализация, которая будет писать данные в файлы, а затем заливать их в файловое хранилище AWS S3 и дальше перемещать уже в хранилище Redshift.

Первая серьезная проблема с которой мы столкнулись это то, что обновлять данные будет очень сложно и дорого как по времени, так и по ресурсам. Исходя из этого, мы поняли, что нам нужно записывать данные в разных состояниях, опираясь на какие-то события в системе, а уже по дате создания записи фильтровать их. Важно, что дата создания (timestamp) !== дате добавления строки в БД, т.к. мы пишем данные пачками.

Таким образом, каждая таблица у нас содержит данные об объекте в разный промежуток времени и выглядят они примерно так:

Сервис для подготовки данных (service-redshift)

Давайте реализуем сервис таким образом, чтобы он прослушивал все события, которые прилетают в брокер и решал, надо ли ему записать объект из события, взяв определенные поля, или на его основе сделать какую-то запись.

В наш брокер прилетают сообщения (events) в формате:

some_event: {
  method: POST,
  current_state: { object_type: order_status, … },
  previous_state: { - } 
},

another_some_event: {    
  method: PATCH,    
  current_state: { object_type: order, status: received, … },        
  previous_state: { object_type: order, status: created, … } 
},

Мы можем подключить наш сервис к брокеру и реагировать на события. Давайте напишем функцию, которая будет реагировать на метод PATCH у event на конкретный тип объекта requisition.

// index.js
const handlers = {
	"PATCH order": require("./order").onPatch,
}
const onHandleEvent = async (event) => {
	await handlers[`${event.method} ${event.object_type}`]();
}

Теперь опишем обработчик для объекта requisition

// order.js
const onPatch = async event => {
	...
}

В эту функцию у нас прилетает event с типом объекта requisition и его предыдущим и текущим состоянием. Теперь мы можем сформировать какой-нибудь объект из полученных данных и отправить его в промежуточное хранилище. У нас могут быть разные события на один и тот же объект, поэтому давайте реализуем функцию, которая будет формировать объект независимо от того какой метод отработал.

// collector_order.js
const onHandleOrder = event => {
	//здесь могут быть какие-то расчеты полей, но у этого объекта 
  //все будет просто
  return {
    id: event.id,
    status: event.status,
    created_at: event.created_at,
  };
};

// order.js
const onPatch = async event => {
  const order = onHandleOrder(event);
  
  await sendToTempStore(order);
};

Промежуточное хранилище

Определимся теперь, где нам промежуточно хранить данные перед их отправкой в S3. Можно было бы писать прямо в S3 файлы, но:

  1. В файлы нельзя проводить дописывание

  2. Если бы мы загружали туда по одному файлу, а потом собирали их и где-то агрегировали, тогда наш алгоритм работал бы медленно

Т.к. мысль с файлами нас еще не покинула, наш взор пал на технологию AWS EFS. Но прошерстив документацию, стало понятно, что возникнет проблема с чтением маленьких файлов, а записывать все в один будет не так быстро, как хотелось бы.

И тут мы наконец-то добрались до самого быстрого хранилища - оперативная память, а именно: Redis.

Соберем брокер, service_redshift, Redis и сам Redshift в одну единую схему:

Было решено, что данные будут ходить в Redshift раз в какой-то промежуток времени (Migration шаг в схеме). Значит, что за это время данные должны полежать в Redis, а во время боя курантов перебраться в файлы и отправиться в S3. 

В Redis нужно каким-то образом искать данные для отправки, не зная их ID. Лучший вариант был - это сложить все данные в Set, а затем найти нужный Set и перебрать его значения. Сами данные будут представлять из себя строчки, связанные запятой, так сказать, подготовились к формату csv.

Чтобы легче всего было найти нужные данные в Redis мы воспользовались маской для ключа вот такого вида: table_name-YYYY-MM-DD-HH. Таким образом, мы можем в назначенное время, зная имена конечных таблиц, найти все необходимые нам данные.

Адаптируем функцию обработки объекта по методу, добавив запись в Redis

// temp_store.js
const moment = require("moment");
const redis = require("./connected_redis");

const generateKeyName = (data, tableName) => {
	const date = moment().format('YYYY-MM-DD-HH');
  
  return `${tableName}-${date}`;
}

const sendToTempStore = async (order, tableName) => {
  const row = Object.values(order)
  	.push(new Date().toISOString()) // Это наша колонка timestamp, она всегда идет в конце
  	.join(",");
  const key = generateKeyName(row, tableName);
  
  // В случае отсутсвия Set, редис сам создаст его
  // Поэтому никаких дополнительных проверок тут не надо
  await redis.rPush(key, row);
}

// order.js
const onPatch = async event => {
  const order = onHandleOrder(event);
  
  await sendToTempStore(order, "orders");
};

В конечно итоге мы получаем Set с нашими объектами для определенной таблицы

Миграция в Redshift

Собрав данные в Redis, мы раз в период начинаем их оттуда извлекать и записывать в файлы, причем, не забывая о рекомендациях документации о размере и количестве файлов.

Алгоритм действия будет у нас выглядеть таким образом:

Берем все сеты, которые есть в Redis (мы выделили отдельный Redis под эти манипуляции, а значит лишнего не захватим). Перебираем все таблицы, создаем папку под каждую таблицу, туда мы будем складывать файлы для последующей миграции. Отбираем набор сетов для таблицы, убираем тот, который не равен текущему часу (почему так чуть подробнее ниже), остальные складываем в структуру данных Map.

Теперь берем каждую таблицу и все ее сеты. Так как в сете у нас может быть много записей, ограничим обработку до 100к, т.к. одна запись примерно 102 bytes, значит 100к ~ 10 MB. Таким образом контролируем память.  Если записей больше, чем 100к, разбиваем на чанки, записывая пограничные индексы. В конечном счете записываем данные в файлы по сетам.

После того, как мы собрали файлы из сетов, можем переносить это все в S3, а затем переносим это в Redshift, используя команду COPY.

Стоит отметить, что сервис не работает постоянно, он представляет из себя контейнер ECS, который запускается по расписанию, что как раз в 2-ом пункте позволяет нам не писать никуда никакие интервалы. Интервал контролируется временем запуска.

Визуализация данных

К визуализации данных мы уже подошли, сравнивая различные решения. Выбор стоял между двумя - Redash, Metabase.

Tableau отпало, т.к. инструмент слишком мощный и у нас просто нет необходимости в большой части его функционала.

Grafana рассматривалась, но вскоре мы решили, что для рядовых пользователей этот инструмент будет слишком тяжелым.

Есть две ключевые особенности, которые были главными аргументами в пользу одного из решений (Redash vs Metabase).

Redash имеет такую архитектуру, что при выполнение какого либо запроса, он не ждет ответа в браузере от своего сервера, который в свою очередь ждет ответа от базы данных. Вместо этого он создает Job, которая ждет выполнения запроса и с браузера клиента он проверяет ее статус. Когда Job завершилась, он берет результат и показывает в дашборде.

Metabase посылает запрос к своему сервису и ждет результата до последнего, если у вас стоит какой-то балансировщик или прокси сервер, который отбрасывает запросы, если они идут дольше 60 секунд, вы испытаете вот такую проблему.

Redash обладает еще одной особенностью, он позволяет выстраивать графики опираясь на один запрос, в то время как Metabase нужно иметь отдельный запрос на каждый график.

В ходе долгой дискуссии мы все же решили, что с точки зрения пользователя, гораздо удобнее будет использовать Metabase, не смотря на, с нашей точки зрения, техническое превосходство Redash.

С инструментом определились, осталось сформировать запросы, учитывая, что мы храним данные в разных состояниях.

Мы используем materialized view, чтобы не собирать постоянно данные с разных таблиц при помощи join, т.к. этот процесс на большом количестве данных и связей достаточно медленный. Оно представляет из себя таблицу, собранную из разных таблиц воедино. Эта таблица имеет возможность обновляться автоматически по мере обновления данных в таблицах, из которых она соткана. Если таблицы начинают обновляться друг за другом, materialized view обновиться лишь один раз, когда закончатся обновления всех таблиц. Сам запрос выглядит примерно так:

create view orders_mv
            (order_id, status, created_at)
as
CREATE MATERIALIZED VIEW orders_mv AUTO REFRESH YES
AS
(
with order_v as (
    SELECT o.order_id,
           o.status,
           o.created_at,
    FROM (SELECT o.order_id,
                 o.status,
                 o.created_at,
                 // Здесь мы как раз проводим фильтрацию, отбирая только последнее состояние заказа в БД
                 pg_catalog.row_number() OVER (PARTITION BY o.order_id ORDER BY o."timestamp" DESC) AS row_num
          FROM orders o
         ) o
    WHERE o.row_num = 1

Подытожим

У нас получилось собрать конструкцию из сервиса и скрипта, который запускается по расписанию, воссоздав таким образом свой процесс ETL.

У него есть один минус, который мы видим сразу же - чем больше данных (колонок) мы хотим видеть в момент их выставления у объектов, тем больше записей мы получим на выходе в Redshift. Например, у ордера мы хотим видеть 2 промежуточных состояния, когда выставится some_column_one и some_column_two, таким образом, мы получим две записи:

Но при текущем количестве данных (<10кк в час) нас вполне устраивает это решение, а когда мы доберемся до больших цифр, мы начнем свой переезд в AWS Kenesis. Думаю мы еще обязательно поделимся этим опытом с вами.

Комментарии (7)


  1. kienli
    03.06.2022 08:16

    Спасибо за статью! Как решаете вопрос с мониторингом такого пайплайна?


    1. Montgomeryg Автор
      03.06.2022 08:17

      Используем AWS CloudWatch и уведомления в Slack.


  1. SergeyUstinov
    03.06.2022 09:24

    Читая статью, постоянно вспоминал анекдот:

    Идет научная конференция. На ней присутствуют делегации из Америки, Англии
    и России. Выстпают американцы:
    - Мы можем заменить человеку палец.
    - Браво! Браво!
    Англичане:
    - А мы можем заменить человеку глаз.
    - Браво! Браво!
    Русские:
    - А мы гланды вырезаем.
    - У-у-у-у-у
    Проходит десять лет.
    Выступают американцы:
    - Мы можем заменить человеку руку.
    - Браво! Браво!
    Англичане:
    - Мы можем заменить человеу голову.
    - Браво! Браво!
    Русские:
    - А мы гланды вырезаем...
    - У-у-у-у-у
    - ...через жопу
    - Браво! Браво!
    Проходит еще десять лет.
    Выступают американцы:
    - Мы можем заменить человеку сердце.
    - Браво! Браво!
    Англичане:
    - Мы можем заменить человека полностью.
    - Браво! Браво!
    Русские:
    - А мы гланды вырезаем...
    - У-у-у-у-
    - ...через жопу...
    - У-у-у-у-
    - ... автогеном
    - Браво! Браво!

    Вот что вы сказали про бизнес:

    У нас есть заказы (orders), которые создаются в лабораториях. У этих заказов есть различные этапы (statuses) их обработки, бизнесу интересно понимать скорость обработки этих заказов, т.е. время перехода из одного статуса в другой, соотношение количества результатов заказов к общему количеству.

    То есть речь не идёт о больших данных. Скорее всего, общее количество заказов за все время деятельности компании меньше 100 миллионов. Вероятно - сильно меньше. ))) И в ближайшие годы количество данных не вырастет в 100 раз.

    А на таком объёме данных такие отчёты можно делать с помощью Aurora (https://aws.amazon.com/ru/rds/aurora/), которая как раз прекрасно подходит для записи отдельных транзакций. Появился новый заказ / изменился статус - сразу записали в базу. То есть можно выкинуть большую часть промежуточных элементов, в результате чего вся система в целом станет намного проще и стабильнее.

    Ну и маленький бонус для бизнеса - обновления данных в отчётах он увидит спустя секунды, а не часы.


    1. Montgomeryg Автор
      03.06.2022 09:30

      Заказы это не единственные данные, которые хранятся у нас. Здесь это приведено лишь для примера. Сущностей в Redshift гораздо больше и, если заказов действительно меньше 100кк, то вот всех связей с ними, которые тоже нужны для аналитики, уже как раз превышают это количество.

      Бизнесу не нужно сиюминутное обновление данных. Именно поэтому мы можем позволить себе задержу в период времени, который составляет один час в текущий момент и вряд ли поднимется выше на текущем решение.


      1. SergeyUstinov
        03.06.2022 09:44

        всех связей с ними, которые тоже нужны для аналитики, уже как раз превышают это количество

        О каких "связях" вы говорите? Справочники (клиент, лаборатория, исполнитель и т.д.)?

        А про время обновления - я и не говорил, что бизнесу сильно нужно обновление за секунды. Это - просто "бесплатное приложение". Но, поверьте, им все равно будет приятно, если данные обновляются быстро. )))


        1. Montgomeryg Автор
          03.06.2022 09:50

          Заказы в нашей системе это сложная структура, которая содержит множество различных связей, кроме банальных: клиент, исполнитель и т.д. Я, к сожалению, не могу вам описать все из-за NDA идеологических соображений. Но, поверьте, их достаточно, чтобы не использовать Mysql, Postgres.

          Но, поверьте, им все равно будет приятно, если данные обновляются быстро. )))

          Не могу вам поверить, потому что бизнес говорит нам другое.

          Анекдот, кстати, классный.


          1. SergeyUstinov
            03.06.2022 11:49

            Заказы в нашей системе это сложная структура, которая содержит множество различных связей, кроме банальных: клиент, исполнитель и т.д. Я, к сожалению, не могу вам описать все из-за NDA идеологических соображений

            Сильно сложнее, чем вот это?
            https://dynamicsdocs.com/nav/2017/w1/table/sales-line

            Это таблица строк заказов в MS NAV / Business Central, с которым я работаю последние годы и, к счастью, структуру которого я могу свободно описать. :)))

            Но, поверьте, их достаточно, чтобы не и спользовать Mysql, Postgres.

            Montgomeryg сама по себе сложность структуры данных не влияет на выбор - колоночная или строковая база данных. Колоночные базы имеют как свои достоинства, так и недостатки. И в вашей статье как раз хорошо описаны недостатки. :)))

            Если же мы говорим о размере данных... Redshift на небольших объемах данных (миллионы - десятки миллионов строк) может обрабатывать запросы медленнее, чем обычная база с построчным хранением на одиночном сервере. MPP (много нод) вносит свои оверхеды.

            А лимиты на просто объем хранимых данных обычно достаточные.

            An Aurora cluster volume can grow to a maximum size of 128 tebibytes (TiB)

            ...

            For Aurora MySQL, the maximum table size is 64 tebibytes (TiB). For an Aurora PostgreSQL DB cluster, the maximum table size is 32 tebibytes (TiB).

            https://docs.aws.amazon.com/AmazonRDS/latest/AuroraUserGuide/CHAP_Limits.html