В этом туториале мы пошагово разберем, как создать с нуля и запустить локально свой первый пайплайн на Airflow.
Данный пайплайн специально адаптирован под задачи машинного обучения. В этом примере мы будем загружать новости из открытого источника и использовать NLP-модель для их классификации (zero-shot classification).
План:
Примеры применения Airflow в проектах с машинным обучением.
Знакомство с Airflow: основные понятия и инструменты.
Написание тасок для загрузки данных и получения предсказания модели.
Запуск Airflow локально через Docker Compose.
Знакомство с веб-интерфейсом Airflow.
Код доступен на GitHub.
Как Airflow используется в проектах с машинным обучением
Airflow хорошо подходит для задач, которые запускаются в указанное время или каждый заданный интервал времени.
Apache Airflow - популярный инструмент в проектах с машинным обучением. Он позволяет создавать эффективные и масштабируемые пайплайны, связанные с обработкой данных, обучением и развертыванием моделей.
Примеры задач, которые решает Airflow:
Регулярное переобучение моделей машинного обучения на новых данных.
Получение предсказаний модели в пакетном режиме. Например, раз в час или раз в день.
ETL пайплайны, которые загружают данные из разных источников и преобразуют их.
Автоматическая генерация отчетов.
Ниже приведен пример архитектуры рекомендательной системы, где каждая из 3 частей запускается на Airflow:
Feature Engineering: данные от пользователей накапливаются и обрабатываются единоразово в заданное время.
Training Pipeline: на основе актуальных данных модель переобучается, например, раз в день.
Batch Prediction Pipeline: последняя версия модели используется, чтобы рассчитать новые рекомендации по всем пользователям и сохранить в базу.
Шаг 3 в этом пайплайне используется получение предсказания модели в пакетном формате. Когда это имеет смысл?
Преимущество пакетной обработки: нам не надо беспокоиться о latency модели. Работа с моделью в пакетном режиме, как правило, не требует дополнительной оптимизации и позволяет быстрее довести модель до прода.
Недостаток: предсказания имеют запаздывание. Например, рекомендации не будут учитывать real-time фидбек от пользователя.
Если для вашей задачи критична работа модели в real-time формате подойдут другие инструменты. В прошлом туториале мы разбирали, как написать ML веб-сервис на FastAPI, который работает с моделью в формате запрос-ответ.
Знакомство с Airflow: основные понятия и инструменты
Прежде чем начать практическую часть, познакомимся с основными понятиями Apache Airflow.
DAG (Directed Acyclic Graph) - удобный способ организации и визуализации пайплайна. Это структура, где каждая задача представлена узлом, а зависимости между задачами представлены направленными стрелками, указывающими порядок выполнения задач.
Граф является направленным, потому что задачи выполняются последовательно в определенном порядке, определяемом зависимостями.Task: "кирпичи", из которых состоит DAG. Задачи представляют собой конкретные шаги или операции, которые должны быть выполнены в вашем пайплайне.
Operator: Каждая задача выполняется некоторым кодом или скриптом, который вы определяете. Операторы (Operators) представляют собой классы или функции, которые определяют, как будет выполнена каждая задача в пайплайне.
Например, есть операторы, отвечающие за выполнение кода на питоне, bash-команд и SQL-запросов.Metadata Database: в этой базе хранятся метаданные о структуре пайплайнов, зависимостях между задачами, расписании выполнения и других параметрах.
Webserver: удобный пользовательский интерфейс (UI), позволяющий запускать, мониторить и отлаживать пайплайны.
Через веб-интерфейс можно просматривать статус выполнения задач, проверять логи, а также управлять пайплайнами, вносить изменения и контролировать их работу.Scheduler: следит за графом пайплайна, определяет, какие задачи уже могут быть выполнены на основе зависимостей и расписания, и запускает их в соответствии с этой логикой. Он также отслеживает выполнение задач и обновляет их статусы в метаданных.
0. Описание задачи и проектирование пайплайна
Главная цель этого туториала - знакомство с Airflow и самостоятельный запуск пайплайна. То есть, по итогу туториала вы научитесь локально запускать Airflow.
Но, для использования Airflow в продакшене необходимы другие инструменты, например, Kubernetes.
Apache Airflow предоставляет возможности для создания сложных пайплайнов с множеством задач и зависимостей между ними. Однако, здесь мы ограничимся простым примером графа пайплайна.
Пайплайн будет состоять из 3 задач:
Task 1: Подготовка данных - загрузка данных, препроцессинг и сохранение в локальную директорию.
Task 2: Предсказание модели - загрузка модели, инференс модели на сохраненных данных.
Task 3: Подготовка отчета на основе предсказаний.
Task 1 и Task 2 мы хотим запустить в отдельных Docker-контейнерах. Для этого будем использовать DockerOperator
.
Зачем может понадобиться запускать таски в отдельных контейнерах?
Это дает стандартные преимущества контейнеризации: изоляция, гибкость и тд. Например, применительно к ML, код расчета фичей и код предсказания модели могли использовать разные версии scikit-learn
.
Также DockerOperator
позволяет ближе познакомиться с тем, как происходит выполнение задач в разных контейнерах с помощью Kubernetes в продакшен-среде.
Task 3 содержит простой код агрегации, поэтому будем использовать PythonOperator
.
В данном примере пайплайна мы выбрали упрощенный подход и не использовали дополнительные инструменты, чтобы не усложнять настройку Airflow:
Для обмена данными между задачами мы использовали локальную директорию
data
. В реальных проектах часто используются специализированные хранилища данных - Amazon S3, DVC и другие.В качестве источника данных выбран открытый источник - финансовые новости с cnbc.com. В реальных задачах это будут данные из внутренних баз данных или тот же S3.
Модель загружали из открытого реестра моделей - Hugging Face Hub. Также сознательно выбрали модель zero-shot classification, которая способна работать с заранее заданным списком классов, не требуя дополнительного дообучения. В реальных проектах это будет специально обученная модель, хранящаяся в реестре моделей, например, MLflow.
Таким образом, в данном примере мы использовали упрощенные варианты для целей наглядности и понимания основных концепций. В реальных проектах нужно настраивать и использовать дополнительные инструменты и хранилища данных.
С учетом вышесказанного код проекта будет выглядеть следующим образом:
data: вспомогательная директория, будет использоваться для обмена данными между тасками и сохранения результатов.
ml_pipeline: содержит код для загрузки и подготовки данных
data_loader
и код предсказания моделиmodel_prediction
. Как уже обсуждалось ранее, каждая таска будет запускаться в отдельном Docker-контейнере, поэтому у обеих таск есть свой Dockerfile.dags: в этой директории находятся файлы, описывающие DAG. Airflow будет использовать эти файлы для планирования и запуска задач в определенном порядке.
docker-compose.yml: определяет конфигурацию контейнеров, необходимых для запуска и работы Airflow.
1. Загрузка данных, используем Docker
Для получения данных о финансовых новостях в этом примере мы будем использовать фид от cnbc.com. Когда мы запрашиваем информацию, он предоставляет нам 30 самых новых новостей из мира финансов. Таким образом, запуская пайплайн раз в день, мы сможем получать каждый день свежие новости.
Простейший код для загрузки данных в формате csv будет выглядеть следующим образом:
import feedparser
import pandas as pd
NEWS_FEED_URL = "https://www.cnbc.com/id/19746125/device/rss/rss.xml"
def data_load(data_path: str) -> None:
news_feed = feedparser.parse(NEWS_FEED_URL)
df = pd.DataFrame(news_feed.entries)
df.to_csv(data_path, sep="\t", index=False)
Помимо этого в финальную версию data_load.py
добавим:
Использование Click и опции командной строки - для возможности гибко задавать параметры при запуске скрипта.
Логирование - для возможности отслеживать работу пайплайнов и обнаружения возможных проблем. Позже эти сообщения мы увидим в интерфейсе Airflow.
Дополнительную обработку данных.
Финальная версия data_load.py
выглядит так:
import html
import logging
import click
import feedparser
import pandas as pd
NEWS_FEED_URL = "https://www.cnbc.com/id/19746125/device/rss/rss.xml"
COLUMNS_TO_SAVE = ["id", "published", "title", "summary"]
logging.basicConfig(level=logging.INFO)
@click.command()
@click.option("--data_path", help="Path to the input data CSV file")
def data_load(data_path: str) -> None:
logging.info("Fetching financial news from the RSS feed...")
news_feed = feedparser.parse(NEWS_FEED_URL)
logging.info("News fetched successfully.")
df = pd.DataFrame(news_feed.entries)[COLUMNS_TO_SAVE]
df["published"] = pd.to_datetime(df["published"])
df["title"] = df["title"].map(html.unescape)
df["summary"] = df["summary"].map(html.unescape)
logging.info(f"Saving the processed data to '{data_path}'...")
df.to_csv(data_path, sep="\t", index=False)
logging.info("Data saved successfully.")
if __name__ == "__main__":
data_load()
Для подготовки Docker-образа осталось создать:
список зависимостей -
requirements.txt
:
feedparser==6.0.10
click==8.1.3
pandas==2.0.1
Dockerfile
:
FROM python:3.11
COPY requirements.txt data_load.py /workdir/
WORKDIR /workdir
RUN pip install -r requirements.txt
Таким образом, у нас готовы все файлы для создания Docker-образа. Мы соберем образ и создадим контейнер позже - в разделе про Docker Compose.
2. Предсказание модели, zero-shot classification
В этом примере будем использовать NLP-модель для задачи zero-shot classification.
Что такое zero-shot classification?
Zero-shot classification (классификация без обучения) - это метод машинного обучения, при котором модель классифицирует объекты на классы, которых не было в процессе обучения модели.
Этот подход позволяет модели генерализовать свои знания и классифицировать объекты на основе общего понимания классов, даже если она не имеет конкретного опыта с каждым классом.
Для начала определим список классов - тем, на которые мы будем разделять финансовые новости:
LABELS = [
"Crypto",
"SEC",
"Dividend",
"Economics",
"Oil or Gas",
"IPO",
"Politics",
"Buffet",
"Stock",
"Other",
]
Для получения предсказаний загрузим модель valhalla/distilbart-mnli-12-1 из Hugging Face Hub. device=-1
означает, что модель запускается на CPU.
from transformers import pipeline
model_hf = pipeline(model="valhalla/distilbart-mnli-12-1", device=-1)
Далее загрузим csv файл с новостями, который мы подготовили в предыдущем пункте. Для предсказания будем использовать объединение текста из заголовка новости (title) и ее краткого описания (summary).
import pandas as pd
df = pd.read_csv(data_path, sep="\t")
texts_for_pred = (df.title + ". " + df.summary).tolist()
Для получения предсказаний передадим модели список текстов texts_for_pred
и классы LABELS
:
pred = model_hf(texts_for_pred, LABELS, multi_label=False)
Флаг multi_label
определяет, может ли объект быть отнесен к одному или более классам. Когда multi_label=True
, модель может присваивать объектам несколько классов одновременно.
Таким образом, мы сами придумали название классов на свое усмотрение. Модель делает предсказание по нашим классам без необходимости предварительного обучения. В этом преимущество zero-shot моделей.
Благодаря библиотеки transformers код занял всего несколько строк.
Выберем предсказание лучшего класса и сохраним результат в json-файл:
df["label"] = [x["labels"][0] for x in pred]
df.T.to_json(pred_path)
На этом код предсказания готов.
Добавим логирование и использование Click по аналогии с кодом загрузки данных. Тогда финальная версия model_predict.py
будет выглядеть следующим образом:
import logging
import click
import pandas as pd
from transformers import pipeline
LABELS = [
"Crypto",
"SEC",
"Dividend",
"Economics",
"Oil or Gas",
"IPO",
"Politics",
"Buffet",
"Stock",
"Other",
]
logging.basicConfig(level=logging.INFO)
@click.command()
@click.option("--data_path", help="Path to the input data CSV file")
@click.option("--pred_path", help="Path to save the output JSON file")
def model_predict(data_path: str, pred_path: str) -> None:
logging.info("Loading the model...")
model_hf = pipeline(model="valhalla/distilbart-mnli-12-1", device=-1)
logging.info("Model loaded successfully.")
logging.info(f"Reading data from '{data_path}'...")
df = pd.read_csv(data_path, sep="\t")
logging.info("Data read successfully.")
texts_for_pred = (df.title + ". " + df.summary).tolist()
logging.info("Performing model prediction...")
pred = model_hf(texts_for_pred, LABELS, multi_label=False)
logging.info("Prediction completed successfully.")
df["label"] = [x["labels"][0] for x in pred]
logging.info(f"Saving the predictions to '{pred_path}'...")
df.T.to_json(pred_path)
logging.info("Predictions saved successfully.")
if __name__ == "__main__":
model_predict()
Код предсказания модели будет также запускаться в отдельном Docker-контейнере. Поэтому аналогично предыдущему пункту мы добавили свои requirements.txt
и Dockerfile
.
Итак, мы подготовили код для 2 компонент нашего пайплайна: загрузки данных и получения предсказания модели. Каждый будет выполняться в отдельном Docker-контейнере. Теперь все готово, чтобы мы перешли к написанию DAG на Airflow.
3. Пишем DAG
Вспомним основные компоненты нашего пайплайна:
3 последовательные таски, первые 2 из которых должны запускаться в отдельных Docker-контейнерах.
Локальная директория
data
, которую мы будем использовать для сохранения результатов и обмена данными между тасками.
Разберем основные шаги при написании нашего DAG:
Локальную директорию
data
нужно примонтировать к/opt/airflow/data/
- это путь к данным внутри контейнера Airflow.
from docker.types import Mount
dockerops_kwargs = {
"mount_tmp_dir": False,
"mounts": [
Mount(
source="<path_to_your_airflow-ml_repo>/data",
target="/opt/airflow/data/",
type="bind",
)
],
...
}
Определим пути для трех типов файлов: исходные данные, предсказания и файл с результатом. Они используют специальный синтаксис Airflow
{{ ds }}
, который будет заменен на дату выполнения при запуске DAG.
raw_data_path = "/opt/airflow/data/raw/data__{{ ds }}.csv"
pred_data_path = "/opt/airflow/data/predict/labels__{{ ds }}.json"
result_data_path = "/opt/airflow/data/predict/result__{{ ds }}.json"
Создадим DAG. Декоратор
dag
создает DAG с названиемfinancial_news
с начальной датой сегодня (days_ago(0)
) и ежедневным запуском. Функцияtaskflow
представляет собой сам DAG и содержит задачи, формирующие пайплайн.
from airflow.decorators import dag
from airflow.utils.dates import days_ago
# Create DAG
@dag("financial_news", start_date=days_ago(0), schedule="@daily", catchup=False)
def taskflow():
...
Создадим две таски для запуска в Docker-контейнерах. Для это будем использовать
DockerOperator
. Здесь мы указываем имя образа (этот образ будет описан вdocker-compose.yml
) и команду для запуска питоновского скрипта внутри контейнера.
# Task 1
news_load = DockerOperator(
task_id="news_load",
container_name="task__news_load",
image="data-loader:latest",
command=f"python data_load.py --data_path {raw_data_path}",
**dockerops_kwargs,
)
# Task 2
news_label = DockerOperator(
task_id="news_label",
container_name="task__news_label",
image="model-prediction:latest",
command=f"python model_predict.py --data_path {raw_data_path} --pred_path {pred_data_path}",
**dockerops_kwargs,
)
Создадим последнюю таску, она преобразует полученные предсказания питоновским кодом. Мы будем использовать
PythonOperator
, который выполнит несложный скрипт группировки предсказаний.
# Task 3
news_by_topic = PythonOperator(
task_id="news_by_topic",
python_callable=aggregate_predictions,
op_kwargs={
"pred_data_path": pred_data_path,
"result_data_path": result_data_path,
},
)
Установим зависимости между задачами. В нашем случае они выполняются последовательно:
news_load >> news_label >> news_by_topic
Наконец, создадим и настроим объект DAG в соответствии с заданными параметрами:
taskflow()
Файл с описанием DAG имеет расширение .py
и лежит в директории dags
. При запуске Airflow, он сканирует эту директорию (или другую настроенную директорию) в поисках файлов с определением DAG. Когда Airflow обнаруживает файл с определением DAG, он регистрирует его и делает доступным для выполнения по расписанию.
Мы закончили писать наш пайплайн, теперь перейдем к настройке и запуску Airflow.
4. Запуск Airflow с помощью Docker Compose
Для локального запуска Airflow мы будем использовать Docker Compose. Он помогает запустить Apache Airflow с минимальными усилиями, предоставляя унифицированный и изолированный способ запуска всех компонентов Airflow.
У Airflow есть отличная инструкция по запуску с помощью Docker Compose. Там же есть загрузка готового файла
docker-compose.yml
. Инструкция позволяет запустить Airflow в пару строк.
Файл docker-compose.yml
имеет раздел services, где определены различные сервисы, которые являются частями кластера Airflow. Каждый сервис имеет свою секцию с настройками, где указывается образ Docker, команда для запуска сервиса, порты, зависимости от других сервисов и другие параметры. В частности здесь описаны сервисы для Metadata Database, Webserver и Scheduler, которые мы упомянали в разделе Знакомство с Airflow.
Модификация docker-compose.yml для запуска тасок в отдельных Docker-контейнерах
Поскольку мы усложнили настройку Airflow, когда решили запускать таски в отдельных контейнерах, будем использовать свой модифицированный docker-compose.yml
. Его можно посмотреть тут.
Основные моменты, которые мы изменили для поддержки запуска тасок в Docker-контейнерах:
Установили пакет для поддержки работы с Docker:
_PIP_ADDITIONAL_REQUIREMENTS: apache-airflow-providers-docker==3.6.0
В список volumes добавили монтирование директории с данными и сокета Docker:
volumes:
- ${AIRFLOW_PROJ_DIR:-.}/data/:/opt/airflow/data/
- /var/run/docker.sock:/var/run/docker.sock
Ранее в директориях
ml_pipeline/data_loader
иml_pipeline/model_prediction
мы написали инструкции по созданию Docker-образов, которые импользуются для запуска тасок с помощью DockerOperator. Здесь мы также определяем эти сервисы:
data-loader:
build:
context: ml_pipeline/data_loader
image: data-loader
restart: "no"
model-prediction:
build:
context: ml_pipeline/model_prediction
image: model-prediction
restart: "no"
Добавим сервис
docker-socket-proxy
:
# Required because of DockerOperator. For secure access and handling permissions.
docker-socket-proxy:
image: tecnativa/docker-socket-proxy:0.1.1
environment:
CONTAINERS: 1
IMAGES: 1
AUTH: 1
POST: 1
privileged: true
volumes:
- /var/run/docker.sock:/var/run/docker.sock:ro
restart: always
Запуск Airflow
Возможно, в вашем случае запуск потребует выделения больше памяти для Docker Engine.
Перед первым запуском Airflow нужно подготовить окружение.
Если вы работаете на Linux перед запуском нужно указать AIRFLOW_UID
:
echo -e "AIRFLOW_UID=$(id -u)" > .env
Далее независимо от вашей ОС необходимо выполнить миграцию базы данных и создать первую учетную запись пользователя. Для этого выполните команду:
docker compose up airflow-init
Созданная учетная запись имеет логин airflow
и пароль airflow
.
Запуск и остановка Airflow
Для создания и запуска всех необходимых контейнеров, определенных в файле docker-compose.yml
, используется команда:
docker compose up
В нашем случае также необходимо предварительно собрать образы data-loader
и model-prediction
, которые также указаны в файле docker-compose.yml
. Поэтому модифицируем команду:
docker compose up --build
Когда вы закончите работу и захотите очистить свое окружение, выполните:
docker compose down --volumes --rmi all
После запуска Airflow продолжим работу в веб-интерфейсе.
5. Веб-интерфейс Airflow, запуск пайплайна
Пользовательский интерфейс Airflow упрощает мониторинг и запуск пайплайнов. Он доступен по адресу http://localhost:8080. На странице входа нужно ввести логин и пароль от учетной записи, в нашем случае airflow
и airflow
.
Запуск и мониторинг пайплайна
На домашней странице вы увидете список всех дагов, включая дефолтные от Airflow. Здесь можно найти и выбрать созданный нами DAG financial_news
.
На странице нашего DAG доступна разная информация о пайплайне: графическое представление, время ближайшего запуска, логи запуска, код и многое другое.
Не дожидаясь планового запуска пайплайна, запустим DAG, нажав на кнопку старта. Для отслеживания выполнения отдельных тасок, можно нажать на нужную таску и посмотреть ее логи:
Здесь мы можем видеть логи, которые добавили специально для отслеживания выполнения отдельных шагов.
После того, как выполнение пайплайна успешно завершилось, посмотрим на результаты.
Смотрим результат пайплайна. Как классифицировались новости?
Результаты выполнения пайплайна сохранились в data/predict/result__<date>.json
. Изначально мы ставили задачу написать пайплайн, которые будет автоматически загружать актуальные новости из финансового мира и группировать их по заданным нами темам. Посмотрим, что у нас получилось.
Таким образом, мы справились с поставленной задачей. Новости успешно загружаются и классифицируются. Напомним, что темы заданы нами произвольно, без предварительного обучения модели.
Заключение
Мы подробно рассмотрели, как создать с нуля и запустить локально свой первый ML-пайплайн на Airflow.
В этом примере мы написали таски для загрузки новостей из открытого источника и их классификации NLP-моделью (zero-shot classification). Каждая таска выполняется в отдельном Docker-контейнере.
Код доступен на GitHub. Вы легко можете воспроизвести пайплайн и изучить его подробнее.
Если формат туториалов по инструментам MLOps окажется полезным, буду планировать темы для следующих статей. А пока подписывайтесь на мой телеграм-канал. Там будут анонсы новых статей, а также советы для работы и более короткие мысли по DS/ML/AI.