Оркестратор задач — инструмент или система, предназначенные для управления и контроля выполнением задач в сложных вычислительных и информационных средах. Он облегчают процесс развертывания, автоматизации и управления выполнением задач, что позволяет повысить эффективность работы и оптимизировать ресурсы.
Одним из популярных оркестратором задач является Apache Airflow. Он, как и все инструменты, имеет свои преимущества и недостатки, о которых пойдет речь в данной статье
Apache Airflow
Apache Airflow - это платформа для программирования, планирования и мониторинга рабочих процессов (Workflows). Он позволяет создавать и управлять сложными зависимостями между задачами, автоматизировать их выполнение и мониторить процесс обработки данных.
Airflow предоставляет графический интерфейс для создания, выполнения и мониторинга рабочих процессов, также он позволяет создавать рабочие процессы в форме направленных ацикличных графов (DAG), где каждый узел представляет собой отдельную задачу. Это обеспечивает удобство и гибкость в управлении задачами и зависимостями между ними.
Airflow широко используется для автоматизации распределенных вычислений, обработки данных, управления ETL (Extract, Transform, Load) процессами, планирования задач и других сценариев работы с данными.
По умолчанию Apache Airflow использует SQLite, как базу данных.
Один из способов использования Apache Airflow - docker файл.
docker-compose
version: '3.8'
x-airflow-common:
&airflow-common
image: ${AIRFLOW_IMAGE_NAME:-apache/airflow:2.7.3}
environment:
&airflow-common-env
AIRFLOW__CORE__EXECUTOR: LocalExecutor
AIRFLOW__DATABASE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:airflow@postgres/airflow
AIRFLOW__CORE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:airflow@postgres/airflow
AIRFLOW__CORE__FERNET_KEY: ''
AIRFLOW__CORE__DAGS_ARE_PAUSED_AT_CREATION: 'true'
AIRFLOW__CORE__LOAD_EXAMPLES: 'false'
AIRFLOW__API__AUTH_BACKENDS: 'airflow.api.auth.backend.basic_auth,airflow.api.auth.backend.session'
AIRFLOW__SCHEDULER__ENABLE_HEALTH_CHECK: 'true'
_PIP_ADDITIONAL_REQUIREMENTS: ${_PIP_ADDITIONAL_REQUIREMENTS:-}
volumes:
- ${AIRFLOW_PROJ_DIR:-.}/dags:/opt/airflow/dags
- ${AIRFLOW_PROJ_DIR:-.}/logs:/opt/airflow/logs
- ${AIRFLOW_PROJ_DIR:-.}/config:/opt/airflow/config
- ${AIRFLOW_PROJ_DIR:-.}/plugins:/opt/airflow/plugins
user: "${AIRFLOW_UID:-50000}:0"
depends_on:
&airflow-common-depends-on
postgres:
condition: service_healthy
services:
postgres:
image: postgres:13
environment:
POSTGRES_USER: airflow
POSTGRES_PASSWORD: airflow
POSTGRES_DB: airflow
volumes:
- postgres-db-volume:/var/lib/postgresql/data
healthcheck:
test: ["CMD", "pg_isready", "-U", "airflow"]
interval: 10s
retries: 5
start_period: 5s
restart: always
airflow-webserver:
<<: *airflow-common
command: webserver
ports:
- "8080:8080"
healthcheck:
test: ["CMD", "curl", "--fail", "http://localhost:8080/health"]
interval: 30s
timeout: 10s
retries: 5
start_period: 30s
restart: always
depends_on:
<<: *airflow-common-depends-on
airflow-init:
condition: service_completed_successfully
airflow-scheduler:
<<: *airflow-common
command: scheduler
healthcheck:
test: ["CMD", "curl", "--fail", "http://localhost:8974/health"]
interval: 30s
timeout: 10s
retries: 5
start_period: 30s
restart: always
depends_on:
<<: *airflow-common-depends-on
airflow-init:
condition: service_completed_successfully
airflow-init:
<<: *airflow-common
entrypoint: /bin/bash
# yamllint disable rule:line-length
command:
- -c
- |
function ver() {
printf "%04d%04d%04d%04d" $${1//./ }
}
airflow_version=$$(AIRFLOW__LOGGING__LOGGING_LEVEL=INFO && gosu airflow airflow version)
airflow_version_comparable=$$(ver $${airflow_version})
min_airflow_version=2.2.0
min_airflow_version_comparable=$$(ver $${min_airflow_version})
if (( airflow_version_comparable < min_airflow_version_comparable )); then
echo
echo -e "\033[1;31mERROR!!!: Too old Airflow version $${airflow_version}!\e[0m"
echo "The minimum Airflow version supported: $${min_airflow_version}. Only use this or higher!"
echo
exit 1
fi
if [[ -z "${AIRFLOW_UID}" ]]; then
echo
echo -e "\033[1;33mWARNING!!!: AIRFLOW_UID not set!\e[0m"
echo "If you are on Linux, you SHOULD follow the instructions below to set "
echo "AIRFLOW_UID environment variable, otherwise files will be owned by root."
echo "For other operating systems you can get rid of the warning with manually created .env file:"
echo " See: https://airflow.apache.org/docs/apache-airflow/stable/howto/docker-compose/index.html#setting-the-right-airflow-user"
echo
fi
one_meg=1048576
mem_available=$$(($$(getconf _PHYS_PAGES) * $$(getconf PAGE_SIZE) / one_meg))
cpus_available=$$(grep -cE 'cpu[0-9]+' /proc/stat)
disk_available=$$(df / | tail -1 | awk '{print $$4}')
warning_resources="false"
if (( mem_available < 4000 )) ; then
echo
echo -e "\033[1;33mWARNING!!!: Not enough memory available for Docker.\e[0m"
echo "At least 4GB of memory required. You have $$(numfmt --to iec $$((mem_available * one_meg)))"
echo
warning_resources="true"
fi
if (( cpus_available < 2 )); then
echo
echo -e "\033[1;33mWARNING!!!: Not enough CPUS available for Docker.\e[0m"
echo "At least 2 CPUs recommended. You have $${cpus_available}"
echo
warning_resources="true"
fi
if (( disk_available < one_meg * 10 )); then
echo
echo -e "\033[1;33mWARNING!!!: Not enough Disk space available for Docker.\e[0m"
echo "At least 10 GBs recommended. You have $$(numfmt --to iec $$((disk_available * 1024 )))"
echo
warning_resources="true"
fi
if [[ $${warning_resources} == "true" ]]; then
echo
echo -e "\033[1;33mWARNING!!!: You have not enough resources to run Airflow (see above)!\e[0m"
echo "Please follow the instructions to increase amount of resources available:"
echo " https://airflow.apache.org/docs/apache-airflow/stable/howto/docker-compose/index.html#before-you-begin"
echo
fi
mkdir -p /sources/logs /sources/dags /sources/plugins
chown -R "${AIRFLOW_UID}:0" /sources/{logs,dags,plugins}
exec /entrypoint airflow version
environment:
<<: *airflow-common-env
_AIRFLOW_DB_MIGRATE: 'true'
_AIRFLOW_WWW_USER_CREATE: 'true'
_AIRFLOW_WWW_USER_USERNAME: ${_AIRFLOW_WWW_USER_USERNAME:-airflow}
_AIRFLOW_WWW_USER_PASSWORD: ${_AIRFLOW_WWW_USER_PASSWORD:-qwerty12345!}
_PIP_ADDITIONAL_REQUIREMENTS: ''
user: "0:0"
volumes:
- ${AIRFLOW_PROJ_DIR:-.}:/sources
airflow-cli:
<<: *airflow-common
profiles:
- debug
environment:
<<: *airflow-common-env
CONNECTION_CHECK_MAX_COUNT: "0"
command:
- bash
- -c
- airflow
volumes:
postgres-db-volume:
Несколько популярных операторов в Apache Airflow:
EmptyOperator
Этот оператор представляет собой очень простой оператор в Apache Airflow, который не выполняет никаких реальных действий.
Ожидание или задержка: можно использовать EmptyOperator для создания задержки между выполнением задач в вашем DAG.
Создание структуры DAG: может быть использован для создания структуры вашего DAG, когда у вас еще нет конкретных операций для некоторых частей рабочего процесса.
Использование в качестве заполнителя: может понадобиться создать «место действия» для определенных зависимостей
Тестирование: при разработке и тестировании DAG, вы можете использовать EmptyOperator, чтобы вставить временную заглушку вместо реальных операций.
Пример кода с EmptyOperator:
from airflow import DAG
from airflow.utils.dates import days_ago
from airflow.operators.empty import EmptyOperator
default_my_args = {
'owner': 'PB_Academy',
'retries': 5
}
with DAG (
dag_id = 'dag_empty',
description='ДАГ с использованием bash оператора',
schedule_interval='@once',
default_args= default_my_args,
start_date=days_ago(1)
) as dag:
start_task = EmptyOperator(
task_id = 'start_task'
)
task_1_1 = EmptyOperator(
task_id = 'task_1_1'
)
task_1_2 = EmptyOperator(
task_id = 'task_1_2'
)
union_task = EmptyOperator(
task_id = 'union'
)
task_2_1 = EmptyOperator(
task_id = 'task_2_1'
)
task_2_2 = EmptyOperator(
task_id = 'task_2_2'
)
finish_task = EmptyOperator(
task_id = 'finish_task'
)
start_task >> [task_1_1, task_1_2] >> union_task >>[task_2_1, task_2_2] >> finish_task
Начиная с версии 2.3.0 Apache Airflow появился данный оператор. До этого существовал DummyOperator. В версии 2.3.4 разработчики призывают переходить на EmptyOperator.
BashOperator
Этот оператор используется для выполнения произвольных bash‑команд или скриптов. Он является универсальным инструментом для выполнения различных командных операций в рамках ваших рабочих процессов.
Пример кода с BashOperator:
from airflow import DAG
from airflow.operators.bash import BashOperator
from datetime import datetime, timedelta
default_my_args = {
'owner': 'PB_Academy',
'retries': 5,
'retry_delay': timedelta(minutes=1)
}
with DAG(
dag_id='dag_bush',
default_args= default_my_args,
description='ДАГ с использованием bash оператора',
start_date= datetime(2024, 5, 7, 16, 50, 0),
schedule_interval='@daily'
) as dag:
task1 = BashOperator(
task_id = 'first_task',
bash_command= 'mkdir PB_Academy'
)
task2 = BashOperator(
task_id = 'second_task',
bash_command= 'touch tmp.txt'
)
task3 = BashOperator(
task_id = 'thrird_task',
bash_command= 'echo Hello World'
)
task1 >> task2 >> task3
PostgresOperator
Данный оператор позволяет выполнять SQL-запросы в базе данных PostgreSQL. Он предоставляет удобный способ выполнения различных операций с базой данных PostgreSQL в рамках ваших DAG.
Для того, чтобы установить PostgresOperator рекомендуем выполнить следующую команду:
pip install apache-airflow-providers-postgres
Пример кода с PostgresOperator:
from airflow import DAG
from datetime import datetime, timedelta
from airflow.providers.postgres.operators.postgres import PostgresOperator
default_my_args = {
'owner': 'PB_Academy',
'retries': 5
}
with DAG(
dag_id = 'dag_postgres',
schedule_interval='@once',
default_args= default_my_args,
start_date= datetime(2023, 12, 8)
) as dag:
task_pg_create = PostgresOperator(
task_id='task_pg_create',
postgres_conn_id='postgres_default',
sql='CREATE TABLE test_1(id SERIAL, date TIMESTAMP);'
)
task_pg_insert = PostgresOperator(
task_id='task_pg_insert',
postgres_conn_id='postgres_default',
sql='INSERT INTO test_1 (date) VALUES (localtimestamp);'
)
task_pg_create >> task_pg_insert
PythonOperator
Этот оператор позволяет запускать произвольные функции Python в вашем DAG. Это может быть полезно, если вам нужно выполнить какой‑то кастомный код или манипуляции с данными в Python.
Для общения между задачами в рамках одного dag существует механизм — XCom. Это словарь, из которого можно брать данные. Как конфиги, так и результаты других задач. Данные в XCom могут быть как public, так и private. Push — отправить данные в словарь, pull — получить. Стоит добавить, что XCom не подчищает за собой данные, так что нужно побеспокоиться, чтобы после выполнения сессии dag удалял историю.
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta
default_my_args = {
'owner': 'PB_Academy',
'retries': 5,
'retry_delay': timedelta(minutes=5)
}
def print_param(ti):
sql = ti.xcom_pull(task_ids = 'get_param', key = 'SQL')
nosql = ti.xcom_pull(task_ids = 'get_param', key = 'NoSQL')
print(f"SQL database: {sql} \n NoSQL database: {nosql}")
def get_param(ti):
ti.xcom_push(key = 'SQL', value = ['MS SQL','PostgreSQL','Oracle'])
ti.xcom_push(key = 'NoSQL', value = ['MongoDB','Redis','Elasticsearch'])
with DAG(
dag_id='dag_python',
default_args= default_my_args,
description='-',
start_date= datetime(2023, 12, 8, 8,47),
schedule_interval='@once'
) as dag:
task1 = PythonOperator(
task_id='task_print_param',
python_callable = print_param,
)
task2 = PythonOperator(
task_id='task_get_param',
python_callable = get_param
)
task2 >> task1
Task Flow API
Task Flow API — это часть Apache Airflow, которая предоставляет возможность программно создавать и управлять задачами и их зависимостями в DAG. Он позволяет создавать и изменять DAG и его структуру изнутри самого Python‑кода, а не только из конфигурационных файлов.
Пример кода с иcпользованием Task Flow API:
from datetime import datetime
from airflow.decorators import dag, task
default_my_args = {
'owner': 'PB_Academy',
'retries': 5
}
@dag(
dag_id = 'taskflow_api_dag',
default_args = default_my_args,
start_date = datetime(2023,12,10),
schedule_interval='@daily'
)
def calculate_etl():
@task(multiple_outputs = True)
def get_numbers():
return {
'number_1' : 21,
'number_2' : 99
}
@task
def calculate(number_1, number_2):
print(number_1 + number_2)
dicts = get_numbers()
calculate(dicts['number_1'], dicts['number_2'])
my_dag = calculate_etl()
Дополнительно
catchup — параметр в Apache Airflow, который определяет, должны ли старые даты выполнения задач быть выполнены при первом запуске DAG. Когда catchup установлен в True, Apache Airflow автоматически запускает пропущенные даты выполнения задач после того, как DAG был включен в работу.
Пример кода с catch up:
from airflow import DAG
from airflow.operators.bash import BashOperator
from datetime import datetime
default_my_args = {
'owner': 'PB_Academy',
'retries': 5
}
with DAG(
dag_id='dag_catchup',
default_args= default_my_args,
start_date= datetime(2023, 12, 7, 16, 50, 0),
schedule_interval='@daily',
catchup=True
) as dag:
task1 = BashOperator(
task_id = 'first_task',
bash_command= 'echo Hello World'
)
В Apache Airflow можно написать задачу, которая получает статус других задач. Данная задача будет полезна для:
Отчетность и мониторинг: Задача получения статуса задач может использоваться для создания мониторинговых или отчетных процедур, которые сообщают о статусе выполнения других задач. Например, вы можете хотеть получить уведомление или отправить отчет, когда определенная задача завершается успешно или неудачно.
Автоматизация бизнес‑процессов: Получение статуса задач также может быть использовано для автоматизации бизнес‑процессов. Например, если определенная задача завершилась успешно, вы можете автоматически запускать следующие действия или процессы.
Управление зависимостями: Получение статуса задач может помочь в управлении зависимостями и выполнением последующих действий в зависимости от статуса предыдущих задач. Например, если одна задача завершилась неудачно, вы можете хотеть принять решение о перезапуске другой задачи или выполнении альтернативных действий.
Пример кода с получением статуса задачи:
from airflow.models.dagrun import DagRun
from airflow.decorators import dag, task
from datetime import datetime
default_my_args = {
'owner': 'PB_Academy',
'retries': 5
}
@dag(
dag_id = 'dag_with_status',
default_args = default_my_args,
start_date = datetime(2023,12,10),
schedule_interval='@daily'
)
def calculate_etl():
@task()
def get_status_dag():
dag_id = 'dag_with_status'
dag_runs = DagRun.find(dag_id=dag_id)
print(f'Length: {len(dag_runs)}')
for dag_run in dag_runs:
if dag_run.external_trigger == False :
print(f'Dag Name {dag_run}\n Dag Status {dag_run.state}')
@task()
def success_task():
4 / 2
@task()
def fail_task():
4 / 0
success_task()
fail_task()
get_status_dag()
my_dag = calculate_etl()
Параметр schedule_interval используется для определения расписания выполнения задач в рамках DAG. Этот параметр указывает, с какой периодичностью DAG и его задачи должны выполняться.schedule_interval может быть определен различными способами, включая крон-выражения, объекты datetime.timedelta или ключевые слова, такие как @daily, @hourly и т. д.
Вывод
Таким образом, Apache Airflow часто выбирают для сложных рабочих процессов с большим количеством задач. Подводя итог, рассмотрим основные плюсы и минусы данной системы:
Apache Airflow | |||
Плюсы |
Минусы |
||
Гибкость Airflow написан на Python, это делает его гибким и легко интегрируемым с другими инструментами. |
Сложность настройки Наличие неявных зависимостей может усложнить начальную настройку и конфигурацию. |
||
Мощный пользовательский интерфейс Графический веб-интерфейс позволяет легко создавать и мониторить рабочие процессы. |
Требования к ресурсам Для управления большими рабочими процессами требуются значительные вычислительные ресурсы. |
||
Расширяемость Поддерживает множество плагинов и интеграций с различными сервисами и базами данных. |
|||
Масштабируемость Модульная архитектура и очереди сообщений позволяют обрабатывать большое количество задач. |
Для изучения данной темы рекомендуем прочитать книгу Баса Харенслака и Джулиана де Руйтера «Apache Airflow и конвейеры обработки данных»
astentx
TaskFlow API это не программная генерация структуры дага, а возможность писать флоу в виде вызовов Python-функций как обычный код вместо декларативного и многословного "рисования стрелочек" с передачей параметров откуда-то сбоку: Airflow при этом сам генерирует DAG из этих простых вызовов, делает XCom.push/pull.
А вот насчёт GUI не сказать, что очень продвинутый. Создавать задания там нельзя, только смотреть. Причем в версиях после 2.5 ветвистый граф на десятки задач с несбалансированным ветками рисуется некрасиво. Фильтрации задач по имени и тегирования тасков в графе нет, только подсветка. Обычный листинг всех дагов с тегами и фильтрами, никакой структуры хранения дагов ввести нельзя. Но вообще GUI постоянно дорабатывают и добавляют новые фичи.