Многие команды сейчас меняют проприетарное ПО на открытые аналоги. Под «открытостью» мы понимаем не только миграцию с платного софта на бесплатный, но и новый подход к построению data-платформ, где каждый продукт развивает свой сегмент платформы с помощью релевантного стека технологий.
Полноценных open-source-альтернатив, которые закрыли бы все наши потребности, не нашлось. Поэтому мы решили создать свой «мультитул» — low-code-фреймворк для генерации гетерогенных Airflow DAG с незамысловатым названием dag_generator.
Цель этой статьи — поделиться опытом внедрения подобного инструмента. Генерация выполняется по старинке, так что про ИИ здесь пока ничего не найдете.
Идея и задача
Наш генератор задумывался как универсальное средство, позволяющее бизнесу максимально быстро «влететь» в платформу и выводить продукт в production в конвейерном режиме.
Здесь расскажем о принципах работы фреймворка и его инфраструктуре. Углубляться в плюсы и минусы DDD и DataMesh не будем, но отметим: именно подход DataMesh к разделению платформы на домены и продукты стал для нас отправной точкой.
Ключевые цели
Типовые операции — без кода.
Сквозная валидация конфигураций.
Конвейерная поставка, которая ускоряет внедрение и сокращает TTM.
Оркестратором на платформе выбрали Apache Airflow. Однако архитектура dag_generator достаточно проста, чтобы при необходимости мигрировать на Dagster, Prefect, Luigi или другие аналоги.
По задумке фреймворк должен был решать ETL-задачи, запускать модели, обращаться в другие системы и быть легким и тиражируемым компонентом платформы.
Архитектура фреймворка
В первую очередь фреймворк должен быть универсальным простым! Но задачи на платформе разнородные и сложные. Значит — надо декомпозировать.

dag_generator состоит из двух логических частей.
Ядро — Python-приложение с логикой вызова из CLI, Jinja2-шаблонами для генерации DAG-файлов, модулями генерации и валидации.
Джобы — отдельные Python-приложения, каждое из которых содержит YAML-схему своих атрибутов и реализацию логики под конкретную задачу. Например, построить DAG по SQL-скрипту или dbt-проекту, запустить KubernetesPodOperator с нужными параметрами или просто отправить email.
Джоб
Джоб — самостоятельное приложение со своим репозиторием, CI/CD и релизным циклом.
Для создания очередного джоба нужно всего три шага:
отнаследоваться от базовой модели джоба dag_generator;
описать атрибуты джоба;
написать реализацию, которая возвращает Operator`ы, чтобы Airflow смог построить дерево задач в DAG.
Типовая схема джоба по отправке email:
from dag_generator.framework.yaml_model import BaseDagJob, gen_metadata
from marshmallow import fields, validate
class EmailJob(BaseDagJob):
job_name = fields.String(required=True, validate=validate.NoneOf(("EmailSend")),
metadata=gen_metadata(1, "JOB_NAME", "Наименование джоба (обязательный параметр)"))
job_type = fields.String(required=True, validate=validate.Equal("email_job"),
metadata=gen_metadata(2, "email_job", "Тип джоба (обязательный параметр)"))
email_to = fields.String(required=True, allow_none=False,
metadata=gen_metadata(3, "example@mail.com", "Список получателей (обязательный параметр)"))
subject = fields.String(required=True,
metadata=gen_metadata(4, "Airflow test subject", "Тема письма (обязательный параметр)"))
body = fields.String(required=True,
metadata=gen_metadata(5, "Airflow test body", "Тело письма (HTML) (обязательный параметр).))
attachment_path = fields.String(required=False,
metadata=gen_metadata(6, "/tmp/test.txt", "Путь до вложения (необязательный параметр)"))
Реализация отправки:
from typing import Dict
import os
from airflow.operators.email import EmailOperator
from dag_generator.utils.email_notifications.email_alert import DagStatusAlert
def EmailSend(config: Dict):
job_name = config.get("job_name", "email_job")
email_to = config.get("email_to")
subject = config.get("subject")
body = config.get("body")
attachment_path = config.get("attachment_path")
email_to = DagStatusAlert.validate_emails(email_to)
if not email_to:
raise ValueError("Не найдено корректных email адресов для отправки")
if attachment_path and os.path.isfile(attachment_path):
files=[attachment_path]
else:
files = None
email_job = EmailOperator(
task_id = job_name,
to=email_to,
subject=subject,
html_content=body,
files=files
)
email_job
Подключение джобов из python packages в качестве provider
Подгружая реализацию джобов в dag_generator, мы используем ту же концепцию, что и Airflow при подгрузке провайдеров.
Все джобы dag_generator имеют такую же структуру проекта и настройки pyproject.toml, как родительское ядро:

Благодаря этому джобы при установке попадают в ту же директорию и становятся доступны для импорта. При запуске dag_generator ищет файлы job.yaml и подхватывает реализацию с конфигурацией:
def _discover_all_dag_generator_jobs(self) -> None:
"""
Поиск всех установленных джобов в dag_generator.
Она ищет файлы job.yaml и регистрирует джобы, объявленные в них.
"""
try:
import dag_generator.jobs
except ImportError:
logger.info("You have no jobs installed.")
return
seen = set()
jobs_path = os.path.dirname(dag_generator.jobs.__file__)
for path in os.listdir(jobs_path):
try:
path = os.path.join(jobs_path, path)
if path not in seen:
seen.add(path)
self._add_jobs_info_from_files_on_path(path)
except Exception as e:
logger.warning(f"Error when loading 'job.yaml' files from {path} airflow sources: {e}")
Далее все джобы доступны для импорта по одинаковому пути. Вот пример импорта sql_parser_job джоба из ядра dag_generator:
from dag_generator.jobs.sql_parser.realization.sql_parser_job import run_sql_parser_job
Принцип работы
Базовый сценарий использования dag_generator выглядит так (в роли пользователя может выступать приложение):

Генерация шаблона: пользователь создает YAML-шаблон DAG с джобами и заполняет его значениями.
Генерация DAG: пользователь запускает генерацию DAG-файла, указывая заполненный YAML. Происходит его валидация.
Git: полученный DAG помещается в git-репозиторий.
CI: Webhook запускает фазу CI с тестированием и предварительной подготовкой артефактов.
CD: Запускается фаза CD, которая доставляет DAG и артефакты в среду исполнения Airflow.
Клиентская часть
В клиентской части dag_generator делает три вещи: генерирует YAML-шаблоны по схемам marshmallow, валидирует заполненные пользователем YAML и создает на их основе DAG-файлы.
Генерация YAML-файла
Пользователь заполняет YAML-файл (который тоже можно сгенерировать) и запускает создание DAG.
YAML-спецификация на примере SQL-джоба:
# Dag generator version: 1.4.1
dag_id: sql_parser_job_<replace_with_your_name> # Наименование дага (обязательный параметр). Должен быть уникальным в рамках одного Airflow.
start_date: '2025-11-19' # Дата начала в формате YYYY-MM-DD (обязательный параметр).
schedule: '@once' # Расписание дага (обязательный параметр, но можно в значении указать null)
catchup: false # Флаг восприятия параметра start_date как времени фактического запуска дага (обязательный параметр).
domain_name: DOMAIN_NAME # Имя домена. (обязательный параметр).
jobs: # Список джобов. Могут комбинироваться различные типы джобов (обязательный параметр)
- job_name: JOB_NAME # Наименование джоба (обязательный параметр)
job_type: sql_parser_job # Тип джоба
filepath: ./test/test.sql # Путь до корректного sql файла на сервере, который будет преобразован в даг. Если указан параметр sql_folder_variable, путь должен быть относительно папки (обязательный параметр)
connection_name: connection_id # Глобальный ID коннектора в Airflow через который будут выполняться все запросы, если для конкретного запроса не указан локальный ID коннектора (обязательный параметр)
Генерация DAG-файла
Генератор валидирует введенные значения и, если все хорошо, с помощью Jinja-шаблона создает DAG-файл.
Пример сгенерированного DAG по YAML выше:
# dag_generator version: 1.4.1
import pendulum
import os
from airflow.decorators import dag, task_group
from airflow.utils.session import provide_session
from airflow.operators.empty import EmptyOperator
@dag(
dag_id=os.path.basename(__file__).replace('.pyc','').replace('.py',''),
start_date=pendulum.parse('2025-11-14', tz='Europe/Moscow'),
schedule='@once',
catchup=False,
params={}
)
def create_dag():
from dag_generator.jobs.sql_parser.realization.sql_parser_job import run_sql_parser_job
sql_parser_demo_job_config = {'autocommit': False,
'connection_name': 'my_airflow_db_conn',
'filepath': '/path_to_sql_file_on_airflow_server',
'job_name': 'sql_parser_demo_job',
'job_type': 'sql_parser_job',
}
@task_group(group_id='sql_parser_demo_job', tooltip='')
def sql_parser_demo_job(config):
run_sql_parser_job(config)
sql_parser_demo_job = sql_parser_demo_job(sql_parser_demo_job_config)
start = EmptyOperator(task_id="start")
done = EmptyOperator(task_id="end")
start >> sql_parser_demo_job >> done
create_dag()

DAG у нас получился динамический, и это неслучайно. В начале разработки мы выбирали между двумя подходами (спойлер: остановились на втором).
Подход 1. Статические DAG`и: генерация кода каждого оператора.
Плюс: шедулеру не нужно выполнять динамическое построение в момент парсинга.
Но минусов больше. Сгенерировать в памяти DAG и Operator — нативная операция Airflow, а вот сгенерировать код по декларированию этих объектов — уже совсем другая история. DAG`и «раздуваются» и плохо читаются, появляется много бойлерплейта. А при изменении порядка, состава или сигнатуры операторов приходится перегенерировать и заново пушить DAG в репозиторий.
Подход 2. Динамические DAG`и: в коде декларируются только группы задач и их конфигурация
Дерево задач строится шедулером Airflow при парсинге DAG-файлов. Есть накладные расходы на парсинг и планирование. Зато получаем гибкость и разделение ответственности: DAG не нужно перегенерировать при правках в его артефактах. Например, SQL-скрипт, по которому sql_parser-джоб строит дерево задач, может дорабатываться независимо от DAG`а в отдельном процессе и репозитории. А CI/CD-pipeline уже доставит этот прототип и вызывающий его DAG на файловую систему под Airflow. Ну, и код DAG получается лаконичным и читаемым.
Для наших процессов этот вариант подошел лучше.
Генерация и валидация: преимущества marshmallow (почему не Pydantic?)
Marshmallow обладает для нас двумя ключевыми преимуществами.
Генерация и валидация в одном флаконе
В отличие от похожих библиотек, marshmallow при инициализации схемы не требует данных для валидации. Одну и ту же модель атрибутов джоба можно использовать для двух независимых задач:
генерация YAML-шаблона — модель, основываясь на своих метаданных, выгружает структуру джоба в шаблонный YAML-файл с комментариями;
валидация заполненного YAML — та же самая модель проверяет заполненный пользователем YAML на соответствие типам и бизнес-правилам.
Вспомогательные метаданные
В marshmallow-схеме есть extra-поле metadata для описательной части. Мы прописываем в нем порядковый номер атрибута в сгенерированном YAML, дефолтное значение и комментарий для пользователя. Заполненный YAML-файл сериализуется в JSON и передается как config в DAG для построения дерева задач.
Пример валидации атрибутов SQL-джоба с metadata-полями:
from marshmallow import fields, validate
class SqlFileDescription(YamlGenerator):
filepath = fields.String(required=True,
metadata=gen_metadata(1, "./test/test.sql", "Путь до корректного sql файла на сервере, который будет преобразован в даг. Если указан параметр sql_folder_variable, путь должен быть относительно папки (обязательный параметр)"))
sql_dialect = fields.String(required=False, validate=validate.OneOf(SQL_LINEAGE_DIALECTS), load_default="ansi",
metadata=gen_metadata(2, "impala", "Название диалекта sql, нужен для парсинга lineage. По умолчанию ansi. (необязательный параметр)"))
Серверная часть
На Airflow-сервере dag_generator отвечает за оркестрацию описанных джобов и построение дерева задач (Tasks) в Airflow. Именно здесь конфиг из YAML десериализуется, и джобы создают соответствующие Operator`ы.
Основные джобы
sql_parser
Этот джоб принимает на вход SQL-скрипт для построения витрин данных и решает несколько задач.
Распределение запросов по задачам Airflow
Запросы в SQL-скрипте — это набор инструкций с разделителем. Задача парсера — распределить эти инструкции по задачам, указав параметры на SQL-сессию.
Сценарии распределения
Объединение нескольких запросов в одну Airflow-задачу. Полезно, когда запросы легковесные или тесно связаны между собой.
Разбиение одного тяжелого запроса на несколько задач. Полезно, когда надо посчитать большой объем по частям, не меняя сам запрос.
Запуск n нижеследующих запросов параллельно.
Один запрос в одну задачу — поведение по умолчанию.
Указание параметров сессии без нарушения «чистого» SQL
Мы добавили DSL с инструкциями для парсера в виде блочных комментариев. Так можно задавать логику распределения и конфигурацию (например, connection к БД, объем памяти, алгоритм сжатия, формат хранения), не ломая синтаксис SQL. Любой фрагмент или весь скрипт по-прежнему можно запустить в SQL-клиенте или IDE.
Аналитики, тестировщики и дата-инженеры работают с одним и тем же артефактом.
Формирование Data Lineage
С помощью библиотеки sqllineage мы автоматически извлекаем данные о lineage и отправляем их в централизованное хранилище.
Отслеживание фрагментов SQL-скрипта в интерфейсе Airflow
Сопровождать проще, когда все прозрачно. В интерфейсе видно содержимое конкретной задачи: из DSL-хинтов была проставлена память на сессию, а запросы drop, create и insert into объединены, чтобы избежать лишних connect/disconnect.

Использование переменных в запросах
Пользователи ссылаются в SQL-скрипте на переменные, указав перед ними спецсимвол (например, :schema_name). Это может быть имя схемы, дата в where-предикате, количество памяти и т. п. IDE поддерживают этот функционал, поэтому валидность SQL-скрипта не нарушается.
Парсер заменяет эти переменные на Airflow variables, dag params или xcom в Jinja-нотации.
Переменные в SQL-скрипте:
SELECT c1 FROM :my_work_schema.t1 WHERE DTTM >= :increment_date
Переменные после парсинга и генерации:
SELECT c1 FROM {{ dag_run.conf.get('my_work_schema', 'default_schema_name') }}
WHERE DTTM >= {{ dag_run.conf.get('increment_date_value', 'default_dttm_value') }}
Пример DSL-разметки
Рассмотрим пример DSL-разметки фрагмента SQL-скрипта для Apache Impala:
/*
{
"pre_query_sql_settings": [
"SET mem_limit = 1G"
],
"split_query": {
"split_column": "agreement_id_int",
"num_chunks": 3,
"num_threads": 3
}
}
*/
INSERT INTO TABLE fl_test.gen_agreement
SELECT
col1
,...
FROM very_big_table as t
;
/*
{
"op_name": "fl_test_gen_agreement_compute_stats"
}
*/
compute stats fl_test.gen_agreement;
В результате парсинга мы получим следующую картину в DAG`е Airflow:

В хинтах мы указали:
память на сессию для выполнения запроса;
разбивка расчета входного объема данных на три бакета (операция выполняется через остаток от деления int-ключа таблицы на n);
количество параллельных сессий, в которые нужно посчитать исходный запрос.
Парсер создал таск-группу, в которой посчитал три промежуточных объекта и залил из них данные в целевую таблицу, предварительно обнулив ее. В хинтах также можно задать ожидание успешного завершения всех параллельных тасок или же залить данные асинхронно.
В конце считаем статистику отдельной задачей.
dbt
dbt — популярный инструмент трансформации данных, позволяющий строить витрины данных с использованием шаблонизированного кода в парадигме командной разработки ПО.
dbt-проект состоит из множества «моделей», между которыми dbt автоматически строит граф зависимостей благодаря макроинструкциям в моделях. Но из коробки его нельзя разложить в дерево задач DAG`а Airflow. Отсюда и необходимость нашего джоба.
Изначально мы пошли по обманчиво легкому пути — раскладывали dbt-модели на задачи через BashOperator. Однако с ростом проектов (~300+ моделей) мы столкнулись с плавающими ошибками, конфликтами доступа к manifest.json, рекурсивными зависимостями и костылями предварительной обработки в CI/CD.
В итоге мы перешли на готовую библиотеку cosmos от Astronomer.
Она предоставляет из коробки:
построение графа зависимостей моделей в DAG;
компиляцию dbt-проекта разными способами (dbt ls, dbt parse и т. д.);
ExecutionMode для разных сценариев (LOCAL, DOCKER, KUBERNETES);
DbtDag с управляемыми встроенными тестами;
режим запуска моделей в venv;
управление параллельностью выполнения через threads;
запуск подмножества моделей по тегам в одной задаче.
Пример сгенерированного DAG-а с dbt-джобом на основе cosmos:
import pendulum
import os
from airflow.decorators import dag
from airflow.operators.empty import EmptyOperator
@dag(
dag_id=os.path.basename(__file__).replace('.pyc','').replace('.py',''),
start_date=pendulum.parse('2025-04-22', tz='Europe/Moscow'),
schedule=[Dataset('dag://dbt_parent_dataset')],
catchup=False,
max_active_runs=30
)
def create_dag():
from dag_generator.jobs.dbt_loader.dbt_single_task.realization import run_single_task
big_dbt_cosmos_job_config = {'exclude': ['tag:view'],
'job_name': 'big_dbt_cosmos_job',
'job_type': 'dbt.model',
'pool': 'dbt',
'profile': {'airflow_conn_id': 'my_conn',
'name': 'my_name',
'schema': 'some_schema',
'store_type': 'db_type',
'target': 'dbt_target'},
'project': '/my_dbt_project',
'retries': 1,
'select': ['tag:my_tags']
}
big_dbt_cosmos_job = run_model(big_dbt_cosmos_job_config)
start = EmptyOperator(task_id="start")
done = EmptyOperator(task_id="done", trigger_rule="all_done", outlets=[Dataset(f"dag://{start.dag_id}/done")])
start >> dbt_kno_mark_cdl_ss_cdl_job >> done
create_dag()

Представление DAG в Airflow
Под высокой нагрузкой
«Разгрузка» DAG`ов
Большой dbt-проект в интерфейсе Airflow — зрелище не для слабонервных. Да и сам Airflow страдает: Scheduler и Webserver начинают задыхаться.
Первое, что рекомендуется сделать, — начать дробить проект на отдельные DAG-и. Делить проект можно по тегам, которыми размечаются модели. Зависимости между такими DAG-ами удобно выстраивать через dataset-ы. Также можно отправлять DAGи в разные ресурсные пулы Airflow.
Агрегация задач
Второе, что мы сделали, — отдельный джоб с группировкой нескольких dbt-моделей в одну Airflow-задачу. Используя dynamic task mapping, мы распределяем батчи dbt-моделей по задачам.
Логика распределения определяется тегами и --select-командой. Таким образом, одна задача запускает подмножество моделей согласно dbt run --select — команде с одним из указанных пользователем тегов.
Фрагмент реализации dbt-джоба с группировкой по --select-правилам:
run_dbt = DbtRunOperator.partial(
task_id=f"cosmos_{config['job_name']}",
project_dir=str(PROJECT),
profile_config=profile_config,
exclude=exclude,
install_deps=install_dbt_deps,
emit_datasets=False,
env=ENVS,
invocation_mode=InvocationMode.SUBPROCESS,
dbt_executable_path="dbt",
trigger_rule=config.get("trigger_rule", "all_success"),
do_xcom_push=config.get("do_xcom_push", False),
owner="URISAD_NDP_ETL",
**operator_args,
).expand(select=select)
return run_dbt
На такой DAG уже гораздо приятнее смотреть:

kubernetes_job
Этот джоб — обертка вокруг KubernetesPodOperator.
Пользователю не нужно расписывать параметры kubernetes.client.model-объектов — достаточно указать имя образа, путь к приложению и аргументы. Если нужно — можно добавить requests и limits.
В этом джобе удобно запускать обособленные приложения (например, Python-модели, Java-вычисления и т. п.).
Поскольку наш Airflow развернут в Kubernetes, мы можем переиспользовать готовый шаблон пода для воркеров, применяя его к нашему оператору. Не нужно заново описывать тома, порты, секреты и прочее.
Делается это нехитрой функцией:
from kubernetes import config
from kubernetes.client import CoreV1Api
from kubernetes.client.models import V1Pod
def get_pod_template(
cm_name: str = "airflow-pod-template",
namespace: str = os.getenv("AIRFLOW__KUBERNETES__NAMESPACE", "default"),
template_key: str = "template.yaml",
exit_on_fail: bool = True,
**kwargs
) -> V1Pod:
"""
Function, that creates V1Pod class from ConfigMap
:param str cm_name: name of the ConfigMap
:param str namespace: object name and auth scope, such as for teams and projects
:param str template_key: key for find V1Pod params in ConfigMap
:param bool exit_on_fail: If True then function raise exception of fail. If False function returns empty V1Pod
"""
try:
config.load_incluster_config()
template = CoreV1Api().read_namespaced_config_map(name=cm_name, namespace=namespace)
pod_spec_raw = template.data[template_key]
pod_spec = yaml.safe_load(pod_spec_raw)
pod = client.ApiClient()._ApiClient__deserialize(pod_spec, V1Pod)
except (Exception, yaml.YAMLError) as e:
if exit_on_fail:
raise e
else:
print(f"Error during incluster config load and reading cm with api: {e}")
pod = V1Pod()
return pod
Затем просто подставляем в параметр KubernetesPodOperator распакованный словарь:
full_pod_spec=get_pod_template(**config.get('pod_template_params', {}))
И помним, что дебажить удобнее с dry_run().
Инфраструктура, в которой работает фреймворк
Скорость работы DAG`ов
Препятствие: наши Airflow расположены в Kubernetes, причем не только production, но и остальные, включая dev и test. Быстрым вариантом запуска казался KubernetesExecutor — он уже был настроен. Но этот тип экзекьютора поднимает Pod на каждую задачу. Для ETL-задач в виде тысяч SQL- или dbt-операторов — неэффективно.
Решение: мы перешли на гибридный KubernetesCeleryExecutor. Для ETL-задач указываем очередь Celery, а для обособленных задач — Kubernetes.
Такое деление значительно ускорило ETL DAG`и.
Утилизация ресурсов кластера
CeleryExecutor эффективен, только если воркеры персистентны и готовы принять задачу из очереди в работу. Следовательно, когда задач нет, ресурсы простаивают.
Для масштабирования Celery-воркеров может подойти KEDA. В отличие от HPA, который смотрит только на потребление вычислительных ресурсов, KEDA умеет ориентироваться на прикладные метрики — например, количество задач в очереди.
VPA теоретически мог бы помочь для задач KubernetesExecutor, но есть тонкости с перезапуском пода.

Когда Scheduler распарсит и сериализует DAG в указанную очередь, он либо отправляет задачи на выполнение в брокер Redis (для Celery), либо обращается по Kubernetes API для создания dedicated-пода на выполнение задачи (для Kubernetes).
Обновление библиотек
Для задач KubernetesExecutor библиотеки можно установить в момент поднятия пода запуском пакетного менеджера. Но для ETL-задач, которые стартуют на Celery, обновление библиотек означает редеплой пода и прерывание текущих задач.
Версии некоторых библиотек (в основном внутрибанковских) могут меняться ежедневно. Поэтому мы добавили технические DAG`и без расписания: они запускаются по событию обновления библиотек через REST API и устанавливают venv с актуальными версиями на персистентных разделах.
Заключение
Благодаря dag_generator мы создали стандарт для постановки типовых бизнес-задач на регламентное выполнение без написания кода. Что мы получили:
гибкость — любая команда может написать свой джоб для решения конкретной задачи;
скорость — low-code-подход и валидация ускорили TTM;
поддержку — динамическая генерация DAG и разделение артефактов упростили сопровождение;
расширяемость — никто не запрещает вручную дописывать код в сгенерированные DAG, что оставляет свободу в сложных случаях.
Наш опыт показал, что даже среди зрелых open-source-инструментов есть место для своего «мультитула», который затачивается под процессы конкретной команды.