Оглавление:

Проблематика

Типовая задача для дата‑инженера — это перенести данные из реплики/боевой OLTP DB в аналитическое хранилище.

В данной задаче обычно нужно переносить несколько таблиц и принцип их переноса является одинаковым. Ввиду чего необходимо создавать DAG с небольшими изменениями в коде.

Чаще всего это происходит так: дата‑инженер заходит в типовой DAG и выполняет следующие действия:

  1. Cmd+A

  2. Cmd+C

  3. Cmd+V

  4. Поменял пару строчек в DAG, совершил опечатку/неверно скопировал/что-то еще

Решение

Генерация DAG по типовому DAG (шаблону).

Создаем шаблон, изменяем автоматически все необходимые поля и радуемся пускам в прод.

Пример решения проблематики описанной выше

Ниже будет поэтапно расписано как можно просто сделать фабрику DAG, благодаря которой можно смело пускать в прод полученные DAG.

Используемые технологии:

Все операции выполнялись на:

  • MacBook Air (M1, 2020)

    • Оперативная память 16 Gb

    • macOS Monterey 12.6

  • Docker 4.17.0

    • Docker resources:

      • CPUs: 2

      • Memory: 4 Gb

      • Swap: 2 Gb

Установка Airflow

Весь проект вы сможете найти в git-репозитории. Все команды указаны в хронологическом порядке в описании проекта.

Кратко:

Поднимаем проект командой

docker-compose up -d 

Пишем шаблонный DAG

Я не буду писать сложный DAG, так как в данной статье я бы хотел показать скорее суть и возможности создания фабрики DAG.

Код DAG
from datetime import datetime, timedelta

import pendulum

from airflow import DAG

from airflow.operators.python import PythonOperator
from airflow.operators.empty import EmptyOperator


args = {
    'owner': 'k0rsakov',
    'start_date': datetime(2023, 3, 10, tzinfo=pendulum.timezone('Europe/Moscow')),
    'catchup': True,
    'retries': 3,
    'retry_delay': timedelta(hours=1),
    'max_active_runs': 1
}


def print_something() -> None:
    """
    Печатает текст.

    :return: `None`
    """
    print('something')

with DAG(
        dag_id='template',
        schedule_interval='10 0 * * *',
        default_args=args,
        tags=['template', 'test'],
        description='',
        concurrency=1
) as dag:

    start = EmptyOperator(
        task_id='start',
    )

    print_something = PythonOperator(
        task_id='print_something',
        python_callable=print_something,
    )

    end = EmptyOperator(
        task_id='end',
    )

    start >> print_something >> end

Реализован самый простой DAG, который может что-то печатать в консоль.

Если его запустить, то получим ожидаемый результат:

Изменение его под шаблон

Давайте для начала организуем хранение нашей фабрики.

Так как все DAG попадают в веб-интерфейс из папки dags, то мы реализуем всю логику формирования новых DAG вне этой папки.

У меня будет такая структура:

В папке dag_config_json_print_something будет находиться файл с конфигами для наших будущих DAG (об этом расскажу ниже).

В папке templates_dags как раз будет храниться наш шаблонный DAG.

Далее в нашем шаблоне изменяем все параметры, которые поддаются шаблонизации или те, которые должны будут меняться в зависимости от его конфигурации.

Примеры:

Исходный код:

args = {
    'owner': 'k0rsakov',
    'start_date': datetime(2023, 3, 10, tzinfo=pendulum.timezone('Europe/Moscow')),
    'catchup': True,
    'retries': 3,
    'retry_delay': timedelta(hours=1),
    'max_active_runs': 1
}

Шаблонный код:

args = {
    'owner': '$$OWNER',
    'start_date': $$START_DATE,
    'catchup': True,
    'retries': 3,
    'retry_delay': timedelta(hours=1),
    'max_active_runs': 1
}
Код DAG под шаблон
from datetime import datetime, timedelta

import pendulum

from airflow import DAG

from airflow.operators.python import PythonOperator
from airflow.operators.empty import EmptyOperator


args = {
    'owner': '$$OWNER',
    'start_date': $$START_DATE,
    'catchup': True,
    'retries': 3,
    'retry_delay': timedelta(hours=1),
    'max_active_runs': 1
}


def print_something() -> None:
    """
    Печатает текст.

    :return: `None`
    """
    print('$$PRINT')

with DAG(
        dag_id='$$DAG_ID',
        schedule_interval='$$SCHEDULE_INTERVAL',
        default_args=args,
        tags=$$TAGS,
        concurrency=1
) as dag:

    start = EmptyOperator(
        task_id='start',
    )

    print_something = PythonOperator(
        task_id='print_something',
        python_callable=print_something,
    )

    end = EmptyOperator(
        task_id='end',
    )

    start >> print_something >> end

Рекомендую изменить наименование файла в .txt чтобы IDE не ругалась на "ошибки".

Генерация DAG

Ранее мы изменили наш исходный DAG под шаблон, теперь необходимо указать все ключи, которые будут изменяться в одном файле. Я выбрал самый привычный и, как по мне, удобный вариант — JSON.

В заголовке я указываю название DAG, далее указываю все ключи, которые мы ранее создали в нашем шаблоне.

Если мы заходим шаблонизировать какое‑то поле, то добавляем его в шаблон и добавляем затем в наш config.

{
  "template": {
    "OWNER": "k0rsakov",
    "START_DATE": "datetime(2023, 3, 10, tzinfo=pendulum.timezone('Europe/Moscow'))",
    "PRINT": "something",
    "DAG_ID": "template",
    "SCHEDULE_INTERVAL": "10 0 * * *",
    "TAGS": "['template', 'test']"
  }
}

Создание "генератора" DAG

На данный момент мы имеем: шаблон типового DAG, имеем config файл с информацией о будущих DAG.

Теперь мы создадим саму фабрику, которая нам будет «генерировать» DAG.

Первая функция, которая будет изменять все ключи в шаблоне, которые есть в нашем config‑файле:

Функция замены ключей в шаблоне
def replace_template_variables(template_dag: str = None, dict_variables: dict = None) -> str:
    """
    Функция, которая итерируется по всем ключам ключа основного словаря берет оттуда значение и подставляет в шаблон.

    Пример:
    ```json
    {
      "test":
      {
        "OWNER": "airflow",
        "DAG_ID": "test",
        "PK": "id"
        ...
      }
    }
    ```
    На вход поступает значения по ключу `test` и функция итерируется по ключам: `OWNER`, `DAG_ID`, `PK`, ...
    Берет значения по ключу и заменяет шаблон в указанном `template_dag`.

    Соответственно, по всем ключам, которые есть в словаре DAG будет произведена замена по шаблону.

    @param template_dag: Шаблон DAG; default 'None'.
    @param dict_variables: Словарь со значениями, которые необходимо поменять в шаблоне; default 'None'.
    @return: Измененный шаблон на основании значений по ключам.
    """
    for variables_ in dict_variables:
        template_dag = template_dag.replace(f'$${variables_}', f'{dict_variables[variables_]}')

    return template_dag

Далее создадим функцию, которая будет итерироваться по всем ключам config‑файла указанного типа шаблона:

Функция для итерации по ключам выбранного типа шаблона
def dag_factory(type_dag: str = None) -> None:
    """
    Функция, которая генерирует DAG на основании выбранного `type_dag` и выбранных config на основании `type_dag`.

    Пример: Если указан `print_something` в `type_dag`, то функция для генерации DAG будет использовать config:
    "config_json_print_something.json" и сохранит сгенерированные DAG в папку:
    //dags/json_dags/print_something/<print_something_dag_name.py>

    @param type_dag: Указывается тип DAG для генерации print_something|etc; default 'None'.
    @return: Ничего не возвращает, а сохраняет сгенерированный DAG по определенному пути.
    """
    with open(f'dag_config_json_{type_dag}/config_{type_dag}_dag.json') as j:
        json_config = j.read()
        json_config = json.loads(json_config)

    with open(f'templates_dags/{type_dag}.txt', mode='r') as f:
        template = f.read()

    for dag in json_config:
        modified_template = replace_template_variables(template_dag=template, dict_variables=json_config[dag])
        with open(f'../dags/json_dags/{type_dag}/{dag}.py', mode='w') as dag_pyfile:
            dag_pyfile.write(modified_template)

Объеденим обе функции в один файл и запустим полученный скрипт.

Получим сгенерированный DAG, который будет содержать все наши значения из config-файла.

И DAG появился в нашем веб-интерфейсе:

Масштабирование

Давайте теперь сгенерируем новые DAG по заданному шаблону.

Таким образом можно быстро создавать однотипные DAG.

Рекомендации

  • Для шаблона использовать хорошо проверенные DAG.

  • Продумать заранее все поля, которые будут шаблонизироваться.

  • Продумать поля, которые не нужны во всех DAG и сделать для них обработку исключений, чтобы можно было не указывать какой-то ключ.

  • Держать в голове ту мысль, что если изменить шаблон и запустить фабрику DAG, то изменения подтянутся во все генерируемые DAG.

  • Продумать вариант удаления DAG, которые исключены из config-файлов, сейчас такое не реализовано (я просто руками удаляю).

  • Использовать отдельную папку для генерируемых DAG, чтобы не смешивать рукописные и шаблонные DAG.

  • Не забывайте про идемпотентность.

  • Правильно используйте переменные из контекста DAG.

Итог

Фабрика DAG очень полезна, потому что позволяет не копировать писать руками однотипные DAG. Можно создать универсальные шаблоны для разных типов DAG:

  • Перезаписывают каждый раз табличку.

  • Обновляют значения по ключу.

  • Добавляют в конец.

  • Прочие однотипные DAG.

Я использую данное решение в проде и данный вариант проверен временем.

Комментарии (0)