Оглавление:
Проблематика
Типовая задача для дата‑инженера — это перенести данные из реплики/боевой OLTP DB в аналитическое хранилище.
В данной задаче обычно нужно переносить несколько таблиц и принцип их переноса является одинаковым. Ввиду чего необходимо создавать DAG с небольшими изменениями в коде.
Чаще всего это происходит так: дата‑инженер заходит в типовой DAG и выполняет следующие действия:
Cmd+A
Cmd+C
Cmd+V
Поменял пару строчек в DAG,
совершил опечатку/неверно скопировал/что-то еще
Решение
Генерация DAG по типовому DAG (шаблону).
Создаем шаблон, изменяем автоматически все необходимые поля и радуемся пускам в прод.
Пример решения проблематики описанной выше
Ниже будет поэтапно расписано как можно просто сделать фабрику DAG, благодаря которой можно смело пускать в прод полученные DAG.
Используемые технологии:
Все операции выполнялись на:
-
MacBook Air (M1, 2020)
Оперативная память 16 Gb
macOS Monterey 12.6
-
Docker 4.17.0
-
Docker resources:
CPUs: 2
Memory: 4 Gb
Swap: 2 Gb
-
Установка Airflow
Весь проект вы сможете найти в git-репозитории. Все команды указаны в хронологическом порядке в описании проекта.
Кратко:
Поднимаем проект командой
docker-compose up -d
Пишем шаблонный DAG
Я не буду писать сложный DAG, так как в данной статье я бы хотел показать скорее суть и возможности создания фабрики DAG.
Код DAG
from datetime import datetime, timedelta
import pendulum
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.empty import EmptyOperator
args = {
'owner': 'k0rsakov',
'start_date': datetime(2023, 3, 10, tzinfo=pendulum.timezone('Europe/Moscow')),
'catchup': True,
'retries': 3,
'retry_delay': timedelta(hours=1),
'max_active_runs': 1
}
def print_something() -> None:
"""
Печатает текст.
:return: `None`
"""
print('something')
with DAG(
dag_id='template',
schedule_interval='10 0 * * *',
default_args=args,
tags=['template', 'test'],
description='',
concurrency=1
) as dag:
start = EmptyOperator(
task_id='start',
)
print_something = PythonOperator(
task_id='print_something',
python_callable=print_something,
)
end = EmptyOperator(
task_id='end',
)
start >> print_something >> end
Реализован самый простой DAG, который может что-то печатать в консоль.
Если его запустить, то получим ожидаемый результат:
Изменение его под шаблон
Давайте для начала организуем хранение нашей фабрики.
Так как все DAG попадают в веб-интерфейс из папки dags
, то мы реализуем всю логику формирования новых DAG вне этой папки.
У меня будет такая структура:
В папке dag_config_json_print_something
будет находиться файл с конфигами для наших будущих DAG (об этом расскажу ниже).
В папке templates_dags
как раз будет храниться наш шаблонный DAG.
Далее в нашем шаблоне изменяем все параметры, которые поддаются шаблонизации или те, которые должны будут меняться в зависимости от его конфигурации.
Примеры:
Исходный код:
args = {
'owner': 'k0rsakov',
'start_date': datetime(2023, 3, 10, tzinfo=pendulum.timezone('Europe/Moscow')),
'catchup': True,
'retries': 3,
'retry_delay': timedelta(hours=1),
'max_active_runs': 1
}
Шаблонный код:
args = {
'owner': '$$OWNER',
'start_date': $$START_DATE,
'catchup': True,
'retries': 3,
'retry_delay': timedelta(hours=1),
'max_active_runs': 1
}
Код DAG под шаблон
from datetime import datetime, timedelta
import pendulum
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.empty import EmptyOperator
args = {
'owner': '$$OWNER',
'start_date': $$START_DATE,
'catchup': True,
'retries': 3,
'retry_delay': timedelta(hours=1),
'max_active_runs': 1
}
def print_something() -> None:
"""
Печатает текст.
:return: `None`
"""
print('$$PRINT')
with DAG(
dag_id='$$DAG_ID',
schedule_interval='$$SCHEDULE_INTERVAL',
default_args=args,
tags=$$TAGS,
concurrency=1
) as dag:
start = EmptyOperator(
task_id='start',
)
print_something = PythonOperator(
task_id='print_something',
python_callable=print_something,
)
end = EmptyOperator(
task_id='end',
)
start >> print_something >> end
Рекомендую изменить наименование файла в .txt
чтобы IDE не ругалась на "ошибки".
Генерация DAG
Ранее мы изменили наш исходный DAG под шаблон, теперь необходимо указать все ключи, которые будут изменяться в одном файле. Я выбрал самый привычный и, как по мне, удобный вариант — JSON.
В заголовке я указываю название DAG, далее указываю все ключи, которые мы ранее создали в нашем шаблоне.
Если мы заходим шаблонизировать какое‑то поле, то добавляем его в шаблон и добавляем затем в наш config.
{
"template": {
"OWNER": "k0rsakov",
"START_DATE": "datetime(2023, 3, 10, tzinfo=pendulum.timezone('Europe/Moscow'))",
"PRINT": "something",
"DAG_ID": "template",
"SCHEDULE_INTERVAL": "10 0 * * *",
"TAGS": "['template', 'test']"
}
}
Создание "генератора" DAG
На данный момент мы имеем: шаблон типового DAG, имеем config файл с информацией о будущих DAG.
Теперь мы создадим саму фабрику, которая нам будет «генерировать» DAG.
Первая функция, которая будет изменять все ключи в шаблоне, которые есть в нашем config‑файле:
Функция замены ключей в шаблоне
def replace_template_variables(template_dag: str = None, dict_variables: dict = None) -> str:
"""
Функция, которая итерируется по всем ключам ключа основного словаря берет оттуда значение и подставляет в шаблон.
Пример:
```json
{
"test":
{
"OWNER": "airflow",
"DAG_ID": "test",
"PK": "id"
...
}
}
```
На вход поступает значения по ключу `test` и функция итерируется по ключам: `OWNER`, `DAG_ID`, `PK`, ...
Берет значения по ключу и заменяет шаблон в указанном `template_dag`.
Соответственно, по всем ключам, которые есть в словаре DAG будет произведена замена по шаблону.
@param template_dag: Шаблон DAG; default 'None'.
@param dict_variables: Словарь со значениями, которые необходимо поменять в шаблоне; default 'None'.
@return: Измененный шаблон на основании значений по ключам.
"""
for variables_ in dict_variables:
template_dag = template_dag.replace(f'$${variables_}', f'{dict_variables[variables_]}')
return template_dag
Далее создадим функцию, которая будет итерироваться по всем ключам config‑файла указанного типа шаблона:
Функция для итерации по ключам выбранного типа шаблона
def dag_factory(type_dag: str = None) -> None:
"""
Функция, которая генерирует DAG на основании выбранного `type_dag` и выбранных config на основании `type_dag`.
Пример: Если указан `print_something` в `type_dag`, то функция для генерации DAG будет использовать config:
"config_json_print_something.json" и сохранит сгенерированные DAG в папку:
//dags/json_dags/print_something/<print_something_dag_name.py>
@param type_dag: Указывается тип DAG для генерации print_something|etc; default 'None'.
@return: Ничего не возвращает, а сохраняет сгенерированный DAG по определенному пути.
"""
with open(f'dag_config_json_{type_dag}/config_{type_dag}_dag.json') as j:
json_config = j.read()
json_config = json.loads(json_config)
with open(f'templates_dags/{type_dag}.txt', mode='r') as f:
template = f.read()
for dag in json_config:
modified_template = replace_template_variables(template_dag=template, dict_variables=json_config[dag])
with open(f'../dags/json_dags/{type_dag}/{dag}.py', mode='w') as dag_pyfile:
dag_pyfile.write(modified_template)
Объеденим обе функции в один файл и запустим полученный скрипт.
Получим сгенерированный DAG, который будет содержать все наши значения из config-файла.
И DAG появился в нашем веб-интерфейсе:
Масштабирование
Давайте теперь сгенерируем новые DAG по заданному шаблону.
Таким образом можно быстро создавать однотипные DAG.
Рекомендации
Для шаблона использовать хорошо проверенные DAG.
Продумать заранее все поля, которые будут шаблонизироваться.
Продумать поля, которые не нужны во всех DAG и сделать для них обработку исключений, чтобы можно было не указывать какой-то ключ.
Держать в голове ту мысль, что если изменить шаблон и запустить фабрику DAG, то изменения подтянутся во все генерируемые DAG.
Продумать вариант удаления DAG, которые исключены из config-файлов, сейчас такое не реализовано (я просто руками удаляю).
Использовать отдельную папку для генерируемых DAG, чтобы не смешивать рукописные и шаблонные DAG.
Не забывайте про идемпотентность.
Правильно используйте переменные из контекста DAG.
Итог
Фабрика DAG очень полезна, потому что позволяет не копировать писать руками однотипные DAG. Можно создать универсальные шаблоны для разных типов DAG:
Перезаписывают каждый раз табличку.
Обновляют значения по ключу.
Добавляют в конец.
Прочие однотипные DAG.
Я использую данное решение в проде и данный вариант проверен временем.