Привет, Хабр! Меня зовут Михаил Онянов, я Python-разработчик и платформенный инженер в крупнейшем проекте компании Magnit Tech – F&R.

Я создаю инфраструктуру для data-инженеров: разрабатываю интеграции с системами-источниками данных, внедряю сервисы для Data Observability и занимаюсь поддержкой качества кодовой базы на Python.

Pasted image 20260526165121.png

Из статьи вы узнаете, как с помощью механизма Cluster Policies в Apache Airflow вынести требования к DAG’ам в исполняемый код:

  1. Поговорим о том, когда и зачем нужен отдельный слой Policies.

  2. Посмотрим на примеры требований в больших data-инженерных проектах и способ их реализации с помощью политик.

  3. Покажу нашу архитектуру, примеры кода и способы внедрения.

  4. Сделаем выводы из моих ошибок, допущенных при разработке и внедрении.

  5. В конце посмотрим, в каких ещё системах используется аналогичный механизм.

В тексте я опираюсь на наш опыт, описанный здесь, и примеры решений из проекта F&R; использовал множество источников, обсуждений и примеров из документации OSS инструмента Apache Airflow, спасибо его сообществу, во многом благодаря их вопросам и обсуждениям возникла эта статья, также много полезного взял из цикла видео с Airflow Summit.

AirflowLogo.svg.png

Как мы дошли до политики такой...

В больших кластерах Apache Airflow со временем неизбежно появляется хаос: сотни DAG’ов, множество SLA и требований к метаданным, несколько команд с разными подходами к разработке. Для оркестрации загрузки и преобразования данных каждый день запускается более 200 DAG’ов, а для работы системы требуется около 1,5 ПБ данных из разных хранилищ.

Ключевая проблема таких проектов — соблюдение договорённостей и поддержка контекста. Есть прекрасная цитата Фредерика Брукса на эту тему: «Десять человек без процесса — это десять источников случайных ошибок. Два человека с процессом и общим контекстом — это работающий механизм».

Наша цель — избавиться от нужды постоянно синхронизироваться по спецификации и избежать задержек и рассинхрона при изменениях требований, делегировав проверки системе.

Что такое Cluster Policies в Airflow

Cluster Policies (политики) — это механизм платформы Apache Airflow, который позволяет централизованно проверять и изменять объекты системы до запуска конвейера или в runtime. Политики превращают правила и договорённости в исполняемый код, который находится между ядром Airflow и вашим кодом DAG’ов.

Этот слой кастомизации Apache Airflow внедряют разработчики с доступом к развёртыванию кластера или администраторы платформы. Если инженер запушит некорректный код DAG, то в интерфейсе увидит сообщения:

Pasted image 20260525175634.png

Для работы с Airflow мы используем Kubernetes, а экземпляр Airflow является коммунальным для нескольких команд разработчиков. При написании DAG’ов каждая команда использует свои договорённости. Команде DE важно, чтобы название конвейера соответствовало определённому шаблону.
Например, dag_id=fnr_etl_raw_finance_v_price_indicator_day содержит указание на область данных, вид сущности и тип объекта. В команде ML-разработчиков могут быть свои требования для создания DAG’ов, на которые мы не хотим влиять.
Какие задачи мы решили с помощью политик Airflow: 

  1. Гарантируем заполнение метаданных для DAG, например, owner, description, doc_md.

  2. Проверяем форматы данных полей описания и метаданных для интеграции с сервисом OpenMetadata.

  3. Настраиваем коллбэки по умолчанию для каждого конвейера — разработчик больше не думает о том, будет ли оповещение, если его DAG упадёт.

Реализуем слой политик — от минимальных обработчиков к полноценному platform governance

Для этого нам понадобится развёрнутый Airlfow 2.6+. Для локального запуска можно повторить действия из этой статьи. Самый быстрый способ внедрения — модификация файла airflow_local_settings.py, который лежит по пути $AIRFLOW_HOME/config или в sys.path.

Аналогично получается и для Kubernetes-сценария, в котором вы поставляете свой образ или монтируете папку config в scheduler, dag processor, webserver и workers.

Примечание: с Airflow 2.10.1 папка $AIRFLOW_HOME/dags больше не попадает в sys.path на этапе инициализации, поэтому следует размещать файл airflow_local_settings.py в config, описание в документации.

Способы настройки политик в Apache Airflow

Есть два способа внедрения:

  1. С помощью конфигурации airflow_local_settings.py. В файле реализуем по шаблону имён функции, каждая из которых будет выполняться стандартным потоком Python. Недостатком подхода является сложность обновления и поддержки при работе с несколькими инстансами Airflow, а также возможная перезапись другими конфигурациями (описано в главе про частые проблемы).

  2. С помощью setuptools с Pluggy. Упаковываем функцию в пакет и поставляем как зависимость в Airflow. Отлично подойдёт, если планируется поддержка нескольких инстансов и вы хотите иметь единый источник правды для политик.

Для актуальной информации советую дополнительно ознакомиться с документацией. В Apache Airflow есть пять поддерживаемых Policy-функций:

Имя функции

Когда применяется

Что умеет

dag_policy

После загрузки DAG в DagBag.

Мутация объекта DAG, пропуск или блокировка.

task_policy

После добавления задачи в DAG.

Проверка и изменение свойств заданий.

task_instance_mutation_hook

Перед выполнением задания.

Динамические проверки в runtime.

pod_mutation_hook

Перед созданием Pod.

Меняет спецификацию k8s pod.

get_airflow_context_vars

Перед выполнением задания.

Внедрение переменных ключ-значение в контекст задания.

Примечание: последние две функции являются хуками с более широким применением, чем реализация слоя governance. Примеры их применения можно посмотреть в презентации с Airflow Summit 2025.

Чтобы код политики начал применяться в каждом конвейере, функции политик должны соответствовать одному из перечисленных наименований:

def task_policy(task):
    ...
   
def dag_policy(dag):
    ...

def task_instance_mutation_hook(task_instance):
    ...
   
def pod_mutation_hook(pod):
    ...
   
def get_airflow_context_vars(context):
    ...

Далее рассмотрим примеры реализации политик.

Dag policy

Для реализации нашей первой политики опишем функцию, которая не даст выложить DAG, если у него нет нужных нам атрибутов. Мы хотим видеть, когда у DAG не указан владелец, нет документации, или у конвейера некорректно заданы метаданные в поле tags, которые мы потом отправим в OpenMetadata:

from airflow.exceptions import AirflowClusterPolicyViolation, AirflowClusterPolicySkipDag
from airflow.conf import conf

# Для совместимости Airflow 2 и 3 версии.
try:
    from airflow.sdk import DAG
    from airflow.sdk.bases.operator import BaseOperator
except ImportError:
    from airflow.models import DAG, BaseOperator


REQUIRED_TAGS = {"author", "pipeline_class", "source_guid", "stream"}


def dag_policy(dag: DAG) -> None:
    """Проверяет DAG на соответствие политикам кластера."""
    default_owner = conf.get("operators", "default_owner")
    if not dag.owner or dag.owner.lower() == default_owner.lower():
        raise AirflowClusterPolicyViolation(
            f"DAG '{dag.dag_id}' owner не задан или равен default: '{dag.owner}'"
        )

    if not getattr(dag, "doc_md", None):
        raise AirflowClusterPolicyViolation(
            f"DAG '{dag.dag_id}' должен иметь заполненный 'doc_md' с markdown-описанием."
        )

    if not dag.tags:
        raise AirflowClusterPolicyViolation(
            f"DAG '{dag.dag_id}' не имеет тегов. Хотя бы один тег обязателен. Путь: {dag.fileloc}"
        )

    dag_tags = {tag.strip().lower() for tag in dag.tags}
    missing_tags = REQUIRED_TAGS - dag_tags
    if missing_tags:
        raise AirflowClusterPolicyViolation(
            f"DAG '{dag.dag_id}' отсутствуют обязательные теги: {', '.join(sorted(missing_tags))}"
        )

    if "only_for_beta" in dag_tags:
        raise AirflowClusterPolicySkipDag(
            f"DAG '{dag.dag_id}' пропущен на проде (имеет тег 'only_for_beta')."
        )

Если разработчик опубликует DAG с кодом, который не содержит тег owner или заполнит его значением airflow, то в интерфейсе появится ошибка:

Pasted image 20260526160745.png

Из интересного: AirflowClusterPolicySkipDag также не даст опубликовать DAG, но тихо, без вывода ошибки в интерфейсе. Когда это может быть полезно — предлагаю написать в комментарии читателю.

Примечание про различия импорта во второй и третьей версии Airflow: если вы мигрируете с Airflow 2 на Airflow 3, то для аннотации типов вам необходимо импортировать DAG и BaseOperator из модуля sdk, который заменяет модуль models в третьей версии. При миграции может пригодиться ruff: он способен автоматически поправить ваши импорты для миграции на новую версию Airflow.

Эта политика выполняется после загрузки DAG в DagBag, однако её нарушение не допустит конвейер к выполнению в системе и не покажет его в UI. Кроме того, DagPolicy позволяет мутировать DAG, однако это не всегда так, о чём мы поговорим дальше.

Task policy

Этот вид политик позволяет как ограничить выполнение операторов, наследуемых от BaseOperator, так и изменить параметры оператора или даже заменить исполняемый оператор. Реализуем правило, чтобы в каждом конвейере были коллбэки по умолчанию:

from airflow.exceptions import AirflowClusterPolicyViolation, AirflowClusterPolicySkipDag
from airflow.conf import conf

# Для совместимости Airflow 2 и 3 версии.
try:
    from airflow.sdk.bases.operator import BaseOperator
except ImportError:
    from airflow.models import BaseOperator


DEFAULT_ON_FAILURE_CALLBACK = print

def task_policy(task: BaseOperator) -> None:
"""Проверяет и устанавливает on_failure_callback для тасков."""

callback = getattr(task, "on_failure_callback", None)

if callback is None:
    task.on_failure_callback = DEFAULT_ON_FAILURE_CALLBACK
    return

if callable(callback):
    # Если уже установлен свой коллбэк — добавляем default в список
    task.on_failure_callback = [callback, DEFAULT_ON_FAILURE_CALLBACK]
    return

if isinstance(callback, list):
    if DEFAULT_ON_FAILURE_CALLBACK not in callback:
        callback.append(DEFAULT_ON_FAILURE_CALLBACK)
    return

Кроме того, task_policy может пригодиться, если вы из Airflow запускаете поды K8s. Пример из доклада: если получили задачу Spark, то заставляем Airflow использовать заданный нами Docker-образ. Это гарантирует, что у Spark-задачи будут нужные зависимости и окружение:

from airflow.sdk.bases.operator import BaseOperator
from airflow.providers.apache.spark.operators.spark_submit import (  
    SparkSubmitOperator,  
)  
  
def task_policy(task: BaseOperator) -> None:  
if isinstance(task, SparkSubmitOperator):  
    executor_config = {  
        "pod_override": k8s.V1Pod(  
            spec=k8s.V1PodSpec(  
                containers=[  
                    k8s.V1Container(
                        name="base", image="airflow-with-spark"
                    ),  
                ]  
            )  
        )  
    }  
task.executor_config = executor_config  
task.doc = "Warning! This task has been mutated by your friendly Airflow admin!"

Из примеров видно, что dag_policy и task_policy позволяют реализовать большой пласт логики поверх стандартного Airflow. Кроме работы с объектами и операторами, политики могут помочь в защите от нежелательного кода.

Пример: опасные операторы

Все мы когда-то защищали свои приложения от атак через инъекцию кода. Airflow предоставляет интерфейс для параметров DAG, позволяющий через UI ввести параметры, которые передаются в операторы в DAG. Но как защититься от вредоносного кода? Можно в каждый DAG импортировать функцию типа sanitize(input_cmd), а можно создать политику. Рассмотрим пример:

Pasted image 20260515104227.png

Допустим, мы встретили Bash-оператор с такой командой или получили её из интерфейса:

definitely_not_a_dangerous_task = BashOperator(
    task_id="delete_all_fs",  
    bash_command="rm -rf / --no-preserve-root", # <--- Опасная команда!
)

От таких ситуаций можно уберечь себя с помощью TaskPolicy, которая будет либо запрещать определённые операторы, либо проверять и модифицировать их аргументы:

from airflow.models import BaseOperator
from airflow.exceptions import AirflowClusterPolicyViolation
import re

# Список опасных команд/шаблонов
DANGEROUS_COMMANDS = [
    r"rm\s+-rf\s+/",        # полное удаление root
    r"mkfs",                # форматирование диска
    r":\s*>\s*/",           # обнуление файлов в /
    r"shutdown",            # выключение системы
    r"reboot"               # перезагрузка
]

def task_policy(task: BaseOperator):
    if task.task_type == "BashOperator":
        bash_cmd = getattr(task, "bash_command", "")

        # Проверка на опасные команды
        for pattern in DANGEROUS_COMMANDS:
            if re.search(pattern, bash_cmd):
                raise AirflowClusterPolicyViolation(
                    f"Task {task.dag_id}.{task.task_id} содержит опасную команду: {bash_cmd}"
                )
    return task

Ключевое отличие TaskPolicy от DagPolicy

TaskPolicy отрабатывают после загрузки DAG, поэтому хук позволяет изменять значения параметров операторов и default_args, что не позволяет DagPolicy. Документация прямо указывает на то, что атрибуты, выставленные в task_policy, имеют приоритет над тем, что было задано в DAG-файле или в dag_policy, поэтому изменение состояний через DagPolicy не гарантируется.

task_instance_mutation_hook: runtime-проверки

В отличие от task_policy, этот хук срабатывает уже перед выполнением задачи. Это позволяет учитывать контекст запуска и модифицировать задачу на лету в runtime:

from airflow.models import TaskInstance
from datetime import datetime

def task_instance_mutation_hook(task_instance: TaskInstance) -> None:
    """Применяем политики перед каждым запуском."""
    
    dag = task_instance.dag_model
    task = task_instance.task
    
    # Блокируем запуск в выходные для критичных DAG'ов
    if getattr(task, 'no_weekend_runs', False):
        execution_date = task_instance.execution_date
        if execution_date.weekday() >= 5:  # сейчас суббота или воскресенье
            task_instance.skip()
            return
    
    # Ограничиваем параллелизм в non-prod окружениях
    env = os.getenv('AIRFLOW_ENV', 'prod')
    if env != 'prod':
        if hasattr(task, 'pool') and task.pool:
            task.pool = f"limit-{task.pool}"

pod_mutation_hook: кастомизация Kubernetes

Когда Airflow запускает задачу через KubernetesPodOperator, перед созданием pod’а можно модифицировать его спецификацию с помощью pod_mutation_hook:

from kubernetes.client import V1Pod

def pod_mutation_hook(pod: V1Pod, k8s_task_instance) -> None:
    """Модифицируем Pod перед запуском."""
    
    # Добавляем обязательные аннотации
    pod.metadata.annotations = pod.metadata.annotations or {}
    pod.metadata.annotations['monitoring/spark'] = 'true'
    pod.metadata.annotations['cost-center'] = 'data-platform'
    
    # Подключаем shared volume для общих библиотек
    from kubernetes.client import V1Volume, V1VolumeMount
    
    volume = V1Volume(name='shared-libs')
    volume.config_map = {'name': 'common-libraries'}
    
    if not pod.spec.volumes:
        pod.spec.volumes = []
    pod.spec.volumes.append(volume)
    
    # Добавляем volume mount ко всем контейнерам
    for container in pod.spec.containers:
        if not container.volume_mounts:
            container.volume_mounts = []
        container.volume_mounts.append(
            V1VolumeMount(mount_path='/opt/shared', name='shared-libs')
        )
    
    # Проставляем лимиты ресурсов
    for container in pod.spec.containers:
        if not container.resources:
            container.resources = V1ResourceRequirements()
        if not container.resources.limits:
            container.resources.limits = {}
        container.resources.limits['cpu'] = '1'
        container.resources.limits['memory'] = '2Gi'

Дежавю? Вам не показалось!
Похожий пример мы уже рассмотрели с task_policy(task: BaseOperator), однако они отличаются в этапах выполнения функции: task_policy изменяет сам оператор в коде DAG, а pod_mutation_hook изменяет конкретный Pod прямо перед его запуском.

get_airflow_context_vars

Этот хук позволяет автоматически передавать переменные из контекста Airflow в переменные окружения операторов. Например, если требуется передать идентификатор запуска задачи в bash-скрипт, то в стандартном подходе нужно вручную прописать Jinja-шаблон в каждый таск:

# Без политики придётся дублировать этот код во всех DAG'ах
fetch_data = BashOperator(
    task_id='fetch_data',
    bash_command='python my_script.py',
    env={
        'AIRFLOW_TRACKING_ID': "{{ run_id }}",  # <--- Во время запуска сюда подставятся переменные из runtime
        'CURRENT_DAG': "{{ dag.dag_id }}",
        'CURRENT_TASK': "{{ task.task_id }}"
    }
)

Для упрощения таких задач создана политика get_airflow_context_vars, она автоматически делает инъекцию параметров в контекст задачи:

def get_airflow_context_vars(context: dict) -> dict[str, str]:
"""Автоматически экспортируем метаданные Airflow в OS environment переменных."""

# Собираем базовый контекст для логирования и трейсинга, который будет поставлен как ENV в операторы
context_vars = {
    'APP_METRIC_TAG': f"airflow-{context['dag'].dag_id}",
    'AIRFLOW_CONTEXT_DAG_ID': context['dag'].dag_id,
    'AIRFLOW_CONTEXT_TASK_ID': context['task'].task_id,
    'AIRFLOW_CONTEXT_RUN_ID': context['run_id'],
}

# Добавляем логическую дату в удобном формате
if 'logical_date' in context:
    context_vars['AIRFLOW_CONTEXT_DATE'] = context['logical_date'].strftime('%Y-%m-%d')

return context_vars

В результате в bash-скрипте мы сможем использовать наши переменные напрямую из среды, без передачи их конструктору BashOperator:

#!/bin/bash

# Скрипт сам знает, в рамках какого запуска Airflow он работает
echo "Запуск под тегом: $APP_METRIC_TAG"
echo "Идентификатор рана: $AIRFLOW_CONTEXT_RUN_ID"

# Отправка логов в OpenTelemetry или Sentry с привязкой к Airflow
curl -X POST https://monitoring.internal \
    -d "msg=Success" \
    -H "X-Airflow-Run: $AIRFLOW_CONTEXT_RUN_ID"

На этом научпоп про Cluster Policies в Apache Airflow заканчивается! Для желающих гибко управлять политиками в своём кластере приглашаю в следующую главу — в ней рассмотрим улучшения стандартного подхода, которые используются у нас в prod’e.

Pasted image 20260526160238.png
Везём в прод

Наша архитектура слоя политик

Официальная документация рекомендует ввести абстракцию вокруг правил, так как политик для кластера понадобится n-штук.

Примеры кода из этой статьи и важные материалы вы сможете найти в репозитории, чтобы быстро перейти к внедрению политик у себя. Разработчик пишет только классы правил, которые передаются в главную функцию по id через словарь в airflow_local_settings.py:

"""Включенные правила, которые будут применяться при проверке DAG и Task."""  
ENABLED_RULES_NAMES = {  
    'set_default_on_failure_callbacks',  
    'checks_dag_failure_notifications',  
    'dag_requires_doc_md',  
    'dag_requires_description',  
    'dag_requires_non_default_owner',  
}

Также словарь ENABLED_RULES_NAMES можно передать в airflow_local_settings.py с помощью механизма Variables, а саму переменную можно изменить через интерфейс Airflow.

Для описания правила сделали абстракцию с помощью класса Rule. Он задаёт единый контракт для всех политик, у правила есть: идентификатор, описание для исключения, метод проверки и общий сценарий выполнения.

При запуске системы Airflow читает файл airflow_cluster_policies.py, в котором инициализируются политики. Выглядит это так:

 

Pasted image 20260515163353.png

 Наследники Rule реализуют конкретное правило, а наш policy-engine берёт на себя единообразную обработку результата, логирование и реакцию на нарушения политик. Алгоритм обработки правил можно изучить на диаграмме ниже:

Pasted image 20260515153318.png

 Для каждого типа политик создаются правила, которые живут в одноимённых файлах:

./
├── airflow_policy_engine_layer/
│   ├── policies/                    <--- Пакет с описаниями политик всех видов
│   │   ├── init.py
│   │   ├── context_rules.py
│   │   ├── dag_rules.py
│   │   ├── pod_rules.py
│   │   ├── task_instance_rules.py
│   │   └── task_rules.py
│   ├── init.py
│   ├── base_rule.py                 <--- Базовый класс Rule, который 
│   ├── builtin_policy_specs.py          имплементируется модулями в policies
│   ├── logic.py
│   ├── policy_loader.py
│   ├── policy_registry.py
│   └── policy_types.py
├── plugins/
│   ├── common/
│   │   ├── init.py
│   │   └── callbacks_registry.py
│   ├── hooks/
│   │   ├── keephq/
│   │   │   ├── init.py
│   │   │   ├── keephq_hook.py
│   │   │   └── utils.py
│   │   └── init.py
│   └── init.py
├── tests/
│   ├── init.py
│   ├── conftest.py
│   ├── test_logic.py
│   └── test_task_rules.py
├── README.md
├── airflow_local_settings.py.     <--- Точка входа и инициализация политик
├── ca-bundle.crt
├── check_dags_with_policy_engine.py
├── pyproject.toml
└── uv.lock

Опыт эксплуатации и обновления политик в нескольких инстансах Airflow показал, что лучшим решением для поставки в большие проекты является создание пакетов и внедрение при помощи Pluggy, о котором я писал в начале главы. Для маленьких и средних проектов такой способ, скорее всего, избыточен, так как поставка пакетов в Airflow также является нетривиальной задачей.

Наши интересные политики

В этой главе предлагаю ознакомиться с самыми востребованными политиками в нашем проекте. В конце поделюсь проблемами, с которыми я столкнулся при реализации и внедрении. 

Централизованный алертинг

Когда я пришёл в проект, каждый разработчик отвечал за корректное заполнение метаданных в поле tags и добавление модуля оповещений в DAG. Для этого нужно было знать:

  • Название модуля для алертинга и корректный импорт;

  • Имя Variable, которая содержит словарь с кредами для оповещения, так как учётных записей для оповещений может быть несколько;

  • Имя Variable для room_id, в которую отправлять оповещение;

  • Корректный способ добавления оповещения через default_args. Если добавить коллбек в параметры DAG, то он не прокинется в каждое задание.

Также я обратил внимание на периодические правки на 200+ конвейеров. Такое происходило, например, при смене модуля оповещений:

Pasted image 20260515123328.png

Примечание: опытный разработчик, конечно, заметит, что для оповещений можно было реализовать абстракцию, которая бы избавила от массовых правок при смене модуля оповещений.

Как не допустить такого с помощью политик? Первой нашей политикой была установка коллбэков по умолчанию с помощью task_policy. Для этого необходимо существующие on_failure_callbacks сравнить с обязательными из переменной default_callbacks, и вставить вызовы недостающих. Принцип работы показан на схеме ниже, а нашу реализацию можно посмотреть в репозитории:

Pasted image 20260515152332.png

Разработка политик усложняется разнообразием типов объектов и специальными случаями обработки, как, например, обработка объектов из partial и expand в Airflow. 

Проверка метаданных

Для интеграции с сервисом OpenMetadata мы используем поле tags, в котором каждый разработчик заполняет данные формата ключ:значение. Также есть дополнительные требования к самому формату описания метаданных:

Pasted image 20260515102652.png

До внедрения политик изменения требований сопровождались массовыми правками, а ответственность была на инженере данных, которому меньше всего хочется заходить на корпоративный портал и искать, какие значения поддерживаются полем pool и нужно ли вставлять атрибут sla:

Pasted image 20260515134551.png

Проверка метаданных существенно проще работы с операторами в task_policy. Для реализации нескольких проверок паттерном является накопление ошибок, чтобы в интерфейсе Airflow были отображены ошибки по каждому виду политик:

dag_requires_tags.png

 Я решил не описывать реализованные нами политики требований к заполнению полей DAG DagRequiresDescription, DagRequiresNonDefaultOwner, DagRequiresDocMd и DagHasFailureNotification — их код представлен и подробно задокументирован в репозитории, который является прикладной частью статьи. Я буду рад PR для демонстрации всех возможностей политик, а также примеров внедрения через интерфейс Pluggy и продвинутой работы с task_instance_mutation_hook и pod_mutation_hook.

Польза для техподдержки

Обязательное тэгирование с валидацией данных приносит отдельную пользу: команда сопровождения и эксплуатации может сразу понять, какой сервис или домен затронут, кто отвечает за DAG и насколько критично его падение. Airflow поддерживает отображение и фильтрацию DAG по tags в интерфейсе:

Pasted image 20260526155404.png

Также мы отправляем эти данные в OpenMetadata, что оказывается полезно при ежедневной поддержке платформы:

Pasted image 20260526155650.png

Какие я встретил подводные камни

Поговорим о том, что стоит иметь в виду при разработке и внедрении policies в Airflow. Я не нашёл в одном месте описание перечисленных здесь ошибок, поэтому, возможно, это поможет именно вам:

  1. Как убедиться, что применились изменения в коде политик? При разработке политик мы работаем с папкой, смонтированной в контейнер, но её код, в отличие от кода DAG, не парсится по расписанию. Чтобы увидеть изменения, нужен restart/redeploy процессов worker и scheduler Airflow, которые импортируют airflow_local_settings.py.

  2. Политики Airflow не работают, хотя файл airflow_local_settings.py смонтирован и функции политик реализованы. Возможно, вы используете для развёртывания Airflow k8s и на уровне YAML-файла описали конфигурацию в airflowLocalSettings. Если этот параметр не выставлен в null, то файл Helm chart может смонтировать собственный airflow_local_settings.py, который перекроет файл из image/container filesystem.

  3. Какой оверхед и насколько это безопасно? Airflow policies по своей архитектуре добавляют точку отказа для каждого DAG, поэтому необходимо тщательно проверять код. Не лучшей идеей будет вставить в код политик I/O-операции, обращение к БД и сетевые запросы, дорогие импорты — это может замедлить работу вашей системы.

  4. Работа с MappedOperator и immutability: в Airflow есть понятие MappedOperator, он нужен для генерирования произвольного количества Task в runtime в зависимости от работы предыдущих задач. Если вы хотите сделать политику, которая на падение каждого объекта типа BaseOperator будет вызывать оповещение, то MappedOperator ведёт себя иначе, чем обычный BaseOperator, а многие поля должны изменяться через partial_kwargs. Для работы с ним потребуется отдельно отбирать тип оператора (см. слайд 24).

Если в других сферах применения Apache Airflow, таких как BI, DevOps и MLOps у вас возникли другие проблемы или практики, то предлагаю поделиться ими в комментариях или в репозитории.

На что это ещё похоже?

Предлагаю напоследок немного размять свою инженерную культуру и посмотреть, как в других популярных системах применяется такой же подход: централизованная проверка, ограничение кода и конфигурация до попадания в production.

Kubernetes: ValidatingAdmissionPolicy, Kyverno

В Kubernetes похожую роль выполняют admission policies — правила, которые проверяют ресурсы ещё до их сохранения в API server. ValidatingAdmissionPolicy позволяет описывать CEL-правила как обычные Kubernetes API objects. Kyverno идёт дальше и умеет не только проверять, но и менять YAML-манифесты. Это хороший пример platform-level governance для инфраструктуры.

Политики GitLab

GitLab позволяет централизованно внедрять обязательные CI jobs через Pipeline Execution Policies. Политики могут внедряться или переписывать pipeline-конфигурацию проекта и запускать проверки для всех репозиториев группы. Это слой governance поверх пользовательского CI/CD. Я искал похожие реализации уже на этапе написания статьи, и как же здорово видеть реализацию тех же полей в политиках GitLab, что мы сделали для своего класса Rule (документация про Pipeline Execution Policies).

GitHub: rulesets + required status checks

В GitHub похожая модель строится через Rulesets, branch protection и required status checks. Обычно это комбинация правил в UI/API/Terraform и CI-проверок через GitHub Actions. Такой подход позволяет централизованно вызывать заданные правила ещё до merge в main (ещё скажите, что есть другие ветки): 

Что в итоге?

Мы исследовали маленькую, но интересную часть системы Apache Airflow — политики. Проделали путь от простых проверок на отсутствующие поля DAG до создания полноценного движка, который позволяет централизованно внедрять обработчики, проверять метаданные конвейеров и в удобном формате показывать пользователю, где и как нужно поправить ошибку.

Делитесь в комментариях своими примерами, вносите свой вклад в OpenSource и покупайте сырки в Магните. Если зашла статья, то заглядывайте в мой блог: в нём я выстраиваю TIL, пишу мысли про карьеру, образование и всё, что помогает совмещать разные сферы жизни чуть менее хаотично.

Ciao!

Комментарии (0)