Привет, Хабр! Меня зовут Михаил Онянов, я Python-разработчик и платформенный инженер в крупнейшем проекте компании Magnit Tech – F&R.
Я создаю инфраструктуру для data-инженеров: разрабатываю интеграции с системами-источниками данных, внедряю сервисы для Data Observability и занимаюсь поддержкой качества кодовой базы на Python.

Из статьи вы узнаете, как с помощью механизма Cluster Policies в Apache Airflow вынести требования к DAG’ам в исполняемый код:
Поговорим о том, когда и зачем нужен отдельный слой Policies.
Посмотрим на примеры требований в больших data-инженерных проектах и способ их реализации с помощью политик.
Покажу нашу архитектуру, примеры кода и способы внедрения.
Сделаем выводы из моих ошибок, допущенных при разработке и внедрении.
В конце посмотрим, в каких ещё системах используется аналогичный механизм.
В тексте я опираюсь на наш опыт, описанный здесь, и примеры решений из проекта F&R; использовал множество источников, обсуждений и примеров из документации OSS инструмента Apache Airflow, спасибо его сообществу, во многом благодаря их вопросам и обсуждениям возникла эта статья, также много полезного взял из цикла видео с Airflow Summit.

Как мы дошли до политики такой...
В больших кластерах Apache Airflow со временем неизбежно появляется хаос: сотни DAG’ов, множество SLA и требований к метаданным, несколько команд с разными подходами к разработке. Для оркестрации загрузки и преобразования данных каждый день запускается более 200 DAG’ов, а для работы системы требуется около 1,5 ПБ данных из разных хранилищ.
Ключевая проблема таких проектов — соблюдение договорённостей и поддержка контекста. Есть прекрасная цитата Фредерика Брукса на эту тему: «Десять человек без процесса — это десять источников случайных ошибок. Два человека с процессом и общим контекстом — это работающий механизм».
Наша цель — избавиться от нужды постоянно синхронизироваться по спецификации и избежать задержек и рассинхрона при изменениях требований, делегировав проверки системе.
Что такое Cluster Policies в Airflow
Cluster Policies (политики) — это механизм платформы Apache Airflow, который позволяет централизованно проверять и изменять объекты системы до запуска конвейера или в runtime. Политики превращают правила и договорённости в исполняемый код, который находится между ядром Airflow и вашим кодом DAG’ов.
Этот слой кастомизации Apache Airflow внедряют разработчики с доступом к развёртыванию кластера или администраторы платформы. Если инженер запушит некорректный код DAG, то в интерфейсе увидит сообщения:

Для работы с Airflow мы используем Kubernetes, а экземпляр Airflow является коммунальным для нескольких команд разработчиков. При написании DAG’ов каждая команда использует свои договорённости. Команде DE важно, чтобы название конвейера соответствовало определённому шаблону.
Например, dag_id=fnr_etl_raw_finance_v_price_indicator_day содержит указание на область данных, вид сущности и тип объекта. В команде ML-разработчиков могут быть свои требования для создания DAG’ов, на которые мы не хотим влиять.
Какие задачи мы решили с помощью политик Airflow:
Гарантируем заполнение метаданных для DAG, например, owner, description, doc_md.
Проверяем форматы данных полей описания и метаданных для интеграции с сервисом OpenMetadata.
Настраиваем коллбэки по умолчанию для каждого конвейера — разработчик больше не думает о том, будет ли оповещение, если его DAG упадёт.
Реализуем слой политик — от минимальных обработчиков к полноценному platform governance
Для этого нам понадобится развёрнутый Airlfow 2.6+. Для локального запуска можно повторить действия из этой статьи. Самый быстрый способ внедрения — модификация файла airflow_local_settings.py, который лежит по пути $AIRFLOW_HOME/config или в sys.path.
Аналогично получается и для Kubernetes-сценария, в котором вы поставляете свой образ или монтируете папку config в scheduler, dag processor, webserver и workers.
Примечание: с Airflow 2.10.1 папка $AIRFLOW_HOME/dags больше не попадает в sys.path на этапе инициализации, поэтому следует размещать файл airflow_local_settings.py в config, описание в документации.
Способы настройки политик в Apache Airflow
Есть два способа внедрения:
С помощью конфигурации
airflow_local_settings.py. В файле реализуем по шаблону имён функции, каждая из которых будет выполняться стандартным потоком Python. Недостатком подхода является сложность обновления и поддержки при работе с несколькими инстансами Airflow, а также возможная перезапись другими конфигурациями (описано в главе про частые проблемы).С помощью setuptools с Pluggy. Упаковываем функцию в пакет и поставляем как зависимость в Airflow. Отлично подойдёт, если планируется поддержка нескольких инстансов и вы хотите иметь единый источник правды для политик.
Для актуальной информации советую дополнительно ознакомиться с документацией. В Apache Airflow есть пять поддерживаемых Policy-функций:
Имя функции |
Когда применяется |
Что умеет |
|---|---|---|
dag_policy |
После загрузки DAG в DagBag. |
Мутация объекта DAG, пропуск или блокировка. |
task_policy |
После добавления задачи в DAG. |
Проверка и изменение свойств заданий. |
task_instance_mutation_hook |
Перед выполнением задания. |
Динамические проверки в runtime. |
pod_mutation_hook |
Перед созданием Pod. |
Меняет спецификацию k8s pod. |
get_airflow_context_vars |
Перед выполнением задания. |
Внедрение переменных ключ-значение в контекст задания. |
Примечание: последние две функции являются хуками с более широким применением, чем реализация слоя governance. Примеры их применения можно посмотреть в презентации с Airflow Summit 2025.
Чтобы код политики начал применяться в каждом конвейере, функции политик должны соответствовать одному из перечисленных наименований:
def task_policy(task): ... def dag_policy(dag): ... def task_instance_mutation_hook(task_instance): ... def pod_mutation_hook(pod): ... def get_airflow_context_vars(context): ...
Далее рассмотрим примеры реализации политик.
Dag policy
Для реализации нашей первой политики опишем функцию, которая не даст выложить DAG, если у него нет нужных нам атрибутов. Мы хотим видеть, когда у DAG не указан владелец, нет документации, или у конвейера некорректно заданы метаданные в поле tags, которые мы потом отправим в OpenMetadata:
from airflow.exceptions import AirflowClusterPolicyViolation, AirflowClusterPolicySkipDag from airflow.conf import conf # Для совместимости Airflow 2 и 3 версии. try: from airflow.sdk import DAG from airflow.sdk.bases.operator import BaseOperator except ImportError: from airflow.models import DAG, BaseOperator REQUIRED_TAGS = {"author", "pipeline_class", "source_guid", "stream"} def dag_policy(dag: DAG) -> None: """Проверяет DAG на соответствие политикам кластера.""" default_owner = conf.get("operators", "default_owner") if not dag.owner or dag.owner.lower() == default_owner.lower(): raise AirflowClusterPolicyViolation( f"DAG '{dag.dag_id}' owner не задан или равен default: '{dag.owner}'" ) if not getattr(dag, "doc_md", None): raise AirflowClusterPolicyViolation( f"DAG '{dag.dag_id}' должен иметь заполненный 'doc_md' с markdown-описанием." ) if not dag.tags: raise AirflowClusterPolicyViolation( f"DAG '{dag.dag_id}' не имеет тегов. Хотя бы один тег обязателен. Путь: {dag.fileloc}" ) dag_tags = {tag.strip().lower() for tag in dag.tags} missing_tags = REQUIRED_TAGS - dag_tags if missing_tags: raise AirflowClusterPolicyViolation( f"DAG '{dag.dag_id}' отсутствуют обязательные теги: {', '.join(sorted(missing_tags))}" ) if "only_for_beta" in dag_tags: raise AirflowClusterPolicySkipDag( f"DAG '{dag.dag_id}' пропущен на проде (имеет тег 'only_for_beta')." )
Если разработчик опубликует DAG с кодом, который не содержит тег owner или заполнит его значением airflow, то в интерфейсе появится ошибка:

Из интересного: AirflowClusterPolicySkipDag также не даст опубликовать DAG, но тихо, без вывода ошибки в интерфейсе. Когда это может быть полезно — предлагаю написать в комментарии читателю.
Примечание про различия импорта во второй и третьей версии Airflow: если вы мигрируете с Airflow 2 на Airflow 3, то для аннотации типов вам необходимо импортировать DAG и BaseOperator из модуля sdk, который заменяет модуль models в третьей версии. При миграции может пригодиться ruff: он способен автоматически поправить ваши импорты для миграции на новую версию Airflow.
Эта политика выполняется после загрузки DAG в DagBag, однако её нарушение не допустит конвейер к выполнению в системе и не покажет его в UI. Кроме того, DagPolicy позволяет мутировать DAG, однако это не всегда так, о чём мы поговорим дальше.
Task policy
Этот вид политик позволяет как ограничить выполнение операторов, наследуемых от BaseOperator, так и изменить параметры оператора или даже заменить исполняемый оператор. Реализуем правило, чтобы в каждом конвейере были коллбэки по умолчанию:
from airflow.exceptions import AirflowClusterPolicyViolation, AirflowClusterPolicySkipDag from airflow.conf import conf # Для совместимости Airflow 2 и 3 версии. try: from airflow.sdk.bases.operator import BaseOperator except ImportError: from airflow.models import BaseOperator DEFAULT_ON_FAILURE_CALLBACK = print def task_policy(task: BaseOperator) -> None: """Проверяет и устанавливает on_failure_callback для тасков.""" callback = getattr(task, "on_failure_callback", None) if callback is None: task.on_failure_callback = DEFAULT_ON_FAILURE_CALLBACK return if callable(callback): # Если уже установлен свой коллбэк — добавляем default в список task.on_failure_callback = [callback, DEFAULT_ON_FAILURE_CALLBACK] return if isinstance(callback, list): if DEFAULT_ON_FAILURE_CALLBACK not in callback: callback.append(DEFAULT_ON_FAILURE_CALLBACK) return Кроме того, task_policy может пригодиться, если вы из Airflow запускаете поды K8s. Пример из доклада: если получили задачу Spark, то заставляем Airflow использовать заданный нами Docker-образ. Это гарантирует, что у Spark-задачи будут нужные зависимости и окружение: from airflow.sdk.bases.operator import BaseOperator from airflow.providers.apache.spark.operators.spark_submit import ( SparkSubmitOperator, ) def task_policy(task: BaseOperator) -> None: if isinstance(task, SparkSubmitOperator): executor_config = { "pod_override": k8s.V1Pod( spec=k8s.V1PodSpec( containers=[ k8s.V1Container( name="base", image="airflow-with-spark" ), ] ) ) } task.executor_config = executor_config task.doc = "Warning! This task has been mutated by your friendly Airflow admin!"
Из примеров видно, что dag_policy и task_policy позволяют реализовать большой пласт логики поверх стандартного Airflow. Кроме работы с объектами и операторами, политики могут помочь в защите от нежелательного кода.
Пример: опасные операторы
Все мы когда-то защищали свои приложения от атак через инъекцию кода. Airflow предоставляет интерфейс для параметров DAG, позволяющий через UI ввести параметры, которые передаются в операторы в DAG. Но как защититься от вредоносного кода? Можно в каждый DAG импортировать функцию типа sanitize(input_cmd), а можно создать политику. Рассмотрим пример:

Допустим, мы встретили Bash-оператор с такой командой или получили её из интерфейса:
definitely_not_a_dangerous_task = BashOperator( task_id="delete_all_fs", bash_command="rm -rf / --no-preserve-root", # <--- Опасная команда! )
От таких ситуаций можно уберечь себя с помощью TaskPolicy, которая будет либо запрещать определённые операторы, либо проверять и модифицировать их аргументы:
from airflow.models import BaseOperator from airflow.exceptions import AirflowClusterPolicyViolation import re # Список опасных команд/шаблонов DANGEROUS_COMMANDS = [ r"rm\s+-rf\s+/", # полное удаление root r"mkfs", # форматирование диска r":\s*>\s*/", # обнуление файлов в / r"shutdown", # выключение системы r"reboot" # перезагрузка ] def task_policy(task: BaseOperator): if task.task_type == "BashOperator": bash_cmd = getattr(task, "bash_command", "") # Проверка на опасные команды for pattern in DANGEROUS_COMMANDS: if re.search(pattern, bash_cmd): raise AirflowClusterPolicyViolation( f"Task {task.dag_id}.{task.task_id} содержит опасную команду: {bash_cmd}" ) return task
Ключевое отличие TaskPolicy от DagPolicy
TaskPolicy отрабатывают после загрузки DAG, поэтому хук позволяет изменять значения параметров операторов и default_args, что не позволяет DagPolicy. Документация прямо указывает на то, что атрибуты, выставленные в task_policy, имеют приоритет над тем, что было задано в DAG-файле или в dag_policy, поэтому изменение состояний через DagPolicy не гарантируется.
task_instance_mutation_hook: runtime-проверки
В отличие от task_policy, этот хук срабатывает уже перед выполнением задачи. Это позволяет учитывать контекст запуска и модифицировать задачу на лету в runtime:
from airflow.models import TaskInstance from datetime import datetime def task_instance_mutation_hook(task_instance: TaskInstance) -> None: """Применяем политики перед каждым запуском.""" dag = task_instance.dag_model task = task_instance.task # Блокируем запуск в выходные для критичных DAG'ов if getattr(task, 'no_weekend_runs', False): execution_date = task_instance.execution_date if execution_date.weekday() >= 5: # сейчас суббота или воскресенье task_instance.skip() return # Ограничиваем параллелизм в non-prod окружениях env = os.getenv('AIRFLOW_ENV', 'prod') if env != 'prod': if hasattr(task, 'pool') and task.pool: task.pool = f"limit-{task.pool}"
pod_mutation_hook: кастомизация Kubernetes
Когда Airflow запускает задачу через KubernetesPodOperator, перед созданием pod’а можно модифицировать его спецификацию с помощью pod_mutation_hook:
from kubernetes.client import V1Pod def pod_mutation_hook(pod: V1Pod, k8s_task_instance) -> None: """Модифицируем Pod перед запуском.""" # Добавляем обязательные аннотации pod.metadata.annotations = pod.metadata.annotations or {} pod.metadata.annotations['monitoring/spark'] = 'true' pod.metadata.annotations['cost-center'] = 'data-platform' # Подключаем shared volume для общих библиотек from kubernetes.client import V1Volume, V1VolumeMount volume = V1Volume(name='shared-libs') volume.config_map = {'name': 'common-libraries'} if not pod.spec.volumes: pod.spec.volumes = [] pod.spec.volumes.append(volume) # Добавляем volume mount ко всем контейнерам for container in pod.spec.containers: if not container.volume_mounts: container.volume_mounts = [] container.volume_mounts.append( V1VolumeMount(mount_path='/opt/shared', name='shared-libs') ) # Проставляем лимиты ресурсов for container in pod.spec.containers: if not container.resources: container.resources = V1ResourceRequirements() if not container.resources.limits: container.resources.limits = {} container.resources.limits['cpu'] = '1' container.resources.limits['memory'] = '2Gi'
Дежавю? Вам не показалось!
Похожий пример мы уже рассмотрели с task_policy(task: BaseOperator), однако они отличаются в этапах выполнения функции: task_policy изменяет сам оператор в коде DAG, а pod_mutation_hook изменяет конкретный Pod прямо перед его запуском.
get_airflow_context_vars
Этот хук позволяет автоматически передавать переменные из контекста Airflow в переменные окружения операторов. Например, если требуется передать идентификатор запуска задачи в bash-скрипт, то в стандартном подходе нужно вручную прописать Jinja-шаблон в каждый таск:
# Без политики придётся дублировать этот код во всех DAG'ах fetch_data = BashOperator( task_id='fetch_data', bash_command='python my_script.py', env={ 'AIRFLOW_TRACKING_ID': "{{ run_id }}", # <--- Во время запуска сюда подставятся переменные из runtime 'CURRENT_DAG': "{{ dag.dag_id }}", 'CURRENT_TASK': "{{ task.task_id }}" } )
Для упрощения таких задач создана политика get_airflow_context_vars, она автоматически делает инъекцию параметров в контекст задачи:
def get_airflow_context_vars(context: dict) -> dict[str, str]: """Автоматически экспортируем метаданные Airflow в OS environment переменных.""" # Собираем базовый контекст для логирования и трейсинга, который будет поставлен как ENV в операторы context_vars = { 'APP_METRIC_TAG': f"airflow-{context['dag'].dag_id}", 'AIRFLOW_CONTEXT_DAG_ID': context['dag'].dag_id, 'AIRFLOW_CONTEXT_TASK_ID': context['task'].task_id, 'AIRFLOW_CONTEXT_RUN_ID': context['run_id'], } # Добавляем логическую дату в удобном формате if 'logical_date' in context: context_vars['AIRFLOW_CONTEXT_DATE'] = context['logical_date'].strftime('%Y-%m-%d') return context_vars
В результате в bash-скрипте мы сможем использовать наши переменные напрямую из среды, без передачи их конструктору BashOperator:
#!/bin/bash # Скрипт сам знает, в рамках какого запуска Airflow он работает echo "Запуск под тегом: $APP_METRIC_TAG" echo "Идентификатор рана: $AIRFLOW_CONTEXT_RUN_ID" # Отправка логов в OpenTelemetry или Sentry с привязкой к Airflow curl -X POST https://monitoring.internal \ -d "msg=Success" \ -H "X-Airflow-Run: $AIRFLOW_CONTEXT_RUN_ID"
На этом научпоп про Cluster Policies в Apache Airflow заканчивается! Для желающих гибко управлять политиками в своём кластере приглашаю в следующую главу — в ней рассмотрим улучшения стандартного подхода, которые используются у нас в prod’e.

Наша архитектура слоя политик
Официальная документация рекомендует ввести абстракцию вокруг правил, так как политик для кластера понадобится n-штук.
Примеры кода из этой статьи и важные материалы вы сможете найти в репозитории, чтобы быстро перейти к внедрению политик у себя. Разработчик пишет только классы правил, которые передаются в главную функцию по id через словарь в airflow_local_settings.py:
"""Включенные правила, которые будут применяться при проверке DAG и Task.""" ENABLED_RULES_NAMES = { 'set_default_on_failure_callbacks', 'checks_dag_failure_notifications', 'dag_requires_doc_md', 'dag_requires_description', 'dag_requires_non_default_owner', }
Также словарь ENABLED_RULES_NAMES можно передать в airflow_local_settings.py с помощью механизма Variables, а саму переменную можно изменить через интерфейс Airflow.
Для описания правила сделали абстракцию с помощью класса Rule. Он задаёт единый контракт для всех политик, у правила есть: идентификатор, описание для исключения, метод проверки и общий сценарий выполнения.
При запуске системы Airflow читает файл airflow_cluster_policies.py, в котором инициализируются политики. Выглядит это так:

Наследники Rule реализуют конкретное правило, а наш policy-engine берёт на себя единообразную обработку результата, логирование и реакцию на нарушения политик. Алгоритм обработки правил можно изучить на диаграмме ниже:

Для каждого типа политик создаются правила, которые живут в одноимённых файлах:
./ ├── airflow_policy_engine_layer/ │ ├── policies/ <--- Пакет с описаниями политик всех видов │ │ ├── init.py │ │ ├── context_rules.py │ │ ├── dag_rules.py │ │ ├── pod_rules.py │ │ ├── task_instance_rules.py │ │ └── task_rules.py │ ├── init.py │ ├── base_rule.py <--- Базовый класс Rule, который │ ├── builtin_policy_specs.py имплементируется модулями в policies │ ├── logic.py │ ├── policy_loader.py │ ├── policy_registry.py │ └── policy_types.py ├── plugins/ │ ├── common/ │ │ ├── init.py │ │ └── callbacks_registry.py │ ├── hooks/ │ │ ├── keephq/ │ │ │ ├── init.py │ │ │ ├── keephq_hook.py │ │ │ └── utils.py │ │ └── init.py │ └── init.py ├── tests/ │ ├── init.py │ ├── conftest.py │ ├── test_logic.py │ └── test_task_rules.py ├── README.md ├── airflow_local_settings.py. <--- Точка входа и инициализация политик ├── ca-bundle.crt ├── check_dags_with_policy_engine.py ├── pyproject.toml └── uv.lock
Опыт эксплуатации и обновления политик в нескольких инстансах Airflow показал, что лучшим решением для поставки в большие проекты является создание пакетов и внедрение при помощи Pluggy, о котором я писал в начале главы. Для маленьких и средних проектов такой способ, скорее всего, избыточен, так как поставка пакетов в Airflow также является нетривиальной задачей.
Наши интересные политики
В этой главе предлагаю ознакомиться с самыми востребованными политиками в нашем проекте. В конце поделюсь проблемами, с которыми я столкнулся при реализации и внедрении.
Централизованный алертинг
Когда я пришёл в проект, каждый разработчик отвечал за корректное заполнение метаданных в поле tags и добавление модуля оповещений в DAG. Для этого нужно было знать:
Название модуля для алертинга и корректный импорт;
Имя Variable, которая содержит словарь с кредами для оповещения, так как учётных записей для оповещений может быть несколько;
Имя Variable для
room_id, в которую отправлять оповещение;Корректный способ добавления оповещения через
default_args. Если добавить коллбек в параметры DAG, то он не прокинется в каждое задание.
Также я обратил внимание на периодические правки на 200+ конвейеров. Такое происходило, например, при смене модуля оповещений:

Примечание: опытный разработчик, конечно, заметит, что для оповещений можно было реализовать абстракцию, которая бы избавила от массовых правок при смене модуля оповещений.
Как не допустить такого с помощью политик? Первой нашей политикой была установка коллбэков по умолчанию с помощью task_policy. Для этого необходимо существующие on_failure_callbacks сравнить с обязательными из переменной default_callbacks, и вставить вызовы недостающих. Принцип работы показан на схеме ниже, а нашу реализацию можно посмотреть в репозитории:

Разработка политик усложняется разнообразием типов объектов и специальными случаями обработки, как, например, обработка объектов из partial и expand в Airflow.
Проверка метаданных
Для интеграции с сервисом OpenMetadata мы используем поле tags, в котором каждый разработчик заполняет данные формата ключ:значение. Также есть дополнительные требования к самому формату описания метаданных:

До внедрения политик изменения требований сопровождались массовыми правками, а ответственность была на инженере данных, которому меньше всего хочется заходить на корпоративный портал и искать, какие значения поддерживаются полем pool и нужно ли вставлять атрибут sla:

Проверка метаданных существенно проще работы с операторами в task_policy. Для реализации нескольких проверок паттерном является накопление ошибок, чтобы в интерфейсе Airflow были отображены ошибки по каждому виду политик:

Я решил не описывать реализованные нами политики требований к заполнению полей DAG DagRequiresDescription, DagRequiresNonDefaultOwner, DagRequiresDocMd и DagHasFailureNotification — их код представлен и подробно задокументирован в репозитории, который является прикладной частью статьи. Я буду рад PR для демонстрации всех возможностей политик, а также примеров внедрения через интерфейс Pluggy и продвинутой работы с task_instance_mutation_hook и pod_mutation_hook.
Польза для техподдержки
Обязательное тэгирование с валидацией данных приносит отдельную пользу: команда сопровождения и эксплуатации может сразу понять, какой сервис или домен затронут, кто отвечает за DAG и насколько критично его падение. Airflow поддерживает отображение и фильтрацию DAG по tags в интерфейсе:

Также мы отправляем эти данные в OpenMetadata, что оказывается полезно при ежедневной поддержке платформы:

Какие я встретил подводные камни
Поговорим о том, что стоит иметь в виду при разработке и внедрении policies в Airflow. Я не нашёл в одном месте описание перечисленных здесь ошибок, поэтому, возможно, это поможет именно вам:
Как убедиться, что применились изменения в коде политик? При разработке политик мы работаем с папкой, смонтированной в контейнер, но её код, в отличие от кода DAG, не парсится по расписанию. Чтобы увидеть изменения, нужен restart/redeploy процессов worker и scheduler Airflow, которые импортируют airflow_local_settings.py.
Политики Airflow не работают, хотя файл
airflow_local_settings.pyсмонтирован и функции политик реализованы. Возможно, вы используете для развёртывания Airflow k8s и на уровне YAML-файла описали конфигурацию в airflowLocalSettings. Если этот параметр не выставлен вnull, то файл Helm chart может смонтировать собственныйairflow_local_settings.py, который перекроет файл из image/container filesystem.Какой оверхед и насколько это безопасно? Airflow policies по своей архитектуре добавляют точку отказа для каждого DAG, поэтому необходимо тщательно проверять код. Не лучшей идеей будет вставить в код политик I/O-операции, обращение к БД и сетевые запросы, дорогие импорты — это может замедлить работу вашей системы.
Работа с
MappedOperatorиimmutability: в Airflow есть понятие MappedOperator, он нужен для генерирования произвольного количества Task в runtime в зависимости от работы предыдущих задач. Если вы хотите сделать политику, которая на падение каждого объекта типаBaseOperatorбудет вызывать оповещение, тоMappedOperatorведёт себя иначе, чем обычныйBaseOperator, а многие поля должны изменяться черезpartial_kwargs. Для работы с ним потребуется отдельно отбирать тип оператора (см. слайд 24).
Если в других сферах применения Apache Airflow, таких как BI, DevOps и MLOps у вас возникли другие проблемы или практики, то предлагаю поделиться ими в комментариях или в репозитории.
На что это ещё похоже?
Предлагаю напоследок немного размять свою инженерную культуру и посмотреть, как в других популярных системах применяется такой же подход: централизованная проверка, ограничение кода и конфигурация до попадания в production.
Kubernetes: ValidatingAdmissionPolicy, Kyverno
В Kubernetes похожую роль выполняют admission policies — правила, которые проверяют ресурсы ещё до их сохранения в API server. ValidatingAdmissionPolicy позволяет описывать CEL-правила как обычные Kubernetes API objects. Kyverno идёт дальше и умеет не только проверять, но и менять YAML-манифесты. Это хороший пример platform-level governance для инфраструктуры.
Политики GitLab
GitLab позволяет централизованно внедрять обязательные CI jobs через Pipeline Execution Policies. Политики могут внедряться или переписывать pipeline-конфигурацию проекта и запускать проверки для всех репозиториев группы. Это слой governance поверх пользовательского CI/CD. Я искал похожие реализации уже на этапе написания статьи, и как же здорово видеть реализацию тех же полей в политиках GitLab, что мы сделали для своего класса Rule (документация про Pipeline Execution Policies).
GitHub: rulesets + required status checks
В GitHub похожая модель строится через Rulesets, branch protection и required status checks. Обычно это комбинация правил в UI/API/Terraform и CI-проверок через GitHub Actions. Такой подход позволяет централизованно вызывать заданные правила ещё до merge в main (ещё скажите, что есть другие ветки):
Что в итоге?
Мы исследовали маленькую, но интересную часть системы Apache Airflow — политики. Проделали путь от простых проверок на отсутствующие поля DAG до создания полноценного движка, который позволяет централизованно внедрять обработчики, проверять метаданные конвейеров и в удобном формате показывать пользователю, где и как нужно поправить ошибку.
Делитесь в комментариях своими примерами, вносите свой вклад в OpenSource и покупайте сырки в Магните. Если зашла статья, то заглядывайте в мой блог: в нём я выстраиваю TIL, пишу мысли про карьеру, образование и всё, что помогает совмещать разные сферы жизни чуть менее хаотично.
Ciao!