Airflow это популярная опенсорсная платформа управления задачами. В частности его используют для построения ETL-пайплайнов. Например, мне доводилось переливать данные между базами данных, хранилищами и озерами данных с его помощью. А также я использовал его для препроцессинга данных для моделей машинного обучения. Но так ли подходит Airflow для ETL на сегодняшний день?

В этой статье мы рассмотрим как с помощью Airflow ETL операторов выгрузить данные из Postgres в BigQuery в парадигмах ETL и ELT. Далее разберем сложности, с которыми вы можете столкнуться при реализации инкрементальной загрузки данных в DAG (DAG - directed acyclic graph, ориентированный ацикличный граф - цепочка связанных задач). Наконец, мы обсудим почему Airflow ETL операторы не смогут покрыть все ваши потребности в интеграциях в дальней перспективе.

В качестве альтернативы предлагается продолжать использовать Airflow в качестве планировщика и для мониторинга ETL/ELT-пайплайнов, и рассмотреть другие опенсорсные продукты для шагов выгрузки, преобразования и загрузки данных. Например, взять Airbyte для шагов EL (Extract и Load - выгрузки и загрузки данных) и dbt для шагов T (Transform - преобразование данных). Хорошая новость заключается в том, что Airflow отлично интегрируется с Airbyte и dbt.

???? Хороший

Airflow ETL операторы

Представьте, что ваше приложение хранит данные в БД на Postgres. А Маркетинг, Продажники и Продуктовые команды хранят свои данные в сторонних системах вроде Google Ads, Salesforce и Segment. И вы скорее всего захотите централизовать все ваши данные в едином хранилище, например, на Google BigQuery.

В вашей компании уже используется Airflow, поэтому вы начинаете искать ETL операторы для выгрузки, преобразования и загрузки данных среди встроенных операторов и поставляемых пакетов. Когда вы не найдете нужных операторов для взаимодействия с вашими данными, вы возможно воспользуетесь Airflow Hooks для подключения к внешним системам. Например, не существует оператора для получения данных из Segment, но вы можете воспользоваться SegmentHook для обращения к API Segment.

Сначала у вас будет искушение настроить пайплайн, в котором вы выгружаете данные из Postgres в файловое хранилище, преобразуете его локально с помощью PythonOperator и далее загружаете результат в BigQuery. В итоге у вас получается примерно такое решение:

PostgresToGCSOperator (E) -> GCSToLocalFilesystemOperator (E) -> PythonOperator (T) -> LocalFilesystemToGCSOperator (L) -> GCSToBigQueryOperator (L)

Но насколько это хорошая практика на текущий момент?

Это традиционное ETL-решение, в котором логика преобразования выполняется между шагами выгрузки и загрузки данных. В Airflow можно преобразовывать данные локально (PythonOperator, BashOperator, ...), удаленно (SparkSubmitOperator…) или на стороне хранилища (PostgresOperator, BigQueryInsertJobOperator, ...).

В последние годы многие команды переходят из ETL- в ELT-парадигму. В парадигме ELT сначала данные загружаются как есть из источника в хранилище, а потом преобразовываются на стороне хранилища с помощью SQL. Одна из главных проблем ETL-пайплайнов заключается в том, что в них преобразование данных происходит в процессе передачи данных, из-за чего они работают менее стабильно.

С другой стороны современные хранилища данных могут автоматически масштабироваться при росте вычислительных нужд и объемов. А такие современные инструменты как dbt наделили SQL сверхспособностями. С dbt вы можете прикрутить в ваших SQL-скриптах макросы, циклы for и много чего еще. А самое главное, dbt отслеживает зависимости между таблицами, основываясь на их упоминаниях в SQL-коде. Когда же вы пишите преобразования на Airflow, вам нужно дублировать зависимости между таблицами и в SQL и в DAG.

SQL заменяет Python для преобразования и анализа данных в современном дата стеке.

Airflow ELT операторы

Вы можете воспользоваться операторами передачи вместе с операторами БД для ELT-пайплайнов. Но в Airflow нет оператора для прямой передачи данных из Postgres в BigQuery, поэтому требуется промежуточная выгрузка в Google Cloud. Например, вы можете выполнить PostgresToGCSOperator и потом GCSToBigQueryOperator. Далее вы можете выполнить преобразования в приемнике с помощью операторов BigQuery:

PostgresToGCSOperator (E) ->  GCSToBigQueryOperator (L) -> BigQueryInsertJobOperator (T)

Airflow предлагает массу вариантов для перегрузки данных из одной системы в другую. Но проблема в том, что чем больше у вас вариантов, тем выше шанс, что вы ошибетесь.

Airflow не является догматичным ETL-инструментом.

???? Плохой

Как мы увидели выше, полная перезагрузка данных с помощью Airflow реализуется достаточно просто. Но что произойдет, когда поменяется схема данных на источнике, и вы перезапустите DAG полной перезагрузки данных? Он упадет. Чтобы он заработал, вам нужно создать дополнительную задачу для удаления данных в приемнике. Это также гарантирует, что ваши DAG идемпотентны, и вы можете перезапускать их безо всяких побочных эффектов при их падении. Далее мы разберем как усложняется ситуация при попытке реализовать инкрементальную загрузку.

Инкрементальная загрузка данных в Airflow

Если вам нужно реализовать инкрементальную загрузку, в первую очередь нужно определиться с частотой запуска DAG. Далее вам нужно убедиться, что каждый DAG загружает только те данные, которые были созданы в период между датой выполнения текущего DAG и следующего. Для этого вам потребуется добавить в SQL-скрипты Airflow макросы `{{ds}}` и `{{next_ds}}` для текущей и следующей даты выполнения соответственно. Значения дат будут подставлены в рантайме.

Другая проблема, с которой вы можете столкнуться, это потребность в отдельнах DAG для полной перезагрузки данных и для инкрементальной загрузки, если у вас высокая частота запуска DAG. Представьте, что вы настроили ежечасное инкрементальное обновление данных, и у вас в источнике хранятся данные глубиной 5 лет. Тогда для полной перезагрузки всей истории потребуется произвести 43800 (5 лет * 365 дней * 24 часа) запусков DAG, что будет работать долго, так как Airflow вносит задержку между запусками DAG. Для решения этой проблемы обычно делают месячные, дневные и часовые DAG, в которых выполняется практически один и тот же код.

Как мы видим, не существует соглашения о том, как настраивать полную перегрузку и инкрементальное обновление для каждого оператора передачи. К тому же у операторов передачи разнятся интерфейсы, режимы синхронизации называются по-разному, и также разнятся подходы к маппингу данных из источника в приемники. В идеале в ETL-инструменте вы бы хотели только выбрать источник, приемник, режим синхронизации и набор необходимых колонок.

В других опенсорсных ETL-инструментах вроде Airbyte присутствует набор общих настроек для всех типов передачи данных. Например, Airbyte поддерживает четыре режима синхронизации: полная перезагрузка с перезаписью, полная перезагрузка с добавлением, инкремент с добавлением и инкремент с дедуплицированной историей.

???? Злой

Источники и приемники в Airflow тесно связаны

Каждый облачный провайдер включает операторы для передачи данных для большинства популярных систем в их облаке. В документации поставляемых пакетов можно найти список операторов и хуков, реализованных вне ядра Airflow. Вы можете посмотреть все 60 операторов передачи в этой табличке, собранной по источникам и приемникам. Ниже приведена хордовая диаграмма со всеми возможными потоками данных, реализуемыми операторами Airflow.

Визуализация всех потоков данных, реализуемых операторами Airflow.
Визуализация всех потоков данных, реализуемых операторами Airflow.

Картинка нарисована с помощью Datasmith Chord Diagram Generator.

Обратите внимание, что диаграмма не отображает направление передачи данных. Только в отдельных случаях вы можете передавать данные как в одну, так и в обратную сторону, как при использовании операторов GCSToS3Operator и S3ToGCSOperator. Также возможна передача данных в рамках одной и той же системы, как, например, в GCSToGCSOperator. Таким образом, для выгрузки данных из Google Ads в Snowflake вам придется использовать GoogleAdsToGCSOperator, потом GCSToS3Operator и наконец S3ToSnowflakeOperator.

Главная проблема операторов Airflow заключается в том, что для поддержки выгрузки данных из M источников в N приемников сообщество должно реализовать N x M операторов. Операторы сфокусированы на ограниченном количестве баз данных, хранилищ и озер. А что делать, если вашей компании понадобится синхронизировать данные в бизнес-приложениях? В Airflow не существует операторов для передачи данных из приложений, кроме Salesforce, Google Ads и Facebook Ads.

В других интеграционных инструментах вроде Airbyte источники и приемники не связаны, и используются коннекторы. Т.о. сообществу нужно реализовать всего лишь 2 * (N + M) коннекторов для поддержки всех возможных процессов передачи данных. Всего лишь за год в Airbyte появилась поддержка сотен коннекторов. И более 1000 типов передачи данных против шестидесяти, поддерживаемых в Airflow.

Операторы Airflow не смогут покрыть все ваши потребности в интеграциях в дальней перспективе.

???? Альтернатива

ELT-пайплайны на Airflow, Airbyte и dbt

Три опенсорсных продукта - Airflow, Airbyte и dbt - созданы для разных целей, но имеют много общего. Изначально Airflow это инструмент управления задачами, Airbyte это инструмент интеграции данных (шаги EL из ETL), а dbt это инструмент для преобразований (шаг T).

Как видно, Airflow можно использовать для ETL- и ELT-пайплайнов. Но на этом возможности не ограничиваются, можно пойти и другим путем. В Airbyte Open-Source и Airbyte Cloud также есть планировщик и интеграция с dbt для шагов преобразования.

Интерфейс Airbyte connection для настройки выгрузки данных из Postgres в BigQuery и применения кастомного dbt-преобразования данных
Интерфейс Airbyte connection для настройки выгрузки данных из Postgres в BigQuery и применения кастомного dbt-преобразования данных

Где настраивать расписание ETL- и ELT-пайплайнов?

Если у вас есть множество разнообразных пайплайнов, вы можете использовать Airflow в качестве планировщика для всех типов задач, включая пайплайны ETL/ELT. Далее вы можете интегрировать Airflow с Airbyte для запуска EL-шагов с помощью AirbyteTriggerSyncOperator. Таким образом вы можете запускать инкрементальное обновление из планировщика Airflow, и запускать полную перезагрузку из интерфейса Airbyte без задержки, которая присутствует в джобах перезагрузки в Airflow.

Также есть возможность интегрировать Airbyte с dbt для преобразования данных в каждом коннекторе, который загружает данные в приемник. Имейте в виду, что нужно ограничить использование dbt в Airbyte только простейшими преобразованиями данных в загружаемых таблицах. Если вам надо реализовать сложную логику преобразований, включающую данные из разных dbt-моделей, вам лучше использовать dbt в Airflow.

К сожалению, сообщество еще не договорилось о том как лучше интегрировать Airflow и dbt. Кто-то используется BashOperator, кто-то использует пакет airflow-dbt, разрабатываемый командой GoCardless, а кто-то использует dbt-cloud-plugin. Инженеры из Updater представили свои наработки по генерации графа dbt в Airflow, получаемого путем парсинга из файла dbt manifest.json.

Airflow DAG, полученный из файла dbt manifest.json
Airflow DAG, полученный из файла dbt manifest.json

Заключение

Airflow отлично смотрится как оркестратор задач. Так как он широко распространен, многие используют операторы передачи и преобразования данных для планирования и создания ETL и ELT пайплайнов. Мы рассмотрели какие могут возникнуть трудности в Airflow при создании DAG для полной перезагрузки данных и инкрементального обновления. Еще больше проблем с тем, что источники и приемники жестко связаны в операторах передачи. И из-за этого в Airflow невозможно покрыть все потребности в интеграциях.

Как вариант можно продолжать пользоваться Airflow в качестве планировщика и интегрировать его с двумя другими опенсорсными продуктами, которые лучше подходят для ELT пайплайнов - Airbyte для EL-шагов и dbt для T-шагов. В Airbyte источники отделены от приемников, поэтому вы можете выгружать данные из более чем ста источников (базы данных, API, ...) и загружать их в более чем десять приемников (базы данных, хранилища, озера, ...), и исключить использование бойлерплейта, встречающегося в Airflow. А с dbt вы можете преобразовывать данные в SQL-скриптах на стороне вашего хранилища данных и избежать поддержки зависимостей между таблицами в DAG Airflow.

Если вы хотите принять участие, ждем вас в слаке Airbyte, самом активном сообществе про интеграцию данных.

Комментарии (3)


  1. densol92
    13.10.2021 12:31

    M * N операторов это конечно звучит страшно, но прелесть AIrflow заключается в простоте реализации нового оператора. Плюс в 90% случаев хочется сохранить данные в промежуточном s3/gcs хранилище и уже оттуда можно лить в нужную базу, что сокращает число операторов до (M+N)*число_промежуточных_хранилищ.


  1. lotnikov
    13.10.2021 14:41

    Инкрементальную загрузку умеет делать только один ETL-инструмент: RAID от компании Mobileum. Все остальные барахтаются в парадигме "последней даты загрузки" и вылезти из этой песочницы никак не могут, результатом является необходимость отдельных пайплайнов для начальной загрузки и перезагрузки данных, что есть прошлый век.


    1. Vetal130
      22.10.2021 16:36

      А как RAID делает инкрементальную загрузку ?
      Спасибо