Оркестратор задач — инструмент или система, предназначенные для управления и контроля выполнением задач в сложных вычислительных и информационных средах. Он облегчают процесс развертывания, автоматизации и управления выполнением задач, что позволяет повысить эффективность работы и оптимизировать ресурсы.

Одним из популярных оркестратором задач является Apache Airflow. Он, как и все инструменты, имеет свои преимущества и недостатки, о которых пойдет речь в данной статье

Apache Airflow

Apache Airflow - это платформа для программирования, планирования и мониторинга рабочих процессов (Workflows). Он позволяет создавать и управлять сложными зависимостями между задачами, автоматизировать их выполнение и мониторить процесс обработки данных.

Airflow предоставляет графический интерфейс для создания, выполнения и мониторинга рабочих процессов, также он позволяет создавать рабочие процессы в форме направленных ацикличных графов (DAG), где каждый узел представляет собой отдельную задачу. Это обеспечивает удобство и гибкость в управлении задачами и зависимостями между ними.

Airflow широко используется для автоматизации распределенных вычислений, обработки данных, управления ETL (Extract, Transform, Load) процессами, планирования задач и других сценариев работы с данными.

По умолчанию Apache Airflow использует SQLite, как базу данных.

Один из способов использования Apache Airflow - docker файл.

docker-compose
version: '3.8'
x-airflow-common:
  &airflow-common
  image: ${AIRFLOW_IMAGE_NAME:-apache/airflow:2.7.3}
  environment:
    &airflow-common-env
    AIRFLOW__CORE__EXECUTOR: LocalExecutor
    AIRFLOW__DATABASE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:airflow@postgres/airflow
    AIRFLOW__CORE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:airflow@postgres/airflow
    AIRFLOW__CORE__FERNET_KEY: ''
    AIRFLOW__CORE__DAGS_ARE_PAUSED_AT_CREATION: 'true'
    AIRFLOW__CORE__LOAD_EXAMPLES: 'false'
    AIRFLOW__API__AUTH_BACKENDS: 'airflow.api.auth.backend.basic_auth,airflow.api.auth.backend.session'
    AIRFLOW__SCHEDULER__ENABLE_HEALTH_CHECK: 'true'
    _PIP_ADDITIONAL_REQUIREMENTS: ${_PIP_ADDITIONAL_REQUIREMENTS:-}
  volumes:
    - ${AIRFLOW_PROJ_DIR:-.}/dags:/opt/airflow/dags
    - ${AIRFLOW_PROJ_DIR:-.}/logs:/opt/airflow/logs
    - ${AIRFLOW_PROJ_DIR:-.}/config:/opt/airflow/config
    - ${AIRFLOW_PROJ_DIR:-.}/plugins:/opt/airflow/plugins
  user: "${AIRFLOW_UID:-50000}:0"
  depends_on:
    &airflow-common-depends-on
    postgres:
      condition: service_healthy

services:
  postgres:
    image: postgres:13
    environment:
      POSTGRES_USER: airflow
      POSTGRES_PASSWORD: airflow
      POSTGRES_DB: airflow
    volumes:
      - postgres-db-volume:/var/lib/postgresql/data
    healthcheck:
      test: ["CMD", "pg_isready", "-U", "airflow"]
      interval: 10s
      retries: 5
      start_period: 5s
    restart: always

  airflow-webserver:
    <<: *airflow-common
    command: webserver
    ports:
      - "8080:8080"
    healthcheck:
      test: ["CMD", "curl", "--fail", "http://localhost:8080/health"]
      interval: 30s
      timeout: 10s
      retries: 5
      start_period: 30s
    restart: always
    depends_on:
      <<: *airflow-common-depends-on
      airflow-init:
        condition: service_completed_successfully

  airflow-scheduler:
    <<: *airflow-common
    command: scheduler
    healthcheck:
      test: ["CMD", "curl", "--fail", "http://localhost:8974/health"]
      interval: 30s
      timeout: 10s
      retries: 5
      start_period: 30s
    restart: always
    depends_on:
      <<: *airflow-common-depends-on
      airflow-init:
        condition: service_completed_successfully

  airflow-init:
    <<: *airflow-common
    entrypoint: /bin/bash
    # yamllint disable rule:line-length
    command:
      - -c
      - |
        function ver() {
          printf "%04d%04d%04d%04d" $${1//./ }
        }
        airflow_version=$$(AIRFLOW__LOGGING__LOGGING_LEVEL=INFO && gosu airflow airflow version)
        airflow_version_comparable=$$(ver $${airflow_version})
        min_airflow_version=2.2.0
        min_airflow_version_comparable=$$(ver $${min_airflow_version})
        if (( airflow_version_comparable < min_airflow_version_comparable )); then
          echo
          echo -e "\033[1;31mERROR!!!: Too old Airflow version $${airflow_version}!\e[0m"
          echo "The minimum Airflow version supported: $${min_airflow_version}. Only use this or higher!"
          echo
          exit 1
        fi
        if [[ -z "${AIRFLOW_UID}" ]]; then
          echo
          echo -e "\033[1;33mWARNING!!!: AIRFLOW_UID not set!\e[0m"
          echo "If you are on Linux, you SHOULD follow the instructions below to set "
          echo "AIRFLOW_UID environment variable, otherwise files will be owned by root."
          echo "For other operating systems you can get rid of the warning with manually created .env file:"
          echo "    See: https://airflow.apache.org/docs/apache-airflow/stable/howto/docker-compose/index.html#setting-the-right-airflow-user"
          echo
        fi
        one_meg=1048576
        mem_available=$$(($$(getconf _PHYS_PAGES) * $$(getconf PAGE_SIZE) / one_meg))
        cpus_available=$$(grep -cE 'cpu[0-9]+' /proc/stat)
        disk_available=$$(df / | tail -1 | awk '{print $$4}')
        warning_resources="false"
        if (( mem_available < 4000 )) ; then
          echo
          echo -e "\033[1;33mWARNING!!!: Not enough memory available for Docker.\e[0m"
          echo "At least 4GB of memory required. You have $$(numfmt --to iec $$((mem_available * one_meg)))"
          echo
          warning_resources="true"
        fi
        if (( cpus_available < 2 )); then
          echo
          echo -e "\033[1;33mWARNING!!!: Not enough CPUS available for Docker.\e[0m"
          echo "At least 2 CPUs recommended. You have $${cpus_available}"
          echo
          warning_resources="true"
        fi
        if (( disk_available < one_meg * 10 )); then
          echo
          echo -e "\033[1;33mWARNING!!!: Not enough Disk space available for Docker.\e[0m"
          echo "At least 10 GBs recommended. You have $$(numfmt --to iec $$((disk_available * 1024 )))"
          echo
          warning_resources="true"
        fi
        if [[ $${warning_resources} == "true" ]]; then
          echo
          echo -e "\033[1;33mWARNING!!!: You have not enough resources to run Airflow (see above)!\e[0m"
          echo "Please follow the instructions to increase amount of resources available:"
          echo "   https://airflow.apache.org/docs/apache-airflow/stable/howto/docker-compose/index.html#before-you-begin"
          echo
        fi
        mkdir -p /sources/logs /sources/dags /sources/plugins
        chown -R "${AIRFLOW_UID}:0" /sources/{logs,dags,plugins}
        exec /entrypoint airflow version
    environment:
      <<: *airflow-common-env
      _AIRFLOW_DB_MIGRATE: 'true'
      _AIRFLOW_WWW_USER_CREATE: 'true'
      _AIRFLOW_WWW_USER_USERNAME: ${_AIRFLOW_WWW_USER_USERNAME:-airflow}
      _AIRFLOW_WWW_USER_PASSWORD: ${_AIRFLOW_WWW_USER_PASSWORD:-qwerty12345!}
      _PIP_ADDITIONAL_REQUIREMENTS: ''
    user: "0:0"
    volumes:
      - ${AIRFLOW_PROJ_DIR:-.}:/sources

  airflow-cli:
    <<: *airflow-common
    profiles:
      - debug
    environment:
      <<: *airflow-common-env
      CONNECTION_CHECK_MAX_COUNT: "0"
    command:
      - bash
      - -c
      - airflow

volumes:
  postgres-db-volume:

Несколько популярных операторов в Apache Airflow:

EmptyOperator

Этот оператор представляет собой очень простой оператор в Apache Airflow, который не выполняет никаких реальных действий.

  • Ожидание или задержка: можно использовать EmptyOperator для создания задержки между выполнением задач в вашем DAG.

  • Создание структуры DAG: может быть использован для создания структуры вашего DAG, когда у вас еще нет конкретных операций для некоторых частей рабочего процесса.

  • Использование в качестве заполнителя: может понадобиться создать «место действия» для определенных зависимостей

  • Тестирование: при разработке и тестировании DAG, вы можете использовать EmptyOperator, чтобы вставить временную заглушку вместо реальных операций.

Пример кода с EmptyOperator:

from airflow import DAG
from airflow.utils.dates import days_ago
from airflow.operators.empty import EmptyOperator

default_my_args = {
    'owner': 'PB_Academy',
    'retries': 5
}

with DAG (
    dag_id = 'dag_empty',
    description='ДАГ с использованием bash оператора',
    schedule_interval='@once',
    default_args= default_my_args, 
    start_date=days_ago(1)
) as dag:
    start_task = EmptyOperator(
        task_id = 'start_task'
    )
    task_1_1 = EmptyOperator(
        task_id = 'task_1_1'
    )
    task_1_2 = EmptyOperator(
        task_id = 'task_1_2'
    )
    union_task = EmptyOperator(
        task_id = 'union'
    )
    task_2_1 = EmptyOperator(
        task_id = 'task_2_1'
    )
    task_2_2 = EmptyOperator(
        task_id = 'task_2_2'
    )
    finish_task = EmptyOperator(
        task_id = 'finish_task'
    )
    start_task >> [task_1_1, task_1_2] >> union_task >>[task_2_1, task_2_2] >> finish_task
Отображение dag с помощью графа
Отображение dag с помощью графа

Начиная с версии 2.3.0 Apache Airflow появился данный оператор. До этого существовал DummyOperator. В версии 2.3.4 разработчики призывают переходить на EmptyOperator.

BashOperator

Этот оператор используется для выполнения произвольных bash‑команд или скриптов. Он является универсальным инструментом для выполнения различных командных операций в рамках ваших рабочих процессов.

Пример кода с BashOperator:

from airflow import DAG
from airflow.operators.bash import BashOperator
from datetime import datetime, timedelta

default_my_args = {
    'owner': 'PB_Academy',
    'retries': 5,
    'retry_delay': timedelta(minutes=1)
}
with DAG(
   dag_id='dag_bush',
   default_args= default_my_args, 
   description='ДАГ с использованием bash оператора',
   start_date= datetime(2024, 5, 7, 16, 50, 0),
   schedule_interval='@daily'
) as dag: 
    task1 = BashOperator(
        task_id = 'first_task',
        bash_command= 'mkdir PB_Academy'
    )
    task2 = BashOperator(
        task_id = 'second_task',
        bash_command= 'touch tmp.txt'
    )
    task3 = BashOperator(
        task_id = 'thrird_task',
        bash_command= 'echo Hello World'
    )
  task1 >> task2 >> task3

PostgresOperator

Данный оператор позволяет выполнять SQL-запросы в базе данных PostgreSQL. Он предоставляет удобный способ выполнения различных операций с базой данных PostgreSQL в рамках ваших DAG.

Для того, чтобы установить PostgresOperator рекомендуем выполнить следующую команду:

pip install apache-airflow-providers-postgres

Пример кода с PostgresOperator:

from airflow import DAG
from datetime import datetime, timedelta
from airflow.providers.postgres.operators.postgres import PostgresOperator

default_my_args = {
    'owner': 'PB_Academy',
    'retries': 5
}

with DAG(
    dag_id = 'dag_postgres',
    schedule_interval='@once',
    default_args= default_my_args, 
    start_date= datetime(2023, 12, 8)
) as dag:
    task_pg_create = PostgresOperator(
        task_id='task_pg_create',
        postgres_conn_id='postgres_default',
        sql='CREATE TABLE test_1(id SERIAL, date TIMESTAMP);'
    ) 
    task_pg_insert = PostgresOperator(
        task_id='task_pg_insert',
        postgres_conn_id='postgres_default',
        sql='INSERT INTO test_1 (date) VALUES (localtimestamp);'
    )
    task_pg_create >> task_pg_insert

PythonOperator

Этот оператор позволяет запускать произвольные функции Python в вашем DAG. Это может быть полезно, если вам нужно выполнить какой‑то кастомный код или манипуляции с данными в Python.

Для общения между задачами в рамках одного dag существует механизм — XCom. Это словарь, из которого можно брать данные. Как конфиги, так и результаты других задач. Данные в XCom могут быть как public, так и private. Push — отправить данные в словарь, pull — получить. Стоит добавить, что XCom не подчищает за собой данные, так что нужно побеспокоиться, чтобы после выполнения сессии dag удалял историю.

from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta

default_my_args = {
    'owner': 'PB_Academy',
    'retries': 5,
    'retry_delay': timedelta(minutes=5)
}

def print_param(ti):
    sql = ti.xcom_pull(task_ids = 'get_param', key = 'SQL')
    nosql = ti.xcom_pull(task_ids = 'get_param', key = 'NoSQL')
    print(f"SQL database: {sql} \n NoSQL database: {nosql}")

def get_param(ti):
    ti.xcom_push(key = 'SQL', value = ['MS SQL','PostgreSQL','Oracle'])
    ti.xcom_push(key = 'NoSQL', value = ['MongoDB','Redis','Elasticsearch'])

with DAG(
   dag_id='dag_python',
   default_args= default_my_args, 
   description='-',
   start_date= datetime(2023, 12, 8, 8,47),
   schedule_interval='@once'
) as dag:
    task1 = PythonOperator(
        task_id='task_print_param',
        python_callable = print_param,
    )
    task2 = PythonOperator(
        task_id='task_get_param',
        python_callable = get_param
    )
    task2 >> task1

Task Flow API

Task Flow API — это часть Apache Airflow, которая предоставляет возможность программно создавать и управлять задачами и их зависимостями в DAG. Он позволяет создавать и изменять DAG и его структуру изнутри самого Python‑кода, а не только из конфигурационных файлов.

Пример кода с иcпользованием Task Flow API:

from datetime import datetime
from airflow.decorators import dag, task

default_my_args = {
    'owner': 'PB_Academy',
    'retries': 5
}

@dag(
    dag_id = 'taskflow_api_dag',
    default_args = default_my_args,
    start_date = datetime(2023,12,10),
    schedule_interval='@daily'
)
def calculate_etl():

    @task(multiple_outputs = True)
    def get_numbers():
        return {
            'number_1' : 21,
            'number_2' : 99
        }

    @task
    def calculate(number_1, number_2):
        print(number_1 + number_2)
    
    dicts = get_numbers()
    calculate(dicts['number_1'], dicts['number_2'])
    
my_dag = calculate_etl()

Дополнительно

catchup — параметр в Apache Airflow, который определяет, должны ли старые даты выполнения задач быть выполнены при первом запуске DAG. Когда catchup установлен в True, Apache Airflow автоматически запускает пропущенные даты выполнения задач после того, как DAG был включен в работу.

Пример кода с catch up:

from airflow import DAG
from airflow.operators.bash import BashOperator
from datetime import datetime

default_my_args = {
    'owner': 'PB_Academy',
    'retries': 5
}

with DAG(
   dag_id='dag_catchup',
   default_args= default_my_args, 
   start_date= datetime(2023, 12, 7, 16, 50, 0),
   schedule_interval='@daily',
   catchup=True
) as dag: 
    task1 = BashOperator(
        task_id = 'first_task',
        bash_command= 'echo Hello World'
    )

В Apache Airflow можно написать задачу, которая получает статус других задач. Данная задача будет полезна для:

  1. Отчетность и мониторинг: Задача получения статуса задач может использоваться для создания мониторинговых или отчетных процедур, которые сообщают о статусе выполнения других задач. Например, вы можете хотеть получить уведомление или отправить отчет, когда определенная задача завершается успешно или неудачно.

  2. Автоматизация бизнес‑процессов: Получение статуса задач также может быть использовано для автоматизации бизнес‑процессов. Например, если определенная задача завершилась успешно, вы можете автоматически запускать следующие действия или процессы.

  3. Управление зависимостями: Получение статуса задач может помочь в управлении зависимостями и выполнением последующих действий в зависимости от статуса предыдущих задач. Например, если одна задача завершилась неудачно, вы можете хотеть принять решение о перезапуске другой задачи или выполнении альтернативных действий.

Пример кода с получением статуса задачи:

from airflow.models.dagrun import DagRun
from airflow.decorators import dag, task
from datetime import datetime

default_my_args = {
    'owner': 'PB_Academy',
    'retries': 5
}

@dag(
    dag_id = 'dag_with_status',
    default_args = default_my_args,
    start_date = datetime(2023,12,10),
    schedule_interval='@daily'
)
def calculate_etl():

    @task()
    def get_status_dag():
        dag_id = 'dag_with_status'
        dag_runs = DagRun.find(dag_id=dag_id)
        print(f'Length: {len(dag_runs)}')
        for dag_run in dag_runs:
            if dag_run.external_trigger == False :
                print(f'Dag Name {dag_run}\n Dag Status {dag_run.state}')
    
    @task()
    def success_task():
        4 / 2
    
    @task()
    def fail_task():
        4 / 0
        
    success_task()
    fail_task()
    get_status_dag()

my_dag = calculate_etl()

Параметр schedule_interval используется для определения расписания выполнения задач в рамках DAG. Этот параметр указывает, с какой периодичностью DAG и его задачи должны выполняться.schedule_interval может быть определен различными способами, включая крон-выражения, объекты datetime.timedelta или ключевые слова, такие как @daily, @hourly и т. д.

Вывод

Таким образом, Apache Airflow часто выбирают для сложных рабочих процессов с большим количеством задач. Подводя итог, рассмотрим основные плюсы и минусы данной системы:

Apache Airflow

Плюсы

Минусы

Гибкость

Airflow написан на Python, это делает его гибким и легко интегрируемым с другими инструментами.

Сложность настройки

Наличие неявных зависимостей может усложнить начальную настройку и конфигурацию.

Мощный пользовательский интерфейс

Графический веб-интерфейс позволяет легко создавать и мониторить рабочие процессы.

Требования к ресурсам

Для управления большими рабочими процессами требуются значительные вычислительные ресурсы.

Расширяемость

Поддерживает множество плагинов и интеграций с различными сервисами и базами данных.

Масштабируемость

Модульная архитектура и очереди сообщений позволяют обрабатывать большое количество задач.

Для изучения данной темы рекомендуем прочитать книгу Баса Харенслака и Джулиана де Руйтера «Apache Airflow и конвейеры обработки данных»

Комментарии (2)


  1. astentx
    17.05.2024 19:02

    TaskFlow API это не программная генерация структуры дага, а возможность писать флоу в виде вызовов Python-функций как обычный код вместо декларативного и многословного "рисования стрелочек" с передачей параметров откуда-то сбоку: Airflow при этом сам генерирует DAG из этих простых вызовов, делает XCom.push/pull.

    А вот насчёт GUI не сказать, что очень продвинутый. Создавать задания там нельзя, только смотреть. Причем в версиях после 2.5 ветвистый граф на десятки задач с несбалансированным ветками рисуется некрасиво. Фильтрации задач по имени и тегирования тасков в графе нет, только подсветка. Обычный листинг всех дагов с тегами и фильтрами, никакой структуры хранения дагов ввести нельзя. Но вообще GUI постоянно дорабатывают и добавляют новые фичи.


  1. ptr128
    17.05.2024 19:02

    Он DAG умеет уже в БД хранить?