Привет, меня зовут Мельников Владислав, я эксперт в команде аналитической платформы данных проекта «Управления цепочками поставок» в Magnit tech.
Предыстория
В сентябре 2022 года в Магните стартовал проект по переводу аналитики с он-прем железа в облако. Преимущества этого были очевидны – в компании достаточно активно шла цифровая трансформация, что требовало увеличения вычислительных легко-масштабируемых мощностей. Плюс возник острый вопрос об импортозамещении. Фактически компании пришлось выбирать как пути миграции, так и возможные инструменты работы. Было принято решение о миграции в облако Яндекс.
На момент старта миграции мы имели в качестве источников Teradata, MS-SQL, Oracle, Hadoop (исторические данные).
Выбранный стэк
На момент перехода компанией был выбран следующий стэк решения нашей задачи:
Airflow – оркестратор
S3 для хранения для начальных и рассчитанных данных
Spark как движок для расчетов
Clickhouse – витрины данных для отчетов
Apache Superset – для отчетности
По всем этим инструментам очень много статей на английском, и чуть меньше – на русском. Цель нашей статьи – описать свое решение и те грабли, на которые мы наступили в процессе миграции.
Концепция
Общая концепция представляла собой Pipepline: Teradata -> S3 -> (трансформация c помощью Spark) -> Clickhouse.
В целом концепция не изменилась, лишь слегка модернизировалась. Например, в качестве источников добавились еще два типа БД – MS SQL и Oracle. Пришлось вести сервисные записи для инкрементно-загружаемых таблиц в PostgreSQL вместо S3, для работы в режиме Multi-Cluster заводить YDB (об этом ниже) и т.д.
Рассмотрим реализацию по этапам.
Выгрузка и преобразования
Выгрузка из Терадаты
Первоначально задача нам казалась простой – сделать SELECT к БД и сохранить выгруженное в parquet, однако тут нас ждали трудности. Первое – объемы передаваемых данных. Первоначальные попытки использовать драйверы JDBC оказались неудачными, так как необходимо было передать в облако дневные объемы одной таблицы порядка 220 млн. строк * 97 колонок (что в архивированном виде CSV представляет около 10 Гб). Кроме того, плоские запросы к ПРОД Терадата оптимизатор перемещал в очереди в самый конец. Следующая итерация выгрузки заключалась в использовании нативной для Teradata утилиты tbuild реализующей механизм TPT. Опытным путем были найдены оптимальные настройки выгрузки – 10 потоков по 200Мб. Файлы заливались напрямую в S3 в сжатом виде (gzip). В результате время выгрузки сократилось до 2–4 часов. Все эти выгрузки оркестровались с помощью Airfow.
Поскольку просто выгрузка одного дня таблицы в течение 2х часов (не говоря уж про 4 часа) нас не устроила, мы воспользовались механизмом динамического маппинга задача в Airflow. Мы разбили выгрузку на N потоков. На данный момент N=10, поскольку выгружаем мы несколько таблиц и единовременно выгружаемые объемы ограничены настройками учетной записи. Деление на потоки определяется как остаток от деления на N некоего более-менее равномерно распределенного ID. На текущий момент выгрузка занимает порядка 15 минут, в зависимости от нагруженности сервера-источника, конечно.
Выгрузка из MS SQL и Oracle
Как уже говорилось выше, оказалось, что для выгрузки нам понадобилось получать данные из MS SQL и Oracle. К счастью, выгружаемые из этих БД таблицы имели меньшие объемы.
Проблема оказалась в том, что мы не могли воспользоваться, например, утилитой BCP MS SQL и нам пришлось использовать для выгрузки JDBC. В Spark JDBC может быть запущен в режиме параллелизма. Однако протестировав ее, мы отказались от такой реализации. В чем проблема? Для обеспечения параллельной выгрузки JDBC необходимо определить колонку по которой будет происходить партицирование, нижнюю границу значения, верхнюю границу значения и количество партиций. Такое определение не позволяет равномерно распараллелить потоки. Кроме того, если посмотреть план запроса, то мы увидим, что фактически JDBC формирует параллельное выполнение SQL запросов с автоматическим определений ограничений WHERE.
То же самое можно организовать в PySpark c помощью python библиотек futures или multiprocessing, однако этот механизм более универсален. В интернете много статей описывающих как организовать подобный механизм, поэтому описывать тут не буду.
Преобразования в S3
Итак, следующим, вторым, этапом было преобразование *.gz файлов (Landing) в необходимые для дальнейшей работы в файлы deltaLake (RAW слой). Для этого мы используем сервис DataProc со Spark. На момент начала перехода мы использовали постоянно запущенный кластер DataProc. К чему мы пришли позднее – я опишу ниже.
Все преобразованные таблицы у нас хранятся в S3. Это сделано из-за того, что, во-первых, в Clickhouse расположены наши конечные витрины и кластер постоянно нагружен пользовательскими запросами BI и без дополнительных расчетов. Во-вторых, Clickhouse это колоночная OLAP БД, которая на момент начала работ не очень хорошо работала с JOIN и достигала наибольшей скорости при использовании денормализованных таблиц. Таким образом, нужды хранить нормализованные данные в Clickhouse не было. Еще один момент – расчеты. Для некоторых расчетов используется по несколько таблиц с глубиной обработки до 3х лет. т.е. это около 240 млрд строк. Было принято решение, что для таких объемов будет логично использовать Spark.
Третий этап – различные расчеты и денормализация данных для дальнейшего предоставления один к одному в витрины Clickhouse (DDS слой).
Не думаю, что что-то необходимо добавлять, разделение на такие этапы идет в практически любом DWH и ничего нового мы тут не придумали.
На что наткнулись при реализации этих этапов.
Оказалось, что данные у нас в источниках изменяются задним числом. Причем бывает, что на достаточно большую глубину. Каждый день выгружать повторно такие объёмы очень затратно по времени. Мы не можем себе этого позволить в большинстве случаях, хотя в части случаев так и делаем. Но вот для некоторых таблиц пришлось формировать в источниках дополнительные таблицы, куда попадали только изменения предыдущих дней. И затем эти изменения накатывать в Spark. Теоретически для этого можно использовать стандартные механизмы deltaLake, однако по причинам, описанным ниже, от них отказались.
Для случая выгрузки из MS SQL и Oracle мы отказались от промежуточного сохранения в landing –поскольку сразу могли писать в RAW-слой: на стороне источника мы не имели возможности поднять CDC и не имели PUSH-инкремента. Фактически мы забираем целиком один раз в сутки дневные данные за прошлые дни.
Оркестрирование
Основное предназначение Airflow – оркестрировать потоки данных. На данный момент существует мнение, что если дать необходимые ресурсы и превратить Airflow в большой кластер, то можно даже на нем использовать различные библиотеки Python для расчетов, такие, например, как Pandas. Мы отказались от такого подхода, для нас Airflow это что-то вроде удобной оболочки запуска cron. Все вычисления ведутся с помощью специализированных средств, для которых они и предназначены.
По реализации. Первоначально мы объединяли все этапы Pipeline в один DAG разными способами. Однако оказалось, что по тем или иным причинам, даги падали с завидным постоянством. По каким причинам – менялось от ситуации и на 99% это зависело от доступности источника. Теоретически повторный запуск DAG можно сделать и с помощью стандартных средств Airflow, но как оказалось, такой вариант работоспособен при ежедневной работе и постоянном контроле за выгрузкой. При выгрузке же исторических данных из множества таблиц нестабильность выгрузки приводила к отвлечению от разработки дата-инженеров, и без того небольшой команды. Соответственно было принято решение настраивать асинхронную работу загрузки.
Т.е. при загрузке инкрементных данных (факты) каждый из этапов Airflow запускается отдельно, независимо. Таким образом, если рассмотреть цепочку Выгрузка -> Преобразование -> Загрузка, то если Выгрузка отрабатывала, а во время преобразований по каким-то причинам происходил сбой, Выгрузка начинала подтягивать новую порцию данных, а Преобразование могло успеть подхватить выгруженную первую партию, а могло подхватить сразу две выгруженных партии. Так минимизировалось время обработки и достигалась стабильность. Кроме того, в каждом DAG’е настроили мониторинг (с помощью отдельного таска), который, в случае падения отсылал письмо на необходимых адресатов. Сам факт отработки DAG’а фиксировался в *.json файлах в S3. Однако, с какого-то момента, нас перестало это удовлетворять, и на данный момент мы перенесли это в Postgresql.
Запуск загрузки справочников, в силу их малости и быстроты скорости обработки, организован в один DAG, который запускает подряд несколько дагов, соответствующих нужному этапу. На рисунке ниже добавлен отдельный шаг – check_dag_state. Мы пришли к его необходимости позднее. Clickhouse, как оказалось, достаточно капризная БД. При ее обслуживании необходимо отключать все имеющиеся DAG’и по заливке в БД. И если DAG по заливке в Clickhouse отключен, то весь первоначальный DAG виснет и ждет исполнения отключенного subDag’а. Такое поведение приводит к некорректным трансформациям в слое DDS. Текущая реализация позволяет отключать даг по заливке в Clickhouse без каких-либо последствий для остальных расчетов в Spark.
Для реализации существенных преобразований с наименьшими временными и денежными затратами мы решили использовать не постоянные кластера, а динамически создаваемые, благо это позволяет облако. Кроме того, по затрачиваемым ресурсам разница не существенная, а вот по времени это позволяет нам распараллеливать задачу. Таким образом, с помощью заданий Airflow Spark-кластер в необходимой нам конфигурации мы производим расчет и удаляем этот кластер. В некоторых случаях обрабатываем до 6 партиций одной и той же таблицы единовременно. Нюансы при подобной обработке, конечно, и тут есть. Это я опишу ниже.
Типичный пример дага представляет собой вот такой граф. На скриншоте видно, что параллельно запущено 2 кластера.
На что мы наткнулись. Пришлось дорабатывать стандартные операторы работы c Яндекс DataProc, чтобы можно было передавать значения в некоторые поля из других тасков. Еще один момент – удаление кластеров. В процессе отладки у нас бывали случаи, когда кластера создавались, даги падали, а кластера не удалялись. И мы вдруг могли наткнуться на 70 поднятых, не работающих кластеров. Соответственно, необходимо было безусловно удалять кластера с одной стороны, и необходимо было иметь логи упавших скриптов, с другой (а логи при удалении кластера также удаляются). Для этого мы сделали сохранение логов в S3 в случае падения задания. Кроме того, оказалось, что бывают случаи, когда процесс создания кластера останавливается из-за нехватки квот, при этом кластер считается поднятым, а оператор дага выдает ошибку. Для такого случая была сделана дополнительная обработка поиска ИД кластера DataProc по его наименованию с помощью API облака ( на рисунке выше это шаг get_dataproc_cluster_id ). И последнее на что мы наткнулись при работе Airflow (на текущий момент мы используем версию 2.7.3) - к сожалению, обработка trigger_rules для групп заданий при маппинге групп работает с особенностями, поэтому пришлось очень тщательно настраивать эти значения.
Работа с deltaLake
Исторически так сложилось, что таблицы в S3 у нас ведутся в формате deltaLake (open-source). В чем плюсы в нашем случае? Во-первых, при записи используется статистика не только, хранящаяся в parquet, но и описании deltaLog (по умолчанию статистика собирается по первым 30 полям). Во-вторых, есть процедуры автоматически объединяющие мелкие файлы для более быстрого чтения. Кроме того, сейчас среди образов DataProc в статусе preview в Яндекс-облаке появился образ с Spark 3.5.0, который совместно с deltaLake позволяет использовать новую технологию партицирования Liquid Clustering. После различных тестов мы пришли к тому, что на части таблиц обработка при таком партицировании сократилась до 10 раз. Для некоторых же таблиц никакого положительного эффекта найдено не было. В ближайшее время мы будем переходить на Liquid Clustering для некоторых данных.
Особенностью deltaLake является хранение логов изменений данных в json-файлах, а для ускорения обработки этих данных периодически состояние json объединяются в файлы *.parquet – т.н. checkpoint. И эта особенность накладывает определенные ограничения на одновременную обработку разными кластерами одной и той же таблицы, которая является важной частью нашей стратегии распараллеливания вычислений. В deltaLake существует решение этой проблемы в виде мульти-кластерной обработки. В оригинальном open-source deltaLake предлагается вести ее с помощью DynamoDB. Яндекс Облако предоставляет возможность это делать с помощью DynamoDB-совместимой serverless БД – YDB. Ввод мульти-кластера резко сократило как наши повторное использование ресурсов для расчетов, так и время расчетов – все кластеры стали отрабатывать параллельно без ошибок в процессе записи логов.
В процессе работы мы встретились с неожиданной особенностью. Теоретически deltaLake предоставляет ACID возможности. Механизм предоставляет возможности обновления записей посредством использования MERGE и UPDATE. Однако в процессе тестирования оказалось, что скорость работы этих механизмов для open-source deltaLake в нашем случае оставляет желать лучшего. Для обновления данных вместо этих механизмов мы используем оконную функцию - просто берем последнюю по времени запись.
На данный момент в Яндекс облаке появился образ DataProc c Spark 3.5.0. Эта версия позволяет использовать deltaLake 3.2.0 с новым видом партицирования Liquid Cluster. Теоретически использование Liquid Cluster должно давать прирост в производительности. Наши тесты показали следующее – часть скриптов разогналась с 25-30 минут до 3-7 минут с одной стороны. С другой стороны, для некоторых скриптов никакого прироста не было. т.е. партицирование Liquid Cluster как минимум не хуже Hive-партицирования. Однако мы не пошли по пути полного перехода на Liquid Cluster. Просто потому, что посчитанные в Spark данные необходимо еще и залить в Clickhouse. Алгоритм отбора данных с использованием только одной партиции был протестирован на большом объеме данных (об этом ниже). Считывание же из deltaLake c помощью соответствующей функции, пока нами не протестировано на необходимых объёмах. Кроме того, необходимо помнить, что все данные, лежащие в Liquid Cluster, необходимо постоянно оптимизировать для более быстрого чтения, что тоже требует дополнительного времени и ресурсов. На данный момент мы находимся в процессе перехода на Spark 3.5.0 и предполагаем использовать Liquid Cluster на RAW слое. Возможно, в дальнейшем переведем на Liquid Cluster и DDS слой. Что касается оптимизации файлов для Hive-партицирования, у нас настроены ежедневные DAG’и, которые выполняют удаление старых файлов (VACUUM), и еженедельные DAG’и, которые оптимизируют актуальное количество файлов (OPTIMIZE).
Загрузка в Clickhouse
Самая тяжелая часть, как ни странно, оказалась именно загрузка в Clickhouse. Там очень много подводных камней. Например, объемы дисков. В облаке было 2 типа дисков – локальные и сетевые не реплицируемые (NRD). Локальные диски – зеркало, поэтому для отказоустойчивости необходимо всего 2 хоста на один шард. Относительно быстро оказалось, что диски имеют тенденцию очень быстро наполняться, а объем локальных дисков не может быть больше 3 Тб. Это приводило нас к тому, что необходимо больше шардов либо менять диски. NRD диски позволяли иметь объёмы до 8Тб, что в наших условиях было неплохо. Однако тут появились нюансы. Оказалось, что для отказоустойчивости необходимо как минимум 3 хоста на шард и, что самое неприятное, NRD-диски достаточно часто выходят из строя. Теоретически имея 3 хоста на один шард это не критично, но как оказалось, при выходе из строя одного из дисков шарда, при INSERT’е Clickhouse выдает соответствующее сообщение, а драйвер попросту считает подобный инцидент за ошибку. Подобные ситуации нами не были предусмотрены, и в срочном порядке пришлось переделывать все DAG’и Airflow под игнорирование данной ошибки.
Вставка большого объёма данных в шардированную таблицу. Не секрет, что заливку в распределенную таблицу в Clickhouse лучше всего осуществлять не через Distributed таблицу, а напрямую в шард. Чтобы это осуществить, мы выбираем более-менее равномерно-распределенную сущность в таблице (в нашем случае это оказалось ID товара) и просто берем остаток от деления на количество шардов. Соответственно этому остатку и происходит загрузка на соответствующий шард. Шардированные таблицы агрегатов с движком ReplicatedAggregatingMergeTree мы заливаем с использованием Distributed таблицы.
Проекции
Проекции оказались очень удобным способом ускорить запросы. При этом, как оказалось, в ряде случаев у нас проекции не включались или включались если в условие WHERE включить дополнительное условие. Например, в таблице есть всего один год данных (пусть это будет 2023). Если добавить в значение WHERE YEAR_ID = 2023, то проекции срабатывали, без такого условия не срабатывали. Почему – причины мы не нашли. Однако, нашли обходной путь – просто уменьшили (а у нас все проекции это группировки) количество атрибутов агрегирования в проекции. И сделали несколько проекций. После чего все проекции начали использоваться более устойчиво.
Что касается самой загрузки из deltaLake в Clickhouse
В Clickhouse была функции обращения к таблицам deltaLake. До последнего времени она некорректно работала с партицированными таблицами. В связи с этим для загрузки данных была использована другая функция – S3Cluster, которая позволяет обратиться к файлам напрямую. Чтобы иметь актуальный список текущих файлов в deltaLake (а формат deltaLake хранит историю изменений), на необходимые таблицы deltaLake было назначено свойство автоматически формировать после изменения манифесты. В deltaLake это просто текстовый файл с перечислением актуальных *.parquet файлов. Далее, при загрузке был написан SQL запрос, который открывал данный манифест, читал список файлов и уже эти файлы читались с помощью S3Cluster.
В общих чертах это основной список проблем, с которым мы столкнулись при переносе данных в облако. Конечно, были еще инфраструктурные проблемы, которые так или иначе решаются профессиональной командой компании Яндекс. Например, c обновлением версий или в настройках и использования Superset в качестве BI.
Целью данной статьи было просто поделиться способами решения возникших вопросов, касающихся области Data Engineering при переходе в облако. Мне показалось, что это может сэкономить время и нервы тем, кто сейчас движется сходными путями.
EvgenyVilkov
А как победили физическое удаление данных в терадата ? Сделали его журналироаание на источнике? Или просто периодическая перезагрузка на глубину?
Lazycat_su Автор
У нас есть два алгоритма работы в данном случае - простой, когда мы просто перегружаем данные за день (в том случае, когда нет необходимости перетаскивать данные на большую глубину, и сложный - когда когда приходится вытаскивать изменения). в сложном случае, мы просто не удаляем, а обнуляем значение.
EvgenyVilkov
Я просто знаком немного с вашей спецификой и знаю что данные в терадате удаляются физически и за год и за два и за три назад. Удаления никак не журналируются в терадате и единственный способ выравнивания данных на сейча - это перезагрузка всех партиций где когда либо происходили изменения.
Если у вас есть подсистема качества данных, которая делает сверку между терадатой и озером, то вы наверняка столкнулись с этой проблемой. Поэтому и спрашиваю
Lazycat_su Автор
Мы перенесли данные которые есть и которые нам необходимы для построения аналитики (это далеко не все данные). Далее накапливаем то что, считается. Кроме того, на данный момент у нас самый большой горизонт - 3 года. Часть данных опускается в Hadoop из Терадаты, а не удаляются. Так или иначе, с проблемой восстановления удаленных данных мы до сих пор не сталкивались.
EvgenyVilkov
А сверку делали? Есть какой то механизм сверки регуляной?
Потому что если бы сделали сверку то оказалось бы что данные за прошедшие годы и месяцы успешно выгруженные данные из Терадаты в озеро не совпадают с тем что сейчас в Терадате потому что часть строк удалили в Терадате.
Lazycat_su Автор
Мы сталкиваемся с пересчетом партиций задним числом - поскольку данные пересчитывались в Терадате. Мы так же их перезаливаем в облако. Именно использование параллельно создаваемых кластеров DataProc позволяет нам относительно быстро перегрузить эти данные.
EvgenyVilkov
Те сверки перодической нет получается?
Если в терадате удалили три строки за (условно) 01.12.2017, но при этом за этот период не делали никаких прерасчетов, а вот просто взяли и удалили, то вы об этом знаете и расходитесь с источником данных? Так?
Lazycat_su Автор
У нас нет цели создать КХД - этим занимается другая команда. Мы не пересчитываем бухгалтерию и себестоимость по FIFO. Мы строим аналитические отчеты на основании имеющихся данных. Сейчас у нас максимальная глубина 2022 год (а не 2017, хотя я представляю где и когда возникают такие расхождения и я сам не понимаю, зачем вносить/удалять данные задним числом, скажем даже полугодовой давности). На самом деле на данный момент 3х лет для сравнения данных у нас хватает. Уровень возможных расхождений согласован с методологами и бизнес-пользователями.
Сверка есть, но не на уровне 5-летней давности ;-)
EvgenyVilkov
Те сверка до глубины 2022 года показывает что все ок? Или вы перегружаете каждый раз от 2022-текущий день?