![](https://habrastorage.org/getpro/habr/upload_files/0c7/3f9/d10/0c73f9d1017706d80111c9192fa168b7.png)
Оркестрация процессов в Apache Airflow — мощный инструмент для управления данными. Но как внедрить его так, чтобы процессы стали прозрачными, гибкими и удобными для команд? Как организовать ETL-пайплайны, чтобы они легко масштабировались и адаптировались под нужды аналитиков, ML-инженеров и других специалистов?
Меня зовут Любовь Марисева, я инженер дата-платформы в Циан. Мы разрабатываем десятки ETL-процессов, обеспечиваем данными разные команды и постоянно ищем способы сделать работу с Airflow эффективнее. В этой статье я расскажу, как мы решили ключевые проблемы, сделали расчёты более управляемыми и упростили взаимодействие между командами.
Если вы только начинаете работать с Airflow или хотите улучшить свои подходы, присоединяйтесь! Эта статья поможет взглянуть на оркестрацию процессов под новым углом.
TL;DR
Для понимания сути статьи достаточно знать, что Apache Airflow — это платформа для создания, планирования и мониторинга рабочих процессов (или ETL-конвейеров), организованных в виде DAG-ов (или графов). DAG — это направленный ациклический граф, состоящий из задач, которые выполняют код и связаны между собой логикой выполнения.
Пример простого DAG: он состоит из четырёх задач — extract, transform, sending_metrics и load. В графе определён порядок их выполнения: сначала исполняется extract, затем transform и sending_metrics, а завершает работу задача load. Когда все задачи выполнены, процесс загрузки данных считается успешным.
![](https://habrastorage.org/getpro/habr/upload_files/ccf/a3c/a67/ccfa3ca6712dbcc80932bbe942302058.jpg)
Airflow предоставляет удобный UI, где можно визуализировать DAG, просматривать логи выполнения, статус запуска, параметры и многое другое.
Давайте на примере готового проекта разберём, как его можно перенести на Airflow и какую пользу это принесёт. Предположим, у нас есть проект, включающий несколько этапов работы с данными: подключение к API для получения данных, их сохранение в базу, расчёты витрин и запуск обучения ML-модели. По ходу выполнения также происходит экспорт данных аналитикам. Особенность проекта в том, что за выгрузку данных, расчёты и обучение модели отвечают три разные команды.
Перенос проекта на Airflow позволит чётко обозначить задачи и зависимости между ними. Это решение даст возможность перезапускать отдельные этапы проекта, а работа с кодом станет проще: можно будет добавлять новые задачи, не затрагивая общий код, и изменять существующие, не влияя на соседние задачи. Кроме того, архитектура проекта станет более наглядной благодаря визуализации в UI.
Часть 1: Один DAG и мир внутри него
Обычно DAG представляет собой граф, реализующий единый рабочий процесс. Внутри графа множество задач-вершин, представленных операторами, а зависимости между ними определяются инженером и выступают в роли рёбер. Построить граф расчёта можно как в виде линейной цепочки, так и по принципу «один ко многим» или «многие к одному». Кроме того, доступны более сложные структуры, например, ветвления или условные задачи.
FYI: При разбиении процесса на задачи важно убедиться, что каждая из них выполняет условие идемпотентности, то есть может повторно выполняться с одинаковым результатом. Это особенно важно при необходимости перезапуска, если что-то пошло не так во время выполнения процесса.
Для построения архитектуры графа в Airflow есть два основных инструмента: настройка цепочек зависимостей и использование триггеров. Первый способ достаточно прост: нужно определить, от каких задач зависит текущая, и добавить эти связи в описание DAG.
![](https://habrastorage.org/getpro/habr/upload_files/dfb/78b/326/dfb78b32676f6628479f6bd0c4c93086.png)
Чтобы привести проект к виду связанного графа, сначала определим ключевые задачи в пайплайне и их взаимосвязи. В рассматриваемом примере уже выделены этапы выполнения программы, которые станут отдельными задачами в графе:
загрузка данных из API;
обработка полученных данных (расчёт таблиц);
экспорт данных;
обучение ML-модели.
На данном этапе задачи выстроены в линейной последовательности: каждая задача запускается после успешного завершения предыдущей. Если, например, на этапе экспорта данных произойдёт сбой, вам не придётся повторно запускать уже успешно выполненные задачи, например, расчёт таблиц. Достаточно будет перезапустить экспорт, а Airflow автоматически запустит обучение модели, так как оно зависит от успешного выполнения экспорта.
![](https://habrastorage.org/getpro/habr/upload_files/866/b0c/9b1/866b0c9b1be18d6018b76df9b4564a0a.png)
Линейная структура не всегда оптимальна, так как не все задачи зависят друг от друга и могут выполняться параллельно. В нашем примере обучение модели с точки зрения разработки не зависит от экспорта данных, но напрямую связано с этапом обработки данных. Это можно отобразить в графе, изменив связи между задачами.
Теперь представим, что проект развивается, и появляются новые требования, такие как:
добавление витрины;
создание нового хранилища для экспорта;
запуск нового функционала, например, тестирование;
мониторинг ошибок с последующей отправкой уведомлений или флагов.
Можно попытаться скомпоновать все эти изменения, сохраняя связи только внутри графа, но гораздо удобнее воспользоваться таким инструментом, как правила триггеров, которые определяют, при каких условиях необходимо запустить задачу. Планировщик Airflow проверяет каждую задачу, и если условие триггера выполнено, она считается готовой к запуску.
В Airflow существует несколько типов триггеров, позволяющих гибко управлять запуском задач:
![](https://habrastorage.org/getpro/habr/upload_files/686/475/225/686475225468ff36d0a3b34bfb61617e.png)
all_success — правило по умолчанию. Задача запускается, если все предыдущие задачи завершены со статусом успеха. В нашем проекте для запуска обучения модели необходимо, чтобы все зависимые задачи, такие как расчёты таблиц, завершились успешно.
one_failed — запуск происходит, если хотя бы одна из зависимых задач завершилась неуспешно. Например, задача отправки сигнала об ошибке экспорта будет запущена, если хотя бы один из экспортов не удался.
all_failed — задача запускается, если все предыдущие задачи завершены со статусом неудачи.
all_done — задача будет запущена после завершения всех предыдущих задач, независимо от их статуса (успех, неудача или пропуск).
all_skipped, one_success, one_done и другие — существует множество других триггеров, которые можно детально изучить в документации.
Настроив цепочку задач, возникает логичный вопрос: как их всё-таки запустить?
Возможности запуска DAG-а и как их можно настраивать
Запуск вручную
Запустить DAG можно вручную через CLI или UI. Это удобно для отладки, когда нужно оперативно перезапустить упавший расчёт или протестировать новые изменения. Но полагаться на ручной запуск в проде — не лучшая идея. Ведь вставать в три часа ночи, чтобы перезапустить задачу, — удовольствие сомнительное.
![](https://habrastorage.org/getpro/habr/upload_files/2b4/349/ae8/2b4349ae8e978a45d3058c321ba21a86.png)
Запуск по расписанию
В Airflow можно настроить автоматический запуск DAG-а через определенные промежутки времени: раз в час, день или месяц. Это можно задать через параметр schedule_interval. Также можно воспользоваться более гибким cron-расписанием для тонкой настройки запуска и спокойно отдыхать, зная, что процессы запустятся вовремя.
![](https://habrastorage.org/getpro/habr/upload_files/7e7/3ec/a26/7e73eca261bf7ea920a062fc98723bcc.png)
Запуск по обновлению датасета
Если нужно более гибкое решение, когда DAG должен ждать поступления данных, можно использовать запуск по обновлению датасета. Например, если у вас есть задачи по агрегации данных из разных источников, а обработка может занимать неопределённое время, DAG будет запущен автоматически после получения необходимых данных. Это предотвращает ситуацию, когда DAG падает, не дождавшись обновления данных при запуске по расписанию.
![](https://habrastorage.org/getpro/habr/upload_files/4af/898/6bd/4af8986bdaf09ef8b13f6a85e54b42e9.png)
Запуск по триггеру
Airflow позволяет создать пользовательский оператор-сенсор, который будет ожидать выполнения определённого условия и запускать DAG при его выполнении. Например, завершение одного DAG-а может автоматически триггерить запуск другого. Об этом поговорим подробнее во второй части статьи.
![](https://habrastorage.org/getpro/habr/upload_files/27e/768/236/27e768236bf9d5c43c8bb79a927999be.jpeg)
Результаты переноса проекта в Airflow
После переноса нашего большого проекта по обработке данных в Airflow, разбиения его на отдельные задачи и установления зависимостей между ними, мы получили следующие преимущества:
Упрощение понимания проекта. Визуализированный граф задач позволяет быстро оценить структуру проекта, его этапы и взаимосвязи.
Ускорение разработки. Разбиение на отдельные задачи позволяет быстрее добавлять новые элементы, клонируя существующие задачи и добавляя в них новую логику.
Гибкое добавление новых функций. Новые задачи можно вводить поэтапно: сначала тестировать и минимизировать риски, а затем интегрировать в общий граф расчёта.
Снижение затрат времени при сбоях. В случае падения расчёта перезапускать нужно только неотработавшие задачи, не прогоняя весь процесс заново. Это существенно экономит время.
Однако наш проект продолжает расти: новые источники данных увеличивают разброс времени готовности расчётов, а конкуренция между командами за деплой и тестовые запуски усложняет процессы. В следующей части разберём, как разделение одного DAG-а на несколько независимых графов помогает справляться с этими вызовами.
Часть 2: Несколько DAG-ов
Объединяя процессы в один DAG или разбивая большой процесс на атомарные операции внутри него, мы получаем возможность видеть зависимости между задачами, настраивать их запуск и перезапуск, а также просматривать параметры в одном месте. Однако включать все процессы в один пайплайн не всегда целесообразно — зачастую лучше разделить их на отдельные графы. Вот несколько причин для этого:
Разные источники данных. Если в одном DAG-е обрабатываются данные из разных источников с различным временем поступления, часть DAG-а может отработать сразу, а остальная будет ждать, пока поступят другие данные. В таком случае имеет смысл разделить графы, чтобы каждый запускался по своему расписанию.
Разные команды разработки. Когда за разные задачи отвечают разные команды, объединение всех задач в один граф усложняет управление и может вызвать путаницу. Разделение на отдельные графы упрощает работу каждой команды.
Цикличность в графе. Если внутри одного DAG-а появляется цикличность, её стоит вынести в отдельный граф, чтобы избежать сложностей в работе.
![](https://habrastorage.org/getpro/habr/upload_files/8ce/827/fc6/8ce827fc66c9179d04751bb72fcc216d.png)
Теперь вернёмся к проекту, который рассматривали ранее, и оценим его текущее состояние:
за загрузку и агрегацию данных отвечает команда дата-инженеров (DE);
за обучение и работу с ML-моделями — команда ML;
за построение витрин — команда аналитиков.
Разобьём этот единый граф на несколько, исходя из команд, которые занимаются разработкой.
![](https://habrastorage.org/getpro/habr/upload_files/f37/e2a/c21/f37e2ac21a060fdfe1ac80564b7a8f1f.png)
Пример разбивки графа по расписанию
Рассмотрим граф обработки данных. Данные из одного источника поступают раз в день, например, в 01:00, и в это время наш DAG должен запуститься, чтобы забрать и обработать их. Однако другой источник может предоставлять данные несколько раз в день, например, в 04:00 и 16:00. Одного расписания для одного DAG-а недостаточно, чтобы запускать его с разной периодичностью. Конечно, можно написать дополнительный код для проверки наличия данных, но это усложнит логику и создаст потенциальные ошибки.
Лучшим решением будет разделить один граф на несколько, каждый из которых будет запускаться по своему расписанию.
![](https://habrastorage.org/getpro/habr/upload_files/e50/32d/6bf/e5032d6bf40fd32be9f9ecca9d7c7343.png)
В airflow графы выглядят так:
![](https://habrastorage.org/getpro/habr/upload_files/87a/35e/3d9/87a35e3d97b8ce485d6a0a0b648861f5.jpg)
После разделения графов всё выглядит гораздо лучше: расписания соответствуют времени поступления данных, а команды разработки работают в своих DAG-ах, не мешая друг другу. Однако остаётся нюанс: процессы обучения моделей и расчёты аналитиков всё ещё зависят от данных, поступающих от дата-инженеров. Эта связь не отображена в Airflow и не настроена внутри проектов.
Теперь разберём, как графы сторонних процессов могут узнавать, что необходимые им данные готовы и они могут запускаться. Для этого существуют несколько механизмов, которые мы рассмотрим через интерфейс Airflow UI:
1. Расписание
Команды могут договориться, что данные будут готовы к полуночи, и тогда графу обучения не нужно знать о состоянии ETL-графа — достаточно просто запускаться в определённое время. Этот метод не отображает связь в Airflow, что неудобно и может привести к проблемам, если расписание одной из задач изменится или информация о зависимости между графами будет утеряна.
2. Обмен флагами
После завершения записи данных в хранилище можно добавить флаг об успешном завершении. Этот механизм позволяет потребителям данных запускать свои процессы, ориентируясь на наличие флага. Однако это не встроенное решение Airflow. С одной стороны, оно позволяет интегрировать расчёты, происходящие на других платформах, но требует периодической проверки флага в коде проектов и не отображается в Airflow.
3. TriggerDagRunOperator
Этот оператор отправляет сигнал запуска из одного графа в другой. Например, ETL-граф может запускать граф обучения после завершения своих задач. Также этот метод можно использовать для запуска других графов. Он решает задачу своевременного запуска зависимых графов и позволяет отслеживать связи в разделе DAG Dependencies. Однако при добавлении нового графа потребуется вручную прописывать зависимость в коде.
4. ExternalTaskSensor
Этот оператор, похожий на TriggerDagRunOperator, позволяет зависеть от нескольких задач в разных DAG-ах и запускать часть задач, пока другие остаются на паузе. Он также отображается в DAG Dependencies. Разница в том, что зависимость прописывается в графе-потребителе, а не в графе-поставщике данных. В нашем примере команде DE не нужно вносить изменения, а командам-потребителям достаточно добавить зависимость от графа-поставщика в своих проектах. Мы используем сенсор, чтобы обучение модели запускалось после успешного завершения всех задач в ETL-графе.
![](https://habrastorage.org/getpro/habr/upload_files/2f2/ab0/50b/2f2ab050b1ecb15562f3761371971b73.jpg)
5. Dataset
Airflow поддерживает концепцию датасетов — групп данных. Если задачи графа обновляют определённые данные (например, таблицу в базе), можно указать флаг об обновлении датасета. Граф, выполняющий расчёты, считается поставщиком данных, а любой граф, использующий этот датасет, — потребителем. Планировщик запустит граф-потребитель, как только данные будут обновлены. Это гибкий инструмент, позволяющий командам взаимодействовать на основе данных без необходимости вносить изменения в код. Однако датасеты не подходят, если нужно триггерить графы на основе завершения задач, а не обновления данных.
Используя Dataset, можно связать граф аналитики и граф обработки данных так, чтобы граф аналитики запускался при обновлении датасета в ETL-графе.
![](https://habrastorage.org/getpro/habr/upload_files/913/d65/feb/913d65feb3dd07d5156bc56e60b6482d.jpg)
6. Обмен сообщениями
Airflow поддерживает обмен сообщениями с помощью XCom и API. Задачи могут передавать данные с помощью xcom_push и xcom_pull, передавая флаги для запуска других DAG-ов. Этот метод создаёт неявные зависимости, что со временем может усложнить поддержку и масштабирование системы.
Используя эти инструменты, можно гибко преобразовать проект из большого монолита в несколько связанных сущностей в Airflow.
![](https://habrastorage.org/getpro/habr/upload_files/e35/a1f/fc6/e35a1ffc6b410fd210a40a4d9c7617e1.png)
Заключение
В этой статье мы разобрали, как организовать и управлять рабочими процессами с использованием DAG-ов в Apache Airflow, а также рассмотрели преимущества разделения сложных процессов на несколько независимых графов.
Основные выводы и рекомендации:
Использование DAG-ов. DAG-и позволяют структурировать задачи в виде графов, делая процессы более прозрачными и понятными. Это значительно упрощает понимание структуры проекта и взаимосвязей между задачами.
Разбиение на задачи. Деление проекта на отдельные задачи с использованием типовых шаблонов ускоряет разработку, облегчает добавление новых функций и позволяет перезапускать отдельные части пайплайна без необходимости повторного выполнения всех задач. Это экономит время и ресурсы.
Оптимизация расписаний. Грамотный выбор инструментов для планирования расчётов помогает избежать чрезмерного усложнения логики обработки данных и повышает эффективность процессов.
Разделение графов по командам. Чёткое распределение ответственности между командами за разные DAG-и снижает вероятность конфликтов, делает разработку более гибкой и упрощает масштабирование. Это также позволяет легче адаптироваться к изменениям в требованиях.
Применение этих подходов в Airflow помогает сделать управление данными более структурированным и гибким, что особенно важно в условиях активного роста системы.