Привет всем, немного информации "из под капота" дата инженерного цеха Альфастрахования — что будоражит наши технические умы.
Apache Spark — замечательный инструмент, позволяющий просто и очень быстро обрабатывать большие объемы данных на достаточно скромных вычислительных ресурсах (я имею в виду кластерную обработку).
Традиционно, в процессе ad hoc обработки данных используется jupyter notebook. В комбинации со Spark-ом это позволяет нам манипулировать долго живущими дата фреймами (распределением ресурсов занимается Spark, дата фреймы "живут" где-то в кластере, время их жизни ограничено временем жизни Spark контекста).
После переноса обработки данных в Apache Airflow время жизни дата фреймов сильно сокращается — Spark контекст "живет" в пределах одного оператора Airflow. Как это обойти, зачем обходить и при чем здесь Livy — читайте под катом.
Давайте рассмотрим совсем-совсем простой пример: предположим нам нужно денормализовать данные в большой таблице и сохранить результат в другой таблице для дальнейшей обработки (типичный элемент конвейера обработки данных).
Как бы мы это делали:
- загрузили данные в dataframe (выборка из большой таблицы и справочников)
- посмотрели "глазами" на результат (правильно ли получилось)
- сохранили dataframe в таблицу Hive (например)
По результатам анализа нам может потребоваться вставить на втором шаге какую-то специфическую обработку (словарную замену или еще что-то). С точки зрения логики мы имеем три шага
- шаг 1: загрузка
- шаг 2: обработка
- шаг 3: сохранение
В jupyter notebook у нас так и получается — мы можем сколь угодно долго обрабатывать загруженные данные, отдав Spark-у управление ресурсами.
Вполне логично ожидать, что такое разбиение удастся перенести в Airflow. То есть иметь граф примерно такого вида
К сожалению, это невозможно при использовании комбинации Airflow + Spark: каждый оператор Airflow исполняется в своем python интерпретаторе, поэтому кроме всего прочего каждый оператор должен как-то "персистить" результаты своей деятельности. Тем самым наша обработка "сжимается" в один шаг — "денормализовать данные".
Как можно "вернуть" в Airflow гибкость jupyter notebook? Понятно, что приведенный пример "того не стоит" (может быть, даже наоборот — получается хороший понятный шаг обработки). Но все же — как сделать так, чтобы операторы Airflow могли выполняться в одном Spark контексте над общим пространством dataframe-ов?
Приветствуем Livy
На помощь приходит еще один продукт экосистемы Hadoop — Apache Livy.
Не буду пытаться здесь описать — что это за "зверь" такой. Если совсем кратко и черно-бело — Livy позволяет "инжектить" python код в программу, которую исполняет driver:
- сначала мы создаем сессию работы с Livy
- после этого у нас есть возможность исполнять произвольный код на python-е в этой сессии (очень похоже на идеологию jupyter/ipython)
И к всему этому есть REST API.
Возвращаясь к нашей простенькой задаче: с помощью Livy мы можем сохранить изначальную логику нашей денормализации
- на первом шаге (первом операторе нашего графа) мы загрузим и выполним код загрузки данных в dataframe
- на втором шаге (втором операторе) — выполним код необходимой дополнительной обработки этого dataframe
- на третьем шаге — код сохранения dataframe-а в таблицу
Что в терминах Airflow может выглядеть так:
(поскольку картинка вполне реальный скриншот, то добавились дополнительные "реалии" — создание Spark контекста стало отдельной операцией со странным названием, "обработка" данных пропала, потому что не нужна, и т.п.)
Если обобщить — мы получаем
- универсальный Airflow оператор, который выполняет код на python-е в сессии Livy
- возможность "организовывать" код на python-е в достаточно сложные графы (на то и Airflow)
- возможность заняться оптимизациями более высокого уровня, например, в каком порядке нужно выполнять наши преобразования с тем, чтобы Spark смог максимально долго держать общие данные в памяти кластера
Типичный конвейер подготовки данных для моделирования содержит порядка 25 запросов по 10 таблицам, очевидно, что некоторые таблицы используются чаще других (те самые "общие данные") и есть что пооптимизировать.
Что дальше
Техническая возможность опробована, думаем дальше — как технологичнее перевести наши трансформации в эту парадигму. И как подступиться к упомянутой выше оптимизации. Мы еще в начале этой части нашего пути — когда будет что-то интересное, обязательно поделимся.
stanislavnikitin
Из статьи остался непонятным следующий момент.
До Ливи проблемой было прокинуть спарк-контекст между операторами Эйрфлоу. Теперь, получается, у нас есть некая Ливи-«сессия», внутри которой у нас живет информация в т.ч. о спарк-контексте. Однако непонятно, как теперь эта инфа прокидывается от оператора к оператору. Или сессия живет на датанодах сама по себе, и в коде каждого оператора к ней нужно подключаться?
Korolevmv Автор
В-целом Вы верно поняли, чуть уточню: Ливи "инжектит" python код в driver, он там и исполняется (со всеми вытекающими). Очень похоже на jupyter — с помощью Ливи мы "добрасываем" операторы в конец нашего кода.
На "драйвере" живет сессия, сами данные (dataframe-ы) живут в worker-ах, их жизнью управляет Spark и они доступны в Spark-программе по именам (как обычно).