Всем привет! Меня зовут Александр Токарев, я технический архитектор домена «Управление данными» в Леруа Мерлен. Год назад мы уже делали обзор нашей Платформы данных, сейчас же я расскажу про её развитие за последний год и про задачи, которые нам удалось решить.

Мы столкнулись с необходимостью масштабировать наш подход, когда количество источников, интегрированных в платформу, стало больше 150. Всего же мы планируем интегрировать данные из более чем 800 систем. Однако ETL‑инструменты, которые мы использовали на первых этапах развития дата платформы, не позволяли добиться эффективного масштабирования. Кроме того, сам процесс интеграции источников был достаточно трудоемким. Поэтому возник запрос на рефакторинг архитектуры процесса поставки данных, который, с одной стороны, позволил бы эффективно горизонтально масштабироваться, а с другой стороны, упростил бы сам процесс интеграции. В результате мы пришли к следующей схеме процесса.

Вкратце напомню структуру изначального процесса, который мы выстроили в ходе построения дата‑платформы. Источники данных отправляют CDC (Change Data Capture) сообщения в Kafka‑топики, которые вычитываются ETL (Extract‑Transform‑Load) процессом на NiFi, складывающим преобразованные CDC в БД GreenPlum в сырой слой (Raw). Затем по расписанию запускается загрузчик в Airflow, вызывающий хранимую процедуру в самом GreenPlum, которая делает свёртку сырых сообщений и преобразует их в табличный вид, который у нас называется ODS (Operational Data Store) слоем. Далее на основе ODS‑слоя можно построить слой витрин данных (Marts), но это не всегда является обязательным.

Как было
Как было

Кроме того, в нашей компании при работе с данными мы основываемся на подходе Data Mesh. Его суть заключается в том, что за интеграцию данных в дата‑платформу и обеспечение качества интегрируемых данных отвечают команды, являющиеся владельцами систем‑источников данных. В свою очередь, команда дата‑платформы отвечает за предоставление инструментов для проведения интеграции и за обеспечение работоспособности этих инструментов в процессе эксплуатации.

Данный подход был вполне эффективен на первых этапах развития дата‑платформы. Однако со временем в результате роста количества подключенных систем и объёма загружаемых данных мы столкнулись с рядом проблем.

1. Технологическая нагрузка по расчету ODS‑слоя из сырых данных выполняется на тех же мощностях, что и запросы пользователей. Из‑за этого мы регулярно получали жалобы на то, что запросы выполняются медленно. В качестве частичного решения этой проблемы мы поставили выполнение всех технологических процессов на ночь, но, во‑первых, это уже затрагивало наших потребителей в дальних часовых поясах, во‑вторых, ограничивало скорость поступления данных в ODS‑слой до одного раза в сутки.

2. В продуктовых командах, владеющих системами, которые должны быть интегрированы в дата‑платформу, не всегда есть дата‑инженеры с нужными компетенциями. Для небольших команд это не всегда экономически оправдано. Кроме того, дата‑инженер должен владеть сопутствующими технологиями, такими как NiFi и Airflow.

3. Около 75% источников используют однотипные системы для хранения данных — например, БД PostgreSQL или MSSQL. Соответственно процессы интеграции данных из таких источников выглядят однотипно. Каждый дата‑инженер просто копировал интеграционный ETL‑процесс в NiFi с похожего источника и слегка его модифицировал. Поэтому возникла потребность в создании универсального процесса под стандартные типы источников данных, которые просто конфигурировались бы под каждую интеграцию.

4. Необходимо было обеспечить резервирование всех данных на случай отказа основного кластера GreenPlum и их быстрое восстановление.

Концепция решения

Таким образом, мы приняли решение вынести всю технологическую нагрузку за пределы кластера GreenPlum. Все поступающие из Kafka‑топиков CDC‑сообщения сохраняются в персистентное хранилище на базе S3, с разделением по источникам, целевым таблицам и временным интервалам. Мы решили сделать единый процесс, читающий данные сразу из всех источников и обрабатывающий CDC‑сообщения в зависимости от их формата. Это позволило нам автоматически балансировать ресурсы между источниками в зависимости от нагрузки — вместо создания отдельных ETL‑процессов на NiFi под каждый источник. Также это дало возможность сделать универсальные конфигурируемые обработчики для часто встречающихся типов источников данных.

После сохранения CDC‑сообщений из Kafka на S3 мы запускаем процесс формирования ODS‑слоя, который также хранится на S3. Он представляет собой снимок таблицы (snapshot) на определенный момент времени и может служить для неё точкой восстановления. Таким образом, хранилище на S3 является первоисточником для данных ODS‑слоя, которые затем могут загружаться как в общий коммунальный кластер GreenPlum, где с ним могут работать все желающие, так и в другие специализированные кластеры и БД, которые работают с меньшим объёмом исходных данных для повышения производительности.

Ниже приведена логическая схема загрузки данных в дата‑платформу.

Логическая схема загрузки данных в дата-платформу
Логическая схема загрузки данных в дата-платформу

На данной схеме я показываю, что все CDC‑сообщения, приходящие из Kafka, сохраняются в неизменном виде на S3. При сохранении они разделяются по директориям в соответствии с названием таблицы и интервалом времени. Для определения интервала времени используется event‑time, содержащийся в самом сообщении. Это нужно для того, чтобы в случае так называемых late events мы могли доложить их в нужную директорию и пересчитать.

Далее на основе сохранённых CDC‑сообщений в Raw‑слое необходимо рассчитать приращение для основной таблицы — дельту. Дельта представляет собой последовательность операций, которые необходимо применить к предыдущему снимку таблицы для того, чтобы получить снимок таблицы на начало следующего периода. Таким образом, мы получаем серию снимков для таблицы с определённой периодичностью.

У данного подхода есть два недостатка. Первый заключается в том, что в случае большого объёма данных в снимке и малого — в дельте при расчёте следующего снимка будет расходоваться много ресурсов. Поэтому имеет смысл перестраивать снимки не после появления каждой новой дельты, а после появления нескольких дельт. Например, вместо S1 = S0 + d0 и S2 = S1 + d1 можно рассчитывать снимок через два интервала как S2 = S0 + d0 + d1 и т. д.

Второй недостаток заключается в том, что каждый раз загружать в GreenPlum полное состояние таблицы при появлении нового снимка также будет слишком затратным процессом. Поэтому мы разработали процедуру применения дельт к таблице непосредственно к данным, уже хранящимся в GreenPlum, что устраняет необходимость в постоянной перезагрузке данных. Таким образом, загрузка данных в GreenPlum из снимков таблицы необходима только в случае восстановления резервной копии, в штатном же режиме дельты применяются и к снимкам таблицы на S3, и к данным в самом GreenPlum. В случае штатной работы системы состояние таблицы после применения соответствующей дельты и состояние снимка на S3 должны быть эквивалентными.

Реализация

С учётом всех изложенных выше вводных мы пришли к следующей архитектуре системы.

Текущая архитектура
Текущая архитектура

Изображенные на схеме компоненты процесса интеграции источников в дата‑платформу взаимодействуют следующим образом. Точкой входа в контур дата‑платформы и механизмом передачи CDC‑сообщений от источников в дата‑платформу является Kafka. Как и в предыдущей реализации дата‑платформы, мы исходим из того, что данные от источников поступают в виде непрерывного потока CDC‑сообщений. Из неё данные вычитываются модулем CDC ingest, построенным на Apache Flink. Его основными задачами являются загрузка данных из Kafka‑топиков и сохранение их на S3 в партиционированном виде.

Кроме того, модуль CDC ingest определяет, завершилось ли заполнение директории или ещё в процессе. Для этого он использует оконные функции, входящие в состав Apache Flink. Для каждого интервала времени он собирает множество всех выходных директорий, в которые была запись за этот интервал. Затем при помощи скользящего окна мы сравниваем множества за последний и предыдущий интервалы, и если в предыдущем интервале есть пути, которых нет в последнем, то мы считаем, что запись в эти директории завершилась и их можно передавать на следующие шаги.

Для расчета приращений (дельт) ODS‑слоя служит модуль Raw to ODS, построенный на Apache Spark. Его задача заключается в свёртке CDC‑сообщений по натуральному ключу таким образом, чтобы в результате его работы по каждому ключу была только одна итоговая операция, которую нужно применить к предыдущему состоянию таблицы. Например, если по данному ключу была сначала операция delete, а затем insert, то он их преобразует в одну операцию update. Или наоборот, если по ключу было три операции, сначала insert, потом update, потом delete, то он просто исключит этот ключ из итоговой дельты, и т. д. Также этот модуль отвечает за построение снимков таблиц на определённые моменты времени.

За загрузку данных ODS‑слоя из S3 в GreenPlum отвечает модуль Upload coordinator. Сами данные на S3 подключаются как внешние таблицы по протоколу PXF и затем перегружаются во внутренние таблицы GreenPlum при помощи хранимой процедуры. Изначально планировалось делать его на Airflow, который просто бы вызывал хранимую процедуру, однако мы столкнулись со следующей проблемой. Нам было необходимо, чтобы от запуска к запуску у нас формировался разный DAG, структура которого зависела от состава и количества партиций, готовых к загрузке. Однако динамические DAGи на Airflow позволяют сформировать только одну форму DAGа, которая не меняется от запуска к запуску. Поэтому в качестве координатора загрузки был выбран Apache Spark, который может распараллелить задачи на загрузку. В отличие от предыдущего модуля, здесь Spark не занимается непосредственно обработкой больших объёмов данных, а просто вызывает хранимую процедуру с Executor`ов, поэтому здесь мы ему выделяем существенно меньше ресурсов, чем предыдущему модулю.

За координацию задач по расчёту и загрузке данных между тремя предыдущими модулями отвечает сервис Task Coordinator. После окончания записи сырых данных модуль CDC ingest делает в нём запись о том, что определённую директорию можно обрабатывать следующим модулем. В свою очередь, модуль Raw to ODS считывает из координатора список путей к директориям, готовым к обработке, и в результате создаёт следующий список уже из директорий ODS‑слоя, готовых к загрузке в GreenPlum. Аналогично работает и модуль загрузки данных ODS.

За управление параметрами конфигурации всех этапов процесса отвечает сервис Metadata. Ранее мы уже делали про него отдельную статью. Он позволяет, в частности, определить схему данных для каждой таблицы источника, указать, какие Kafka‑топики использовать для чтения CDC‑сообщений, настроить периодичность появления новых директорий на S3 и т. д.

Одним из преимуществ данного решения является то, что в целом процесс получился идемпотентным. На любом этапе в случае возникновения какой‑либо ошибки мы просто выполняем его заново, и это не приводит к дублированию или потере данных. Кроме того, идемпотентность процесса позволяет нам корректно работать в случае дублирования CDC‑событий или в случае запоздалых событий (late events). Из‑за того что мы используем для разделения на директории время самого события, они всегда попадут в одну директорию raw‑слоя и будут дедуплицированы и рассчитаны заново.

После запуска в эксплуатацию мы обнаружили следующий недостаток. Так как для обработки данных со всех источников использовался единый процесс, то в случае появления ошибок в структуре данных одного из источников он весь останавливался целиком. Кроме того, если по одному источнику начинают прогружать большой объём данных, то он влиял на скорость загрузки данных по другим источникам. В итоге было принято решение о разделении потоков по нескольким независимым экземплярам, или пулам. На каждый пул выделяется определенный объём ресурсов, прежде всего ядер процессора и памяти. В итоге мы сейчас работаем в 4 пулах: два из них выделены отдельно для систем с большим объёмом данных, один пул — для нестабильных источников (как правило, это такие источники, которые были недавно подключены) и ещё один — для всех остальных источников со стабильной структурой сообщений и небольшими объёмами данных.

Нагрузка

Вся технологическая нагрузка была вынесена на внешний кластер k8s. Как следствие, была существенно повышена производительность кластера GreenPlum для пользовательских запросов. В частности, для загрузки и расчёта самого большого источника раньше в среднем расходовалось порядка 15% вычислительных мощностей, а после переключения — около 1%.

График нагрузки на GreenPlum при использовании старого процесса
График нагрузки на GreenPlum при использовании старого процесса
График нагрузки на GreenPlum после переключения
График нагрузки на GreenPlum после переключения

Выводы

В итоге внедрение новой реализации позволило нам достичь следующих результатов.

1. Упрощён процесс интеграции источников в дата‑платформу. Для стандартных источников больше нет необходимости в создании отдельного ETL‑процесса. Достаточно просто добавить соответствующую конфигурацию в сервис метаданных. Время интеграции стандартного источника уменьшилось в ~2 раза.

2. Благодаря снижению нагрузки на кластер GreenPlum в ~10 раз, освободившиеся ресурсы были выделены для полезной работы пользовательских процедур и запросов.

3. Рассчитанные данные ODS‑слоя теперь доступны как в GreenPlum, так и на S3. Это, во‑первых, обеспечивает резервирование данных, а во‑вторых, позволяет работать с ними, используя различные инструменты для распределённой обработки данных, такие как Apache Spark, минуя GreenPlum.

Комментарии (17)


  1. OneManStudio
    00.00.0000 00:00
    +9

    Круто, может еще через 5 лет сайт научится запоминать выбраный регион, брать его из кукисов, по геометки или хотя бы подтягивать из профиля авторизированного профиля. А то авторизацию запоминает, а регион стабильно мск выбирает.

    Когда уже для клиентов что нибудь сделают?) Сайт как был унылой затычкой 10 лет назад, так и остался таким до сих пор.

    Хорошо хоть магазин остался нормальным.


  1. Geckelberryfinn
    00.00.0000 00:00
    +1

    В целом решение весьма логично, но, возможно несколько перегружено технологическими слоями. Flink, Nifi, Spark... Причём много усилий для перерасчёта дельт. На следующем этапе я бы исследовал возможность исключения Nifi из цепочки и рассмотрел бы возможность и целесообразность перехода к Databricks с его стримингом и Delta tables. Возможно упростит обслуживание дельт без ущерба масштабируемости решения.


    1. alextokarev Автор
      00.00.0000 00:00
      +2

      NiFi мы уже на текущем этапе исключили, заменив его Flink-ом


    1. EvgenyVilkov
      00.00.0000 00:00

      Леройчик в яндекс клауде, Весь креатив надо самому впиливать. Не все на это готовы


    1. KlimenkoIv
      00.00.0000 00:00

      Коллега, а почему DataBricks?
      Все, что представлено, является бесплатным решением.К тому же, все представленные сервисы можно замостить у себя, без ухода в облака. С точки зрения безопасников, это просто Must Be.


  1. seasadm
    00.00.0000 00:00

    Интересно как вы 800 источников насчитали...


    1. alextokarev Автор
      00.00.0000 00:00

      Ведём свой дата каталог


  1. EvgenyVilkov
    00.00.0000 00:00
    +3

    В верном направлении движетесь.

    Через Х лет ждем материал "мы наконец то решили отказаться от GreenPlum".


  1. HDDimon
    00.00.0000 00:00
    +1

    Вы сторону delta lake не смотрели? Соглашусь с коллегой, решение кажется технологически перегруженным. Я правильно понимаю что flink вы внедрили ради оконных функций для применения дельт?


    1. alextokarev Автор
      00.00.0000 00:00

      В целом да, сейчас flink используется больше как труба для перекачки данных из кафки в s3, и из фичей флинка пока что используются только оконные функции. Но в будущем мы планируем развивать этот модуль для выполнения более сложных аналитических расчетов на потоках данных.


  1. OldPronStar
    00.00.0000 00:00

    пользуясь случаем, хочу спросить:

    почему софт на служебных леруашных смартфонах так болезненно реагирует на разлочку загрузчика и рутирование, что даже magiskhide не спасает?


    1. neura
      00.00.0000 00:00

      Это часть нашего security pipeline


  1. andylar
    00.00.0000 00:00
    +1

    Хорошо бы еще ценность научиться понимать всех этих 800 источников)


  1. web3_Venture
    00.00.0000 00:00
    -1

    Теперь я знаю как выглядит зоопарк технологий. Причем зоопарк почти одних и теже технологий которые в большинстве случаев замещают друг друга.

    Как будто ктото пытался впихнуть всё что слышал знает...


    1. EvgenyVilkov
      00.00.0000 00:00

      Вообще ни разе не правы. Очень простая архитектура и почти правильная. Все то лишнее не этой картинке - это GreenPlum. Но до этого они пока не созрели.


      1. KlimenkoIv
        00.00.0000 00:00
        +1

        Коллега, а какую альтернативу СУБД для аналитического хранилища вы рассматриваете?


        1. EvgenyVilkov
          00.00.0000 00:00
          +1

          А что такое СУБД? :) Зачем она? Есть хранение в S3, к которому легко приклеивается например iceberg. К слою хранения нужен метастор (который к слову скоро появится у яндекса как SaaS). Дальше нужен движок (spark, impala, trino - выбирай любой) который в зависимости от потребности поднимается в managed k8s. Работает быстрее GP а еще и дешевле в плане билинга в облаке.