
Всем привет! Меня зовут Алексей Николаев, я работаю дата-инженером в команде ETL-платформы MWS Data (ex DataOps). Часто сталкиваюсь с тем, что в сложной инфраструктуре и больших проектах простые, на первый взгляд, задачи по работе с данными очень сильно усложняются. В результате возникают ситуации, когда хорошие практики превращаются в плохие решения, а плохие практики как раз могут дать хороший результат.
Мои коллеги уже рассказывали про нашу платформу, ее внедрение внутри экосистемы и наши инструменты для работы с данными. В процессе развития продукта перед нами встала проблема массовых регламентных загрузок данных из реляционных источников. Для этого мы создали внутренний инструмент — библиотеку d-van. В качестве движка в ней используется Apache Spark, с которым она взаимодействует через библиотеку onETL. На примере d-van я покажу нестандартный подход к использованию возможностей Apache Spark. Расскажу, какие задачи можно решить с помощью режима master=local и как свой инструмент может стать альтернативой Apache Nifi или Debezium.
За что мы так сильно любим Spark
Apache spark — достаточно универсальный инструмент для обработки данных. Решить с его помощью задачу батчевой загрузки из реляционных источников в hadoop-кластер — легко и просто даже junior-дата-инженеру:
df = spark.read.format("jdbc").options(...).load()
df.write.format("orc").saveAsTable(...)
И дальше orc/parquet/hive-table или кому что милее.
В целом сама задача и ее решение просты и незатейливы как грабли. Надо загрузить одну таблицу — пишешь скрипт в 2 строки. Надо десять — пишем чуть более сложный скрипт, куда передаем названия таблиц списком. Абсолютно никаких проблем, пока код крутится в юпитер-ноутбуках.
Когда дело доходит до продакшна, появляются обычные рутинные проблемы:
ВНЕЗАПНО источников бывает больше одного.
Есть разные типы СУБД.
Данные из разных СУБД без явных кастов к типам не хотят укладываться в таргет-таблицу корректно.
Для разработки и продакшна используются разные окружения.
Некоторые источники состоят из нескольких инстансов, в результате кодовая база уверенно пухнет от копипасты.
Тяжелые таблицы не грузятся одним куском, надо делить их на батчи, иначе мы вообще никогда не дождемся окончания загрузки.
Огромные батчи тоже не хотят грузиться в один поток — их надо параллелить.
Нужно прикрутить инкрементальную выгрузку, так как снапшотами все не перетаскать.
Источники раскиданы по разным часовым поясам, нужно синхронизировать их в момент выгрузки.
ИБ закрыло доступы к источникам с кластера, оставив только выделенные хосты.
Одну половину загрузок нужно перенести на другой кластер, а вторую оставить на старом.
В таргет-таблице лежит половина данных, потому что процесс упал на половине загрузки. После ее перезапуска в таблицах откуда-то появляются дубли.
Нашлись кривые данные: нужно перезагрузить только их, не трогая нормальные.
Кто-то грохнул партицию, и теперь в таблице с сырьем дыра.
...

И это не весь список — много еще чего можно написать. Но уже очевидно, что в продакшне задача решается не так же легко и бодро, как в юпитер-тетрадке у дата-аналитика. Все поставленные проблемы — это примеры задач, которые приходится решать при выстраивании процессов загрузки данных на большинстве проектов. Из-за этого в командах появляется человек, который в основном сидит возле своих загрузок и колдует над сотней скриптов на spark, пытаясь внести в них необходимые изменения, раскатить все на прод и разгрести последствия постоянно возникающих инцидентов со сбоями в загрузках.
Реляционные источники, из которых данные передаются в хранилища батчами, по-прежнему составляют существенную долю от всего объема загружаемых данных. Если список таких источников и таблиц измеряется сотнями и тысячами, то ручная работа тут неприемлема. Продуктовая команда, у которой в работе всего несколько таблиц, наверное, может тратить ресурсы на такие активности. Но для инфраструктурной команды, отвечающей за поставку данных в корпоративное хранилище из десятков, а то и сотен источников, это перебор. Даже если сейчас такая возможность есть, то рано или поздно поддержка этого добра сожрет все доступные ресурсы и разработка новых загрузок в хранилище просто встанет.
Проблемы универсальных решений
Эта история началась еще в те далекие времена, когда версия spark и год рождения дата-инженеров начинались с единицы, в инфраструктурных командах пилить новые загрузки было практически некому и некогда. Понятно, что для массовых загрузок из реляционных БД нужен инструмент, который может:
работать с разными типами БД;
обеспечивать надежность, стабильность и повторяемость загрузок;
восстанавливать потерянные в хранилище данные;
закрывать различные требования безопасности;
поддерживать легкое добавление новых источников, мониторинг и все прочее.
Закрывая эти задачи, продуктовые команды массово создавали собственные кастомные решения для загрузок, а у инфраструктурных команд последовательно рождались и умирали универсальные «комбайны». Причины их преждевременной смерти были одинаковыми:
Решения были монолитными, и их нельзя было отделить от инфраструктуры, а код ядра и сами скрипты загрузки хранились в одном монорепозитории.
Естественно, отсутствовала возможность использования отдельных сред для разработки и отладки.
Релизы, которые планировались «каждый вторник», на деле со скрипом удавалось выпускать раз в несколько месяцев.
Остановка текущих процессов у всех команд, обновление сервиса и последующий перезапуск загрузок были похожи на похороны генерального секретаря ЦК КПСС: много подготовки и суеты, массовая печаль, иногда истерика, а потом обязательно что-то переставало работать.
Конфликт зависимостей между командами приводил к тому, что апгрейдить сборки на новые версии библиотек было невозможно: всегда находился кто-то, чьи загрузки не могли быть переделаны под новые версии зависимостей.
Разговоры про обновление пайплайнов под новый релиз длились дольше, чем он создавался. Сами доработки велись больше времени, чем жили сами решения, а обновлялись они не больше пары раз.
Добавляли счастья конфиги разной степени упоротости. Несколько сотен строк в xml/json/yaml-файлах для настройки одной новой выгрузки вряд ли можно считать понятной и простой историей с низким порогом входа для пользователя.
Движок сервиса или кодогенератора допилить под нестандартный запрос было почти нереально.
Еще можно вспомнить различные «метасторы», системы мониторинга, управления логами, зависимостями и расписанием, которые падали чаще самих загрузок.
Для людей, владевших всей этой магией, ситуация выглядела не так трагично. Остальным пользователям такие решения напоминали велосипед без сиденья, с тремя передними колесами и одной левой педалью. Как-то они работали, но создавали проблем не меньше, чем решали.
Большинство продуктовых команд не могло ими пользоваться и пилило свои собственные инструменты. Инфраструктурная команда страдала и пользовалась сразу всем набором решений одновременно, так как собрать все загрузки было невозможно из-за разной функциональности.
Как мы наступили на грабли универсальности
Нашей лебединой песней было создание на java самописной утилиты загрузки по типу Scoop с запуском через CLI. Решив максимально упростить пользователям конфигурацию загрузок, мы сделали новые конфиги на yaml с возможностью инкапсуляции общих настроек. Было понятно, что львиная доля загрузок отличается друг от друга слабо, иногда только названием таблицы. А пайплайн для одного источника нет смысла конфигурировать для каждой таблицы отдельно, так как он почти не меняется в пределах способа загрузки.
Далее как обычно: сделали собственный мониторинг и набор зависимостей под одну конкретную среду (без учета того, где и что будут использовать разные команды). Бодро прикрутили поддержку Oracle в качестве источника. Дальше уже со скрипом добавили поддержку Postgres, скопировав и доработав ветку Oracle.
Усилия на создание загрузок сократились в разы. Чего не скажешь о трудностях в сопровождении и тестировании этого черного ящика. Инструмент остался заточенным под одно окружение, тестировать загрузки приходилось практически на проде, а команды поддержки не было. Большая часть функций была не стандартной, а имела авторскую реализацию, развивать их было достаточно сложно. По-прежнему это был монорепозиторий, в который коммитили несколько разных продуктовых команд. Нельзя было взять инструмент и добавить его в свое решение. Нужно было часть своего функционала делать на сторонней площадке.
Умерло все на задаче переноса загрузок с одного кластера на другой. Монорепозиторий нельзя было раздеплоить на два разных hadoop-кластера, пришлось создать отдельные форки для каждого кластера. Потребовалось пересобрать утилиту с другими версиями зависимостей, так как в старых имелись баги, которые сильно мешали жить. Но пересобирать было уже некому — автор решения потерял интерес к своему творению и уволился.

Создание нового велосипеда
После такого, пусть и не самого удачного, проекта мы обдумали совершенные ошибки и поняли, как действовать дальше:
Собрать основные боли пользователей и сконцентрироваться на них, а не на создании «интересных технических решений».
Взять готовый и предсказуемый движок в качестве «грузилки», а не изобретать и поддерживать свой.
Сделать максимально простое управление функциональностью, чтобы пользователи не страдали при написании развесистых yaml-конфигов.
Добавить возможность сделать загрузки частью любого продуктового решения. Пользователи должны самостоятельно добавлять загрузки в любой свой проект.
Реализовать запуск загрузок из кода, а не интерфейсов или консолей.
Сделать возможным запуск в любом месте на любом окружении без зависимостей от среды. Чтобы код, запускаемый в процессе разработки и отладки, не отличался от того, что потом будет крутиться на проде.
Предотвратить неуправляемые кодогенерации.
Избежать ограничений по зависимостям от библиотек, используемых командой.
Реализовать возможность интеграции в используемые сервисы в компании: не должно быть никаких «внешних» поделок — мониторинга, хранения метаданных и управления регламентом — которые необходимо везде таскать за собой прицепом.
И еще важно было сделать так, чтобы решение легко и просто развивалось и сопровождалось, при этом никак не ломая уже работающие загрузки. А сложность его развертывания должна быть не выше, чем pip install.
Вроде понятно, что делать дальше: нужно было выбрать движок. Может быть... spark? У него отличный python-api — поверх можно накрутить любую функциональность. Под капотом есть все, что нужно для мониторинга и управления ресурсами.
Использование библиотеки onETL сильно облегчило разработку нового инструмента благодаря высокоуровневой реализации взаимодействия с источниками — почитать документацию можно здесь, посмотреть доклады тут и тут. Это позволило минимизировать количество создаваемых функций, не пилить все с нуля и максимально сконцентрироваться на самом решении.
Процесс разработки и деплоя создаваемых spark-приложений выстраивали на основе EverProject — внутреннего инструмента команды МТС BigData, являющегося шаблоном ETL-проектов на python. EverProject содержит базовые конфигурации ETL-приложения, преднастроенный ci/cd-пайплайн, включающий сборку docker-образов и деплой создаваемых приложений на нашу инфраструктуру hadoop-airflow-spark. Так как это де-факто стандарт для наших продуктовых команд, то проблем с порогом вхождения у инженеров не возникает.
Так мы собрали low-code-библиотеку с названием d-van.
Что умеет d-van
Посмотрим, что в итоге у нас получилось:
мы реализовали пайплайн для загрузки батчей снапшотами или инкрементально и укладыванием данных по партициям на кластере;
он безопасно грузит батчи в файлы на HDFS с заданными уровнем параллелизма и форматом;
по окончании загрузок разом переносит все сохраненные файлы в hive-партиции, чтобы пользователи в моменте не получили доступ к неконсистентным данным;
если в процессе загрузки что-то упало, перезапуск продолжит работу пайплайна с момента падения;
если в таргет-таблице потерялись или были удалены какие то данные, то отсутствующие диапазоны данных автоматом будут добавлены в загрузку;
здесь же решаются вопросы управления партициями в таргет-таблицах, синхронизации данных из разных часовых поясов, преобразования типов источника к таргету, оптимального разделения загрузки на батчи и все прочее, о чем я рассказывал в первом разделе.
Примеры
Пример конфига (при условии, что region, sources и target настроены на уровне проекта):
defaults:
- regions: ../regions
- target: ../hive
- _self_
load_strategy: snapshot
source_table: foo
source_columns: '*'
table_name: mycompany__{dbcode}__{schema}_${.source_table}
sources:
- dbcode: msk_somedb
schema: spam
connection: ${connections.${.dbcode}}
- dbcode: nvsb_somedb
schema: spam
connection: ${connections.${.dbcode}}
Пример части кода для вызова загрузки:
import hydra
from pyspark.sql import SparkSession
from d_van.dvan_master import DVanMaster
task = "snapshot_foo.yaml"
with hydra.initialize(version_base="1.3", config_path="/relative/path/to/config/folder"):
app_conf = hydra.compose(config_name="config", overrides=[f"task={task}"])
spark = SparkSession.builder.appName("my_app").getOrCreate()
dvan = DVanMaster(spark, app_conf.task)
dvan.execute_parallel()
spark.stop()
Возможности d-van
На тип источника внутри пайплайна не завязывались, так как, используя под капотом onEtl, можно единообразно общаться с реляционными БД любого типа. Для учета особенностей механики конкретных СУБД создали в d-van классы-адаптеры, в которых нет лишней логики и минимум кода (поэтому поддержка 4 реляционных источников Oracle, Postgres, MSSQL и MySQL была готова уже в первых релизах и при необходимости можно добавить любые из поддерживаемых onETL). Сделали конкурентный запуск джобов внутри поднятой spark-сессии, чтобы не поднимать их десятками при загрузке одной таблицы с разных инстансов источника и/или одновременной загрузке нескольких батчей параллельно.
Пайплайн загрузки при работе берет настройки из конфига всего ETL-проекта. EverProject использует иерархические конфиги OmegaConf фреймворка hydra. Благодаря этому большая часть необходимых параметров для работы пайплайна по умолчанию есть во всех ETL-проектах: параметры для dev-test- и prod-сред и все нужное для подключения к используемым командой источникам.
Докидываем параметры для задач загрузки, инкапсулируем и переиспользуем все, что уже есть в конфиге проекта, — этого достаточно для запуска. EverProject решает вопросы зависимостей, изолированности кодовых баз и привязки к инфраструктуре. Можно безопасно использовать любую версию spark и других библиотек на любом окружении.
onETL позволяет запускать загрузки в любом своем окружении и разрабатывать, тестировать и отлаживать их безопасно. Если ты видел, что код работает в локальном окружении, то его можно смело деплоить куда угодно. Логи и мониторинг у нас от Spark, метрики и статистика тоже из его комплекта (Graphite и Grafana). Для отслеживания происхождения данных можно легко прикрутить openLineage к Spark — и нам будут видны все загрузки.
Пишем кучу тестов, собираем и публикуем в артифактори свою библиотеку — и у нас есть готовый загрузчик, который можно добавить как зависимость в любой из своих ETL-проектов и использовать его, чтобы переносить данные на кластер. Работать все будет и без EverProject: импортировать библиотеку можно в любом py-скрипте, а конфиг в загрузчик прокинуть обычным python dict. Просто dict-конфигами не так удобно управлять, как с hydra. И деплой в продакшн придется использовать свой.
Запуск spark-приложения в режиме master=local
Это одна из возможностей, которую нам дает Spark. Ее удобно использовать для разработки и тестирования, но у приличных людей тянуть ее в прод не принято. Однако в случае с одновременной загрузкой нескольких батчей ситуация иная. В режиме yarn на каждую таску нам нужен один worker (пусть и с минимальным объемом памяти, но минимум с 1 ядром). Worker не утилизирует на хосте целое ядро CPU, но из пула ресурс-менеджера каждый работающий worker выдергивает одно ядро. Если разом грузим 1 000 батчей, то 1 000 ядер пропадет из пула yarn-очереди. А это будет чувствительно для кластера.
Этого можно избежать в режиме local: spark даже в нем остается многопоточным движком и позволяет запустить одновременно загрузку нескольких батчей, которые работают параллельно и потребляют от силы 1 ядро CPU на всех. В режиме local нам не требуется запуск worker — мы сразу экономим кучу времени, особенно когда речь идет о небольших батчах, которые грузятся около одной минуты. А еще мы в этом режиме не используем при загрузках wide-трансформации. Таким образом, нам не нужны RAM и распределенные вычисления: RAM для работы загрузок нужна только Spark-драйверу и для fetchsize. Еще режим master=local решает приколы с информационной безопасностью, когда все загрузки должны идти только через разрешенные хосты.
Что мы получили с d-van
Это библиотека, которая одной из первых решает задачи загрузки не только для инфраструктурных, но и для продуктовых команд. Постепенно она заменяет самописные решения. Благодаря использованию более ранних наработок и EverProject нам удалось собрать d-van быстро и без необходимости писать много кода. И feature-реквесты от пользователей говорят о том, что библиотека востребована и будет развиваться. И получают они свои фичи максимально быстро. А могут и сами запилить что-то под свои нужды — код библиотеки в открытом доступе внутри компании.
C докладом про d-van мы подавались на одну конференцию и получили фидбэк за «продвижение худших практик использования Spark». Но факт остается фактом: их использование успешно закрыло нам болезненную тему батчевых загрузок. Python и pуspark в команде знают все инженеры, аналитики и DS, а прикрутить NiFi или Debezium к каждому проекту — задача не из легких.
OnETL давно доступен в open source, любой желающий может использовать эту библиотеку для самостоятельного создания собственных решений. Разработанный нами d-van — универсальный инструмент для батчевой загрузки из реляционных СУБД — сейчас доступен только внутри нашей компании. Он содержит в себе некоторые решения, завязанные на нашей внутренней инфраструктуре, но их несложно выпилить и сделать ее более универсальной. Поделитесь своим мнением, будет ли интересна такая тулза в open source?