Всем привет! Меня зовут Алексей Николаев, я работаю дата-инженером в команде ETL-платформы MWS Data (ex DataOps). Часто сталкиваюсь с тем, что в сложной инфраструктуре и больших проектах простые, на первый взгляд, задачи по работе с данными очень сильно усложняются. В результате возникают ситуации, когда хорошие практики превращаются в плохие решения, а плохие практики как раз могут дать хороший результат.

Мои коллеги уже рассказывали про нашу платформу, ее внедрение внутри экосистемы и наши инструменты для работы с данными. В процессе развития продукта перед нами встала проблема массовых регламентных загрузок данных из реляционных источников. Для этого мы создали внутренний инструмент — библиотеку d-van. В качестве движка в ней используется Apache Spark, с которым она взаимодействует через библиотеку onETL. На примере d-van я покажу нестандартный подход к использованию возможностей Apache Spark. Расскажу, какие задачи можно решить с помощью режима master=local и как свой инструмент может стать альтернативой Apache Nifi или Debezium.

За что мы так сильно любим Spark

Apache spark — достаточно универсальный инструмент для обработки данных. Решить с его помощью задачу батчевой загрузки из реляционных источников в hadoop-кластер — легко и просто даже junior-дата-инженеру:

df = spark.read.format("jdbc").options(...).load()
df.write.format("orc").saveAsTable(...)

И дальше orc/parquet/hive-table или кому что милее.

В целом сама задача и ее решение просты и незатейливы как грабли. Надо загрузить одну таблицу — пишешь скрипт в 2 строки. Надо десять — пишем чуть более сложный скрипт, куда передаем названия таблиц списком. Абсолютно никаких проблем, пока код крутится в юпитер-ноутбуках.

Когда дело доходит до продакшна, появляются обычные рутинные проблемы:

  • ВНЕЗАПНО источников бывает больше одного. 

  • Есть разные типы СУБД.

  • Данные из разных СУБД без явных кастов к типам не хотят укладываться в таргет-таблицу корректно.

  • Для разработки и продакшна используются разные окружения.

  • Некоторые источники состоят из нескольких инстансов, в результате кодовая база уверенно пухнет от копипасты.

  • Тяжелые таблицы не грузятся одним куском, надо делить их на батчи, иначе мы вообще никогда не дождемся окончания загрузки.

  • Огромные батчи тоже не хотят грузиться в один поток — их надо параллелить.

  • Нужно прикрутить инкрементальную выгрузку, так как снапшотами все не перетаскать.

  • Источники раскиданы по разным часовым поясам, нужно синхронизировать их в момент выгрузки.

  • ИБ закрыло доступы к источникам с кластера, оставив только выделенные хосты.

  • Одну половину загрузок нужно перенести на другой кластер, а вторую оставить на старом.

  • В таргет-таблице лежит половина данных, потому что процесс упал на половине загрузки. После ее перезапуска в таблицах откуда-то появляются дубли.

  • Нашлись кривые данные: нужно перезагрузить только их, не трогая нормальные.

  • Кто-то грохнул партицию, и теперь в таблице с сырьем дыра.

  • ...

И это не весь список — много еще чего можно написать. Но уже очевидно, что в продакшне задача решается не так же легко и бодро, как в юпитер-тетрадке у дата-аналитика. Все поставленные проблемы — это примеры задач, которые приходится решать при выстраивании процессов загрузки данных на большинстве проектов. Из-за этого в командах появляется человек, который в основном сидит возле своих загрузок и колдует над сотней скриптов на spark, пытаясь внести в них необходимые изменения, раскатить все на прод и разгрести последствия постоянно возникающих инцидентов со сбоями в загрузках.

Реляционные источники, из которых данные передаются в хранилища батчами, по-прежнему составляют существенную долю от всего объема загружаемых данных. Если список таких источников и таблиц измеряется сотнями и тысячами, то ручная работа тут неприемлема. Продуктовая команда, у которой в работе всего несколько таблиц, наверное, может тратить ресурсы на такие активности. Но для инфраструктурной команды, отвечающей за поставку данных в корпоративное хранилище из десятков, а то и сотен источников, это перебор. Даже если сейчас такая возможность есть, то рано или поздно поддержка этого добра сожрет все доступные ресурсы и разработка новых загрузок в хранилище просто встанет.

Проблемы универсальных решений

Эта история началась еще в те далекие времена, когда версия spark и год рождения дата-инженеров начинались с единицы, в инфраструктурных командах пилить новые загрузки было практически некому и некогда. Понятно, что для массовых загрузок из реляционных БД нужен инструмент, который может:

  • работать с разными типами БД;

  • обеспечивать надежность, стабильность и повторяемость загрузок; 

  • восстанавливать потерянные в хранилище данные;

  • закрывать различные требования безопасности;

  • поддерживать легкое добавление новых источников, мониторинг и все прочее.

Закрывая эти задачи, продуктовые команды массово создавали собственные кастомные решения для загрузок, а у инфраструктурных команд последовательно рождались и умирали универсальные «комбайны». Причины их преждевременной смерти были одинаковыми:

  • Решения были монолитными, и их нельзя было отделить от инфраструктуры, а код ядра и сами скрипты загрузки хранились в одном монорепозитории. 

  • Естественно, отсутствовала возможность использования отдельных сред для разработки и отладки. 

  • Релизы, которые планировались «каждый вторник», на деле со скрипом удавалось выпускать раз в несколько месяцев. 

  • Остановка текущих процессов у всех команд, обновление сервиса и последующий перезапуск загрузок были похожи на похороны генерального секретаря ЦК КПСС: много подготовки и суеты, массовая печаль, иногда истерика, а потом обязательно что-то переставало работать. 

  • Конфликт зависимостей между командами приводил к тому, что апгрейдить сборки на новые версии библиотек было невозможно: всегда находился кто-то, чьи загрузки не могли быть переделаны под новые версии зависимостей. 

  • Разговоры про обновление пайплайнов под новый релиз длились дольше, чем он создавался. Сами доработки велись больше времени, чем жили сами решения, а обновлялись они не больше пары раз. 

  • Добавляли счастья конфиги разной степени упоротости. Несколько сотен строк в xml/json/yaml-файлах для настройки одной новой выгрузки вряд ли можно считать понятной и простой историей с низким порогом входа для пользователя. 

  • Движок сервиса или кодогенератора допилить под нестандартный запрос было почти нереально.

  • Еще можно вспомнить различные «метасторы», системы мониторинга, управления логами, зависимостями и расписанием, которые падали чаще самих загрузок.

Для людей, владевших всей этой магией, ситуация выглядела не так трагично. Остальным пользователям такие решения напоминали велосипед без сиденья, с тремя передними колесами и одной левой педалью. Как-то они работали, но создавали проблем не меньше, чем решали. 

Большинство продуктовых команд не могло ими пользоваться и пилило свои собственные инструменты. Инфраструктурная команда страдала и пользовалась сразу всем набором решений одновременно, так как собрать все загрузки было невозможно из-за разной функциональности.

Как мы наступили на грабли универсальности

Нашей лебединой песней было создание на java самописной утилиты загрузки по типу Scoop с запуском через CLI. Решив максимально упростить пользователям конфигурацию загрузок, мы сделали новые конфиги на yaml с возможностью инкапсуляции общих настроек. Было понятно, что львиная доля загрузок отличается друг от друга слабо, иногда только названием таблицы. А пайплайн для одного источника нет смысла конфигурировать для каждой таблицы отдельно, так как он почти не меняется в пределах способа загрузки.

Далее как обычно: сделали собственный мониторинг и набор зависимостей под одну конкретную среду (без учета того, где и что будут использовать разные команды). Бодро прикрутили поддержку Oracle в качестве источника. Дальше уже со скрипом добавили поддержку Postgres, скопировав и доработав ветку Oracle.

Усилия на создание загрузок сократились в разы. Чего не скажешь о трудностях в сопровождении и тестировании этого черного ящика. Инструмент остался заточенным под одно окружение, тестировать загрузки приходилось практически на проде, а команды поддержки не было. Большая часть функций была не стандартной, а имела авторскую реализацию, развивать их было достаточно сложно. По-прежнему это был монорепозиторий, в который коммитили несколько разных продуктовых команд. Нельзя было взять инструмент и добавить его в свое решение. Нужно было часть своего функционала делать на сторонней площадке.

Умерло все на задаче переноса загрузок с одного кластера на другой. Монорепозиторий нельзя было раздеплоить на два разных hadoop-кластера, пришлось создать отдельные форки для каждого кластера. Потребовалось пересобрать утилиту с другими версиями зависимостей, так как в старых имелись баги, которые сильно мешали жить. Но пересобирать было уже некому — автор решения потерял интерес к своему творению и уволился.

Создание нового велосипеда

После такого, пусть и не самого удачного, проекта мы обдумали совершенные ошибки и поняли, как действовать дальше:

  • Собрать основные боли пользователей и сконцентрироваться на них, а не на создании «интересных технических решений».

  • Взять готовый и предсказуемый движок в качестве «грузилки», а не изобретать и поддерживать свой.

  • Сделать максимально простое управление функциональностью, чтобы пользователи не страдали при написании развесистых yaml-конфигов. 

  • Добавить возможность сделать загрузки частью любого продуктового решения. Пользователи должны самостоятельно добавлять загрузки в любой свой проект. 

  • Реализовать запуск загрузок из кода, а не интерфейсов или консолей. 

  • Сделать возможным запуск в любом месте на любом окружении без зависимостей от среды. Чтобы код, запускаемый в процессе разработки и отладки, не отличался от того, что потом будет крутиться на проде. 

  • Предотвратить неуправляемые кодогенерации. 

  • Избежать ограничений по зависимостям от библиотек, используемых командой. 

  • Реализовать возможность интеграции в используемые сервисы в компании: не должно быть никаких «внешних» поделок — мониторинга, хранения метаданных и управления регламентом — которые необходимо везде таскать за собой прицепом.

  • И еще важно было сделать так, чтобы решение легко и просто развивалось и сопровождалось, при этом никак не ломая уже работающие загрузки. А сложность его развертывания должна быть не выше, чем pip install.

Вроде понятно, что делать дальше: нужно было выбрать движок. Может быть... spark? У него отличный python-api — поверх можно накрутить любую функциональность. Под капотом есть все, что нужно для мониторинга и управления ресурсами.

Использование библиотеки onETL сильно облегчило разработку нового инструмента благодаря высокоуровневой реализации взаимодействия с источниками — почитать документацию можно здесь, посмотреть доклады тут и тут. Это позволило минимизировать количество создаваемых функций, не пилить все с нуля и максимально сконцентрироваться на самом решении.

Процесс разработки и деплоя создаваемых spark-приложений выстраивали на основе EverProject — внутреннего инструмента команды МТС BigData, являющегося шаблоном ETL-проектов на python. EverProject содержит базовые конфигурации ETL-приложения, преднастроенный ci/cd-пайплайн, включающий сборку docker-образов и деплой создаваемых приложений на нашу инфраструктуру hadoop-airflow-spark. Так как это де-факто стандарт для наших продуктовых команд, то проблем с порогом вхождения у инженеров не возникает.

Так мы собрали low-code-библиотеку с названием d-van.

Что умеет d-van

Посмотрим, что в итоге у нас получилось:

  • мы реализовали пайплайн для загрузки батчей снапшотами или инкрементально и укладыванием данных по партициям на кластере; 

  • он безопасно грузит батчи в файлы на HDFS с заданными уровнем параллелизма и форматом;

  • по окончании загрузок разом переносит все сохраненные файлы в hive-партиции, чтобы пользователи в моменте не получили доступ к неконсистентным данным; 

  • если в процессе загрузки что-то упало, перезапуск продолжит работу пайплайна с момента падения;

  • если в таргет-таблице потерялись или были удалены какие то данные, то отсутствующие диапазоны данных автоматом будут добавлены в загрузку;

  • здесь же решаются вопросы управления партициями в таргет-таблицах, синхронизации данных из разных часовых поясов, преобразования типов источника к таргету, оптимального разделения загрузки на батчи и все прочее, о чем я рассказывал в первом разделе.  

Примеры

Пример конфига (при условии, что region, sources и target настроены на уровне проекта):

    defaults:
        - regions: ../regions
        - target: ../hive
        - _self_
        
    load_strategy: snapshot
    source_table: foo
    source_columns: '*'
    table_name: mycompany__{dbcode}__{schema}_${.source_table}
    
    sources:
        - dbcode: msk_somedb
        schema: spam
        connection: ${connections.${.dbcode}} 
        - dbcode: nvsb_somedb
        schema: spam
        connection: ${connections.${.dbcode}}

Пример части кода для вызова загрузки:

import hydra
from pyspark.sql import SparkSession
from d_van.dvan_master import DVanMaster
 
task = "snapshot_foo.yaml"
with hydra.initialize(version_base="1.3", config_path="/relative/path/to/config/folder"):
    app_conf = hydra.compose(config_name="config", overrides=[f"task={task}"])

spark = SparkSession.builder.appName("my_app").getOrCreate()

dvan = DVanMaster(spark, app_conf.task)
dvan.execute_parallel()
spark.stop()

Возможности d-van

На тип источника внутри пайплайна не завязывались, так как, используя под капотом onEtl, можно единообразно общаться с реляционными БД любого типа. Для учета особенностей механики конкретных СУБД создали в d-van классы-адаптеры, в которых нет лишней логики и минимум кода (поэтому поддержка 4 реляционных источников Oracle, Postgres, MSSQL и MySQL была готова уже в первых релизах и при необходимости можно добавить любые из поддерживаемых onETL). Сделали конкурентный запуск джобов внутри поднятой spark-сессии, чтобы не поднимать их десятками при загрузке одной таблицы с разных инстансов источника и/или одновременной загрузке нескольких батчей параллельно.

Пайплайн загрузки при работе берет настройки из конфига всего ETL-проекта. EverProject использует иерархические конфиги OmegaConf фреймворка hydra. Благодаря этому большая часть необходимых параметров для работы пайплайна по умолчанию есть во всех ETL-проектах: параметры для dev-test- и prod-сред и все нужное для подключения к используемым командой источникам.

Докидываем параметры для задач загрузки, инкапсулируем и переиспользуем все, что уже есть в конфиге проекта, — этого достаточно для запуска. EverProject решает вопросы зависимостей, изолированности кодовых баз и привязки к инфраструктуре. Можно безопасно использовать любую версию spark и других библиотек на любом окружении.

onETL позволяет запускать загрузки в любом своем окружении и разрабатывать, тестировать и отлаживать их безопасно. Если ты видел, что код работает в локальном окружении, то его можно смело деплоить куда угодно. Логи и мониторинг у нас от Spark, метрики и статистика тоже из его комплекта (Graphite и Grafana). Для отслеживания происхождения данных можно легко прикрутить openLineage к Spark — и нам будут видны все загрузки.

Пишем кучу тестов, собираем и публикуем в артифактори свою библиотеку — и у нас есть готовый загрузчик, который можно добавить как зависимость в любой из своих ETL-проектов и использовать его, чтобы переносить данные на кластер. Работать все будет и без EverProject: импортировать библиотеку можно в любом py-скрипте, а конфиг в загрузчик прокинуть обычным python dict. Просто dict-конфигами не так удобно управлять, как с hydra. И деплой в продакшн придется использовать свой.

Запуск spark-приложения в режиме master=local

Это одна из возможностей, которую нам дает Spark. Ее удобно использовать для разработки и тестирования, но у приличных людей тянуть ее в прод не принято. Однако в случае с одновременной загрузкой нескольких батчей ситуация иная. В режиме yarn на каждую таску нам нужен один worker (пусть и с минимальным объемом памяти, но минимум с 1 ядром). Worker не утилизирует на хосте целое ядро CPU, но из пула ресурс-менеджера каждый работающий worker выдергивает одно ядро. Если разом грузим 1 000 батчей, то 1 000 ядер пропадет из пула yarn-очереди. А это будет чувствительно для кластера.

Этого можно избежать в режиме local: spark даже в нем остается многопоточным движком и позволяет запустить одновременно загрузку нескольких батчей, которые работают параллельно и потребляют от силы 1 ядро CPU на всех. В режиме local нам не требуется запуск worker — мы сразу экономим кучу времени, особенно когда речь идет о небольших батчах, которые грузятся около одной минуты. А еще мы в этом режиме не используем при загрузках wide-трансформации. Таким образом, нам не нужны RAM и распределенные вычисления: RAM для работы загрузок нужна только Spark-драйверу и для fetchsize. Еще режим master=local решает приколы с информационной безопасностью, когда все загрузки должны идти только через разрешенные хосты.

Что мы получили с d-van

Это библиотека, которая одной из первых решает задачи загрузки не только для инфраструктурных, но и для продуктовых команд. Постепенно она заменяет самописные решения. Благодаря использованию более ранних наработок и EverProject нам удалось собрать d-van быстро и без необходимости писать много кода. И feature-реквесты от пользователей говорят о том, что библиотека востребована и будет развиваться. И получают они свои фичи максимально быстро. А могут и сами запилить что-то под свои нужды — код библиотеки в открытом доступе внутри компании.

C докладом про d-van мы подавались на одну конференцию и получили фидбэк за «продвижение худших практик использования Spark». Но факт остается фактом: их использование успешно закрыло нам болезненную тему батчевых загрузок. Python и pуspark в команде знают все инженеры, аналитики и DS, а прикрутить NiFi или Debezium к каждому проекту — задача не из легких.

OnETL давно доступен в open source, любой желающий может использовать эту библиотеку для самостоятельного создания собственных решений. Разработанный нами d-van — универсальный инструмент для батчевой загрузки из реляционных СУБД — сейчас доступен только внутри нашей компании. Он содержит в себе некоторые решения, завязанные на нашей внутренней инфраструктуре, но их несложно выпилить и сделать ее более универсальной. Поделитесь своим мнением, будет ли интересна такая тулза в open source?

Комментарии (0)