Apache Airflow давно стал стандартом в мире Data Engineering благодаря своей гибкости, масштабируемости и богатой экосистеме. В этой статье мы подробно разберём, что такое Airflow, почему он так популярен, как эффективно использовать его в аналитической архитектуре, а также предоставим максимально подробную инструкцию по развертыванию Airflow.

Что такое Apache Airflow?

Apache Airflow — это платформа с открытым исходным кодом для создания, планирования и мониторинга рабочих процессов (workflow). Основная сила Airflow заключается в возможности организации сложных ETL-процессов в виде управляемых графов задач (DAG — Directed Acyclic Graph).

Основные компоненты Airflow

DAG (Directed Acyclic Graph)

DAG — это структурированный набор задач, связанных между собой четко определенными зависимостями. Основные характеристики DAG:

  • Задачи выполняются согласно строгому порядку, без циклических зависимостей.

  • Каждый DAG описывается с помощью кода Python, что делает процесс разработки и поддержки интуитивно понятным.

  • DAG позволяет ясно визуализировать и контролировать последовательность задач.

Task

Task (задача) является основной рабочей единицей внутри DAG:

  • Может выполнять любой код на Python, SQL-запросы или системные команды.

  • Использует специальные операторы Airflow (например, PythonOperator, BashOperator, PostgresOperator).

  • Позволяет конфигурировать параметры выполнения, такие как время запуска, поведение при ошибках и условия выполнения.

Scheduler

Scheduler (планировщик) отвечает за запуск DAG в соответствии с заданным расписанием:

  • Регулярно проверяет статус всех задач и запускает новые задачи в соответствии с графом.

  • Управляет состоянием задач (запуск, повторная попытка, пропуск, завершение).

  • Обеспечивает корректную обработку расписаний и зависимостей.

Executor

Executor управляет выполнением задач, распределением нагрузки и масштабированием:

  • SequentialExecutor: Запускает задачи последовательно (подходит для разработки и тестирования).

  • LocalExecutor: Запускает задачи параллельно на одном узле.

  • CeleryExecutor: Использует Celery для распределенного выполнения задач на нескольких узлах.

  • KubernetesExecutor: Запускает задачи в виде отдельных подов в Kubernetes, обеспечивая динамическое масштабирование.

Webserver

Webserver предоставляет веб-интерфейс для управления и мониторинга процессов Airflow:

  • Позволяет визуально отслеживать выполнение задач и графов.

  • Поддерживает интерфейс для ручного запуска и перезапуска задач.

  • Содержит встроенные инструменты для просмотра логов и отладки.

Metadata Database

Metadata Database (база метаданных) хранит информацию о всех DAG, задачах и их состоянии:

  • Обычно используется PostgreSQL или MySQL для хранения метаданных.

  • Содержит исторические данные о выполнении задач, позволяя анализировать производительность и ошибки.

  • Обеспечивает сохранение состояния даже после перезапуска Airflow.

Подробная инструкция по развертыванию Apache Airflow

Шаг 1: Подготовка окружения

Используйте Linux-сервер с установленным Docker и Docker Compose.

Установите Docker:

sudo apt update
sudo apt install docker.io docker-compose -y
sudo systemctl start docker
sudo systemctl enable docker

Шаг 2: Создание файла Docker Compose

Создайте директорию проекта:

mkdir airflow
cd airflow

Создайте файл docker-compose.yaml:

version: '3'
services:
  postgres:
    image: postgres:13
    environment:
      POSTGRES_USER: airflow
      POSTGRES_PASSWORD: airflow
      POSTGRES_DB: airflow
    ports:
      - "5432:5432"

  redis:
    image: redis:latest
    ports:
      - "6379:6379"

  webserver:
    image: apache/airflow:2.9.1
    depends_on:
      - postgres
      - redis
    environment:
      AIRFLOW__CORE__EXECUTOR: CeleryExecutor
      AIRFLOW__CORE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:airflow@postgres/airflow
      AIRFLOW__CELERY__BROKER_URL: redis://redis:6379/0
      AIRFLOW__CELERY__RESULT_BACKEND: db+postgresql://airflow:airflow@postgres/airflow
      AIRFLOW__CORE__FERNET_KEY: ''
      AIRFLOW__WEBSERVER__SECRET_KEY: ''
    ports:
      - "8080:8080"
    command: webserver

  scheduler:
    image: apache/airflow:2.9.1
    depends_on:
      - webserver
    environment:
      AIRFLOW__CORE__EXECUTOR: CeleryExecutor
      AIRFLOW__CORE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:airflow@postgres/airflow
      AIRFLOW__CELERY__BROKER_URL: redis://redis:6379/0
      AIRFLOW__CELERY__RESULT_BACKEND: db+postgresql://airflow:airflow@postgres/airflow
    command: scheduler

  worker:
    image: apache/airflow:2.9.1
    depends_on:
      - scheduler
    environment:
      AIRFLOW__CORE__EXECUTOR: CeleryExecutor
      AIRFLOW__CORE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:airflow@postgres/airflow
      AIRFLOW__CELERY__BROKER_URL: redis://redis:6379/0
      AIRFLOW__CELERY__RESULT_BACKEND: db+postgresql://airflow:airflow@postgres/airflow
    command: celery worker

Шаг 3: Инициализация базы данных и создание пользователя

Запустите контейнеры:

sudo docker-compose up airflow-init

Шаг 4: Запуск Airflow

Запустите Airflow в режиме демона:

sudo docker-compose up -d

Шаг 5: Доступ к Airflow Web UI

Перейдите по адресу: http://localhost:8080

Стандартные учетные данные:

  • логин: airflow

  • пароль: airflow

Создание первого DAG

Создание DAG начинается с определения задач и их последовательности. Рассмотрим этот процесс максимально подробно:

Шаг 1: Импорт необходимых библиотек

from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from datetime import datetime

def extract():
    print("Extracting data...")

def transform():
    print("Transforming data...")

def load():
    print("Loading data into warehouse...")

with DAG('etl_pipeline', start_date=datetime(2023, 1, 1), schedule_interval='@daily') as dag:
    extract_task = PythonOperator(task_id='extract', python_callable=extract)
    transform_task = PythonOperator(task_id='transform', python_callable=transform)
    load_task = PythonOperator(task_id='load', python_callable=load)

    extract_task >> transform_task >> load_task

Шаг 2: Определение функций задач

Каждая задача представляет собой отдельную функцию:

def extract():
    print("Извлечение данных из источника...")

def transform():
    print("Преобразование и очистка данных...")

def load():
    print("Загрузка данных в хранилище...")

Шаг 3: Настройка аргументов по умолчанию

Установите аргументы, которые будут использоваться по умолчанию:

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2023, 1, 1),
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

Шаг 4: Создание DAG

Создайте DAG с настройкой расписания выполнения:

with DAG(
    'etl_pipeline',
    default_args=default_args,
    description='Простой ETL-пайплайн',
    schedule_interval='@daily',
    catchup=False
) as dag:

Шаг 5: Создание задач внутри DAG

Определите каждую задачу с помощью операторов Airflow:

    extract_task = PythonOperator(
        task_id='extract',
        python_callable=extract
    )

    transform_task = PythonOperator(
        task_id='transform',
        python_callable=transform
    )

    load_task = PythonOperator(
        task_id='load',
        python_callable=load
    )

Шаг 6: Определение порядка выполнения задач

Установите последовательность выполнения задач:

    extract_task >> transform_task >> load_task

Таким образом, задачи будут выполнены последовательно: сначала extract_task, затем transform_task, и в конце load_task.

Советы по организации Airflow-проектов

  • Используйте шаблоны и переменные для переиспользования DAG'ов: создавайте переменные Airflow для параметров, используйте Jinja-шаблоны в SQL-запросах и Bash-командах. Это сделает ваши DAG’и универсальными и легко настраиваемыми.

  • Настройте мониторинг и логирование: включите сбор логов задач, настройте метрики выполнения DAG’ов и интегрируйте систему алертов (например, Slack, email), чтобы оперативно реагировать на проблемы.

  • Оптимизируйте DAG'и для параллельного выполнения: избегайте последовательных длинных цепочек задач, создавайте небольшие независимые задачи, которые можно запускать параллельно для ускорения выполнения.

Типичные ошибки и как их избежать

  • Разбивайте задачи на мелкие и параллельные: избегайте больших монолитных задач, так как при их падении придется запускать весь процесс заново. Разделяйте задачи на меньшие и независимые компоненты.

  • Внедряйте систему уведомлений и мониторинга: при возникновении ошибок вы должны немедленно получать уведомления, чтобы быстро реагировать и исправлять проблему.

  • Используйте секретные хранилища вместо хранения данных в коде: не храните пароли, ключи API и другие конфиденциальные данные в открытом виде. Используйте переменные Airflow, секретные менеджеры AWS или HashiCorp Vault.

Как масштабировать Airflow

  • Используйте CeleryExecutor для распределенного выполнения: CeleryExecutor позволяет выполнять задачи на нескольких узлах одновременно, значительно улучшая производительность и надежность вашей инфраструктуры.

  • Рассмотрите KubernetesExecutor для управления задачами в Kubernetes: KubernetesExecutor идеально подходит для автоматического масштабирования задач. Он запускает каждую задачу как отдельный под в кластере Kubernetes, позволяя легко управлять ресурсами и обеспечивать высокую доступность.

Заключение

Apache Airflow — мощный инструмент для оркестрации и автоматизации рабочих процессов, существенно облегчающий жизнь инженерам данных. Правильная настройка и использование Airflow позволяют построить надежные и прозрачные ETL-процессы, обеспечивающие качественную аналитику и принятие решений на основе данных.

Комментарии (0)