Привет! Меня зовут Алексей Карпов, я DevOps-инженер отдела ML разработки в OKKO. В этой статье я поделюсь опытом в работе Apache Airflow в связке с Kubernetes. Мы соберем собственный образ Docker с python-скриптом, установим Airflow в Kubernetes, настроим автоматическую синхронизацию DAG'ов с удалённым репозиторием и их исполнение. Всё это — на примере запуска простейшего DAGа.

Данная статья будет полезна дата-инженерам и разработчикам, а также DevOps/MLOps-инженерам, которым необходимо автоматизировать запуск скриптов и установить возможность автоматического выполнения по расписанию посредством Kubernetes.

В этой статье вы:

соберёте собственный образ Docker с простейшим python-скриптом

установите утилиту Helm;

установите Airflow с помощью Helm Chart;

научитесь синхронизировать ваши DAG'и с удалённым репозиторием;

запустите DAG, обёрнутый в pod и исполняемый Kubernetes.

Первая часть статьи, в которой описаны основные понятия Airflow и показан процесс
установки на отдельном сервере, находится тут.

Сборка образа Docker

Docker представляет собой полноценную платформу для работы с контейнерами, которая позволяет разрабатывать, доставлять и запускать приложения в контейнерах. Она управляет жизненным циклом контейнеров, автоматически их создает, запускает и развёртывает. Также Docker позволяет запускать множество контейнеров на одной машине. Контейнеризация похожа на виртуализацию, однако это все-таки разные процессы. При виртуализации, на гипервизоре запускается полноценный хост со своим виртуальным оборудованием и операционной системой, что позволяет запускать несколько ОС на одной машине. При контейнеризации контейнеры создаются напрямую из ядра основной ОС. Это позволяет запускать приложения только в основной рабочей ОС.

Преимущество образов Docker состоит в полной изоляции процесса - часто случаются
проблемы с зависимостями python. Использование Docker их полностью решает и
позволяет намного удобнее контролировать процесс.

Чтобы показать один из реальных кейсов использования, соберём для запуска образ docker:

Создадим рабочую директорию и перейдём в неё:

mkdir airflow
cd airflow

Создадим простейший скрипт python , который будем вызывать из DAG'а с именем first-script.py и наполним содержимым:

import datetime
import socket
import os

print ("Hello from kubernetes!!!")
print("datetime now =", datetime.datetime.now())
print("Hostname =", socket.gethostname())
print("Home directory =", os.path.expanduser('~'))

Создадим Dockerfile:

# Задаём имя базового образа, в котором есть интерпретатор Python 
FROM python:3.10.12-alpine
# Назначаем рабочую директорию
WORKDIR /app
# Добавляем скрипт в файловую систему образа
ADD ./first-script.py ./
# Для обеспечения безопасности переключаем пользователя на непривилигированного
USER 1001

Соберём новый образ:

docker build . -t alekseyolg/airflow-with-kubernetes:v1.0

docker - имя программы, должна быть установлена на рабочей машине

build - сборка образа

. - указываем, что собираем в текущей директории

-t alekseyolg/airflow-with-kubernetes:v1.0 - указываем тег образа, при самостоятельной сборке сменить "alekseyolg" на свой ник в Dockerhub

Загрузим образ в Dockerhub:

docker push alekseyolg/airflow-with-kubernetes:v1.0

После загрузки образа его можно запустить в kubernetes.

Что же такое Kubernetes и зачем запускать в нём DAG'и

Kubernetes (k8s) — это система с открытым исходным кодом для автоматизации развертывания, масштабирования и управления контейнерными приложениями. Если не углубляться в подробности - то это кластерная операционная система, которая позволяет большому количеству машин под управлением какой-либо системы контейнеризации (например, всем знакомый Docker) работать как единое целое. Отличительной чертой его является отказоустойчивость - если какая-то его часть откажет то k8s автоматически перераспределит нагрузку на другие узлы, и наши драгоценные DAG'и сумеют выполнить свою работу. При использовании "k8s as service" (kubernetes как сервис, в котором провайдер берёт всё обслуживание кластера на себя), например, SberCloud, то имеется возможность автоматического добавления новых узлов. Это позволит увеличивать вычислительные мощности для кластера при возрастании нагрузки.

Что такое Helm?

Helm - это исполняемый файл, который помогает управлять приложениями Kubernetes с помощью chart`ов (диаграмм). Helm помогает определять, устанавливать и обновлять даже самые сложные приложения Kubernetes.

Чарты легко создавать, верифицировать, делиться ими и публиковать.

Установка Helm

Для начала, подразумеваем, что у нас уже есть свой кластер k8s и доступ к утилите kubectl с правами cluster-admin, а также программа Lens. Установка будет вестись на машине под управлением Linux.

Установим helm:

curl https://raw.githubusercontent.com/helm/helm/main/scripts/get-helm-3 | bash

Проверим корректность установки:

helm version --short
# Пример вывода
v3.12.0+gc9f554d

Бизнес-схема Airflow

На данной схеме показан принцип работы Airflow:

Data Engineer пишет DAG'и и выгружает их в Github, Gitsync в свою очередь загружает их в каждый POD airflow - web server, scheduler и worker. Web server отвечает за отображение пользовательского интерфейса (UI), Worker запускает DAG'и, а Scheduler отвечает за их распределение на ноды кластера. Хранилищем метаданных выступает база данных PostgreSQL. Настройки Airflow хранятся в файле airflow.cfg.
Data Engineer пишет DAG'и и выгружает их в Github, Gitsync в свою очередь загружает их в каждый POD airflow - web server, scheduler и worker. Web server отвечает за отображение пользовательского интерфейса (UI), Worker запускает DAG'и, а Scheduler отвечает за их распределение на ноды кластера. Хранилищем метаданных выступает база данных PostgreSQL. Настройки Airflow хранятся в файле airflow.cfg.

Установка Airflow

Добавим официальный репозиторий Airflow для Helm:

helm repo add airflow https://airflow.apache.org

Установим Helm Chart:

helm install airflow airflow/airflow \
--debug \
--namespace airflow \
--create-namespace \
--set dags.gitSync.enabled=true \
--set dags.gitSync.repo=https://github.com/alekseyolg/airflow-with-kubernetes.git \
--set dags.gitSync.branch=main \
--set dags.gitSync.subPath="/" 

helm install airflow airflow/airflow - команда установки
--namespace airflow - имя пространства имён
--create-namespace - автоматическое создание пространства имён
--set dags.gitSync.enabled=true - использование синхронизации с удалённым репозиторием GitHub
--set dags.gitSync.repo=https://github.com/alekseyolg/airflow-with-kubernetes.git - репозиторий, с которым необходимо выполнять синхронизацию
--set dags.gitSync.branch=main - имя ветки
--set dags.gitSync.subPath="/" - путь, где лежат даги DAG'и (в данном случае - корень репозитория)

Получим вывод:

Thank you for installing Apache Airflow 2.5.3!

Your release is named airflow.
You can now access your dashboard(s) by executing the following command(s) and visiting the corresponding port at localhost in your browser:

Airflow Webserver:     kubectl port-forward svc/airflow-webserver 8080:8080 --namespace airflow
Default Webserver (Airflow UI) Login credentials:
    username: admin
    password: admin
Default Postgres connection credentials:
    username: postgres
    password: postgres
    port: 5432

You can get Fernet Key value by running the following:

    echo Fernet Key: $(kubectl get secret --namespace airflow airflow-fernet-key -o jsonpath="{.data.fernet-key}" | base64 --decode)

###########################################################
#  WARNING: You should set a static webserver secret key  #
###########################################################

You are using a dynamically generated webserver secret key, which can lead to
unnecessary restarts of your Airflow components.

Information on how to set a static webserver secret key can be found here:
https://airflow.apache.org/docs/helm-chart/stable/production-guide.html#webserver-secret-key

Данный вывод свидетельствует о том, что установка прошла успешно.

Войдём в приложение Lens, откроем пространство имён airflow и проверим, что все сервисы в статусе "Running":

Для того, чтобы открыть UI Airflow, сделаем Port-Forward на локальную машину. Для этого
выберем POD airflow-webserver и нажмём кнопку "Forward...":

Нажмём на кнопку "Start":

После нажатия кнопки автоматически откроется главная страница Airflow.

Войдём в пользователя. Имя - admin, пароль - admin:

Войдём в DAG (нажмём на его имя):

Запустим DAG:

Увидим, что DAG выполнился успешно (выделение зелёным цветом). Посмотри логи, для этого нажмём на зелёный квадрат, а после на кнопку "Logs":

Увидим логи выполнения скрипта, который положили в образ Docker:

Итак, мы собрали собственный образ Docker с простейшим python-скриптом, установили Apache Airflow в Kubernetes и запустили первый DAG.

Комментарии (2)


  1. alecx
    15.06.2023 19:13

    Я честно пытался уследить за мыслю, но так и не понял

    • откуда взялся DAG?

    • в начале сказано, что скрипт запускается DAG'ом, но он упаковывается в контейнер под python:alpine?

    • если контейнер запускает PodOperator (судя по логу), то где лежат pod template и, если нужны, остальные артифакты пода?

    • что с авторизацией airflow -> k8s, gitSync -> repo?


    1. alekseyolg Автор
      15.06.2023 19:13

      1) DAG берётся из репозитория на github, ссылка на репозиторий есть в статье (шаг по установке airflow через helm) (есть на схеме работы Airflow)
      2) Образ запускается DAG'ом, внутри самого образа лежит скрипт
      3) Pod template в нашем случае генерится автоматически оператором, если есть необходимость использовать отдельный темплейт то можно посмотреть примеры в оф. документации Airflow
      4) Авторизация не раскрывалась в статье из-за того, что если это всё расписывать то она станет слишком громоздкой. Airflow использует сервисаккаунт для запуска POD'ов, а gitsync не использует в нашем случае авторизацию, т.к. гитхаб не требует её для публичных репозиториев.