Airflow это популярная опенсорсная платформа управления задачами. В частности его используют для построения ETL-пайплайнов. Например, мне доводилось переливать данные между базами данных, хранилищами и озерами данных с его помощью. А также я использовал его для препроцессинга данных для моделей машинного обучения. Но так ли подходит Airflow для ETL на сегодняшний день?
В этой статье мы рассмотрим как с помощью Airflow ETL операторов выгрузить данные из Postgres в BigQuery в парадигмах ETL и ELT. Далее разберем сложности, с которыми вы можете столкнуться при реализации инкрементальной загрузки данных в DAG (DAG - directed acyclic graph, ориентированный ацикличный граф - цепочка связанных задач). Наконец, мы обсудим почему Airflow ETL операторы не смогут покрыть все ваши потребности в интеграциях в дальней перспективе.
В качестве альтернативы предлагается продолжать использовать Airflow в качестве планировщика и для мониторинга ETL/ELT-пайплайнов, и рассмотреть другие опенсорсные продукты для шагов выгрузки, преобразования и загрузки данных. Например, взять Airbyte для шагов EL (Extract и Load - выгрузки и загрузки данных) и dbt для шагов T (Transform - преобразование данных). Хорошая новость заключается в том, что Airflow отлично интегрируется с Airbyte и dbt.
???? Хороший
Airflow ETL операторы
Представьте, что ваше приложение хранит данные в БД на Postgres. А Маркетинг, Продажники и Продуктовые команды хранят свои данные в сторонних системах вроде Google Ads, Salesforce и Segment. И вы скорее всего захотите централизовать все ваши данные в едином хранилище, например, на Google BigQuery.
В вашей компании уже используется Airflow, поэтому вы начинаете искать ETL операторы для выгрузки, преобразования и загрузки данных среди встроенных операторов и поставляемых пакетов. Когда вы не найдете нужных операторов для взаимодействия с вашими данными, вы возможно воспользуетесь Airflow Hooks для подключения к внешним системам. Например, не существует оператора для получения данных из Segment, но вы можете воспользоваться SegmentHook для обращения к API Segment.
Сначала у вас будет искушение настроить пайплайн, в котором вы выгружаете данные из Postgres в файловое хранилище, преобразуете его локально с помощью PythonOperator и далее загружаете результат в BigQuery. В итоге у вас получается примерно такое решение:
PostgresToGCSOperator (E) -> GCSToLocalFilesystemOperator (E) -> PythonOperator (T) -> LocalFilesystemToGCSOperator (L) -> GCSToBigQueryOperator (L)
Но насколько это хорошая практика на текущий момент?
Это традиционное ETL-решение, в котором логика преобразования выполняется между шагами выгрузки и загрузки данных. В Airflow можно преобразовывать данные локально (PythonOperator, BashOperator, ...), удаленно (SparkSubmitOperator…) или на стороне хранилища (PostgresOperator, BigQueryInsertJobOperator, ...).
В последние годы многие команды переходят из ETL- в ELT-парадигму. В парадигме ELT сначала данные загружаются как есть из источника в хранилище, а потом преобразовываются на стороне хранилища с помощью SQL. Одна из главных проблем ETL-пайплайнов заключается в том, что в них преобразование данных происходит в процессе передачи данных, из-за чего они работают менее стабильно.
С другой стороны современные хранилища данных могут автоматически масштабироваться при росте вычислительных нужд и объемов. А такие современные инструменты как dbt наделили SQL сверхспособностями. С dbt вы можете прикрутить в ваших SQL-скриптах макросы, циклы for и много чего еще. А самое главное, dbt отслеживает зависимости между таблицами, основываясь на их упоминаниях в SQL-коде. Когда же вы пишите преобразования на Airflow, вам нужно дублировать зависимости между таблицами и в SQL и в DAG.
SQL заменяет Python для преобразования и анализа данных в современном дата стеке.
Airflow ELT операторы
Вы можете воспользоваться операторами передачи вместе с операторами БД для ELT-пайплайнов. Но в Airflow нет оператора для прямой передачи данных из Postgres в BigQuery, поэтому требуется промежуточная выгрузка в Google Cloud. Например, вы можете выполнить PostgresToGCSOperator и потом GCSToBigQueryOperator. Далее вы можете выполнить преобразования в приемнике с помощью операторов BigQuery:
PostgresToGCSOperator (E) -> GCSToBigQueryOperator (L) -> BigQueryInsertJobOperator (T)
Airflow предлагает массу вариантов для перегрузки данных из одной системы в другую. Но проблема в том, что чем больше у вас вариантов, тем выше шанс, что вы ошибетесь.
Airflow не является догматичным ETL-инструментом.
???? Плохой
Как мы увидели выше, полная перезагрузка данных с помощью Airflow реализуется достаточно просто. Но что произойдет, когда поменяется схема данных на источнике, и вы перезапустите DAG полной перезагрузки данных? Он упадет. Чтобы он заработал, вам нужно создать дополнительную задачу для удаления данных в приемнике. Это также гарантирует, что ваши DAG идемпотентны, и вы можете перезапускать их безо всяких побочных эффектов при их падении. Далее мы разберем как усложняется ситуация при попытке реализовать инкрементальную загрузку.
Инкрементальная загрузка данных в Airflow
Если вам нужно реализовать инкрементальную загрузку, в первую очередь нужно определиться с частотой запуска DAG. Далее вам нужно убедиться, что каждый DAG загружает только те данные, которые были созданы в период между датой выполнения текущего DAG и следующего. Для этого вам потребуется добавить в SQL-скрипты Airflow макросы `{{ds}}` и `{{next_ds}}` для текущей и следующей даты выполнения соответственно. Значения дат будут подставлены в рантайме.
Другая проблема, с которой вы можете столкнуться, это потребность в отдельнах DAG для полной перезагрузки данных и для инкрементальной загрузки, если у вас высокая частота запуска DAG. Представьте, что вы настроили ежечасное инкрементальное обновление данных, и у вас в источнике хранятся данные глубиной 5 лет. Тогда для полной перезагрузки всей истории потребуется произвести 43800 (5 лет * 365 дней * 24 часа) запусков DAG, что будет работать долго, так как Airflow вносит задержку между запусками DAG. Для решения этой проблемы обычно делают месячные, дневные и часовые DAG, в которых выполняется практически один и тот же код.
Как мы видим, не существует соглашения о том, как настраивать полную перегрузку и инкрементальное обновление для каждого оператора передачи. К тому же у операторов передачи разнятся интерфейсы, режимы синхронизации называются по-разному, и также разнятся подходы к маппингу данных из источника в приемники. В идеале в ETL-инструменте вы бы хотели только выбрать источник, приемник, режим синхронизации и набор необходимых колонок.
В других опенсорсных ETL-инструментах вроде Airbyte присутствует набор общих настроек для всех типов передачи данных. Например, Airbyte поддерживает четыре режима синхронизации: полная перезагрузка с перезаписью, полная перезагрузка с добавлением, инкремент с добавлением и инкремент с дедуплицированной историей.
???? Злой
Источники и приемники в Airflow тесно связаны
Каждый облачный провайдер включает операторы для передачи данных для большинства популярных систем в их облаке. В документации поставляемых пакетов можно найти список операторов и хуков, реализованных вне ядра Airflow. Вы можете посмотреть все 60 операторов передачи в этой табличке, собранной по источникам и приемникам. Ниже приведена хордовая диаграмма со всеми возможными потоками данных, реализуемыми операторами Airflow.
Картинка нарисована с помощью Datasmith Chord Diagram Generator.
Обратите внимание, что диаграмма не отображает направление передачи данных. Только в отдельных случаях вы можете передавать данные как в одну, так и в обратную сторону, как при использовании операторов GCSToS3Operator и S3ToGCSOperator. Также возможна передача данных в рамках одной и той же системы, как, например, в GCSToGCSOperator. Таким образом, для выгрузки данных из Google Ads в Snowflake вам придется использовать GoogleAdsToGCSOperator, потом GCSToS3Operator и наконец S3ToSnowflakeOperator.
Главная проблема операторов Airflow заключается в том, что для поддержки выгрузки данных из M источников в N приемников сообщество должно реализовать N x M операторов. Операторы сфокусированы на ограниченном количестве баз данных, хранилищ и озер. А что делать, если вашей компании понадобится синхронизировать данные в бизнес-приложениях? В Airflow не существует операторов для передачи данных из приложений, кроме Salesforce, Google Ads и Facebook Ads.
В других интеграционных инструментах вроде Airbyte источники и приемники не связаны, и используются коннекторы. Т.о. сообществу нужно реализовать всего лишь 2 * (N + M) коннекторов для поддержки всех возможных процессов передачи данных. Всего лишь за год в Airbyte появилась поддержка сотен коннекторов. И более 1000 типов передачи данных против шестидесяти, поддерживаемых в Airflow.
Операторы Airflow не смогут покрыть все ваши потребности в интеграциях в дальней перспективе.
???? Альтернатива
ELT-пайплайны на Airflow, Airbyte и dbt
Три опенсорсных продукта - Airflow, Airbyte и dbt - созданы для разных целей, но имеют много общего. Изначально Airflow это инструмент управления задачами, Airbyte это инструмент интеграции данных (шаги EL из ETL), а dbt это инструмент для преобразований (шаг T).
Как видно, Airflow можно использовать для ETL- и ELT-пайплайнов. Но на этом возможности не ограничиваются, можно пойти и другим путем. В Airbyte Open-Source и Airbyte Cloud также есть планировщик и интеграция с dbt для шагов преобразования.
Где настраивать расписание ETL- и ELT-пайплайнов?
Если у вас есть множество разнообразных пайплайнов, вы можете использовать Airflow в качестве планировщика для всех типов задач, включая пайплайны ETL/ELT. Далее вы можете интегрировать Airflow с Airbyte для запуска EL-шагов с помощью AirbyteTriggerSyncOperator. Таким образом вы можете запускать инкрементальное обновление из планировщика Airflow, и запускать полную перезагрузку из интерфейса Airbyte без задержки, которая присутствует в джобах перезагрузки в Airflow.
Также есть возможность интегрировать Airbyte с dbt для преобразования данных в каждом коннекторе, который загружает данные в приемник. Имейте в виду, что нужно ограничить использование dbt в Airbyte только простейшими преобразованиями данных в загружаемых таблицах. Если вам надо реализовать сложную логику преобразований, включающую данные из разных dbt-моделей, вам лучше использовать dbt в Airflow.
К сожалению, сообщество еще не договорилось о том как лучше интегрировать Airflow и dbt. Кто-то используется BashOperator, кто-то использует пакет airflow-dbt, разрабатываемый командой GoCardless, а кто-то использует dbt-cloud-plugin. Инженеры из Updater представили свои наработки по генерации графа dbt в Airflow, получаемого путем парсинга из файла dbt manifest.json.
Заключение
Airflow отлично смотрится как оркестратор задач. Так как он широко распространен, многие используют операторы передачи и преобразования данных для планирования и создания ETL и ELT пайплайнов. Мы рассмотрели какие могут возникнуть трудности в Airflow при создании DAG для полной перезагрузки данных и инкрементального обновления. Еще больше проблем с тем, что источники и приемники жестко связаны в операторах передачи. И из-за этого в Airflow невозможно покрыть все потребности в интеграциях.
Как вариант можно продолжать пользоваться Airflow в качестве планировщика и интегрировать его с двумя другими опенсорсными продуктами, которые лучше подходят для ELT пайплайнов - Airbyte для EL-шагов и dbt для T-шагов. В Airbyte источники отделены от приемников, поэтому вы можете выгружать данные из более чем ста источников (базы данных, API, ...) и загружать их в более чем десять приемников (базы данных, хранилища, озера, ...), и исключить использование бойлерплейта, встречающегося в Airflow. А с dbt вы можете преобразовывать данные в SQL-скриптах на стороне вашего хранилища данных и избежать поддержки зависимостей между таблицами в DAG Airflow.
Если вы хотите принять участие, ждем вас в слаке Airbyte, самом активном сообществе про интеграцию данных.
Комментарии (3)
lotnikov
13.10.2021 14:41Инкрементальную загрузку умеет делать только один ETL-инструмент: RAID от компании Mobileum. Все остальные барахтаются в парадигме "последней даты загрузки" и вылезти из этой песочницы никак не могут, результатом является необходимость отдельных пайплайнов для начальной загрузки и перезагрузки данных, что есть прошлый век.
densol92
M * N операторов это конечно звучит страшно, но прелесть AIrflow заключается в простоте реализации нового оператора. Плюс в 90% случаев хочется сохранить данные в промежуточном s3/gcs хранилище и уже оттуда можно лить в нужную базу, что сокращает число операторов до (M+N)*число_промежуточных_хранилищ.