Приветствую!

На пути инженера данных часто встречаются задачи связанные с DevOps. Одна из таких - развернуть Airflow в Kubernetes кластере. Если до этого похожего опыта работы не было, то эта задача может показаться не тривиальной. Конечно, можно выполнить несколько команд из официального гайда, но если нужно будет что-то поправить, то без понимания, что происходит внутри, обойтись будет сложно. Эта статья призвана  облегчить данную задачу. Она поможет тем, кто уже работал с Airflow, но еще не касался технологии Kubernetes. 

Чтобы легче понять эту тему будем использовать подготовленный репозиторий. Он создан на основе официального, путем упрощения. 

Предварительная работа

Airflow, который развернем, будет иметь следующую архитектуру (рис. 1):

Рис.1 Архитектура Airflow
Рис.1 Архитектура Airflow

Как видите, довольно стандартная схема. Тип executor- Celery, брокер сообщений - Redis, БД - Postgresql. 

Прежде чем перейти к аспектам работы с Kubernetes, необходимо выполнить несколько предварительных работ:

  • Создание кластера. Рекомендую использовать сервис для управления кластером kubernetes в одном из облаков. Лично я использовал Yandex Cloud. Тут находится инструкция по его созданию.

  • Установка kubectl. Утилита, которая позволяет взаимодействовать с кластером.

  • Установка Helm. Ниже этот инструмент будет описан подробнее.

  • Создание БД. Здесь также можно воспользоваться готовым облачным  решением - Managed service for Postgresql или поднять самому.

Helm

Helm используется для установки приложений в Kubernetes кластере. Этот инструмент предоставляет широкий ряд возможностей, но сейчас нас интересует одна из них - шаблонизирование. 

Основная сущность в Helm - chart. Helm chart - это пакет, содержащий шаблоны и информацию, которая необходима для их заполнения. Давайте подробнее посмотрим на содержимое нашего chart (см. репозиторий). Там мы увидим:

  • Chart.yaml - некоторая информация о нашем чарте

  • templates - директория со всеми шаблонами, которые будут развернуты в Kubernetes кластере

  • values.yaml - тут мы определяем переменные, которые будем использовать в шаблонах 

Например, у нас есть значение executor, которое мы хотим использовать. Запишем его в values.yaml

executor: "CeleryExecutor"

Теперь, чтобы использовать эту переменную в шаблоне scheduler, необходимо прописать в соответствующем template файле следующее:

{{ .Values.executor }}

Похоже на jinja template в airflow, не правда ли?

Однако, иногда неудобно просто прописывать все значения в values, хочется их как-то предварительно обработать. Для этого нам поможет _helpers.yaml, который расположен вместе с шаблонами в директории templates. Например, мы хотим объединить значения репозитория и тега для docker image:

{{- define "airflow_image" -}}
  {{- $repository := .Values.images.airflow.repository -}}
  {{- $tag := .Values.images.airflow.tag -}}
    {{- printf "%s:%s" $repository $tag -}}
{{- end }}

Теперь мы можем вызвать эту функцию в шаблоне scheduler:

{{ template "airflow_image" . }}

В представленном репозитории используются также более сложные функции, чтобы понять, что они делают, можно посмотреть на готовый манифест. Чтобы его получить нужно выполнить 

helm template airflow dn-airflow/part1/.

Также я положил результат выполнения этого кода в репозиторий. В статье я буду использовать уже заполненные шаблоны.

Deployment (Scheduler)

Итак, давайте сообщим Kubernetes, что мы от него хотим. Для этого необходимо создать манифесты - yaml файлы, в которых описываем некоторые правила. 

Начнем с манифеста для планировщика : 

apiVersion: apps/v1
kind: Deployment
metadata:
  name: airflow-scheduler
  labels:
    tier: airflow
    component: scheduler
    release: airflow
    chart: "airflow-1"
    executor: CeleryExecutor
spec:
  replicas: 1
  selector:
    matchLabels:
      tier: airflow
      component: scheduler
      release: airflow
  template:
    metadata:
      labels:
        tier: airflow
        component: scheduler
        release: airflow
    spec:
      initContainers:
        - name: wait-for-airflow-migrations
          image: apache/airflow:2.6.2
          imagePullPolicy: IfNotPresent
          volumeMounts:
            - name: config
              mountPath: "/opt/airflow/airflow.cfg"
              subPath: airflow.cfg
              readOnly: true
          args:
            - airflow
            - db
            - check-migrations
            - --migration-wait-timeout= 60
          env:
            - name: AIRFLOW__CORE__FERNET_KEY
              valueFrom:
                secretKeyRef:
                  name: airflow-fernet-key
                  key: fernet-key
            …
            - name: AIRFLOW__CORE__LOAD_EXAMPLES
              value: "True"
      containers:
        - name: scheduler
          image: apache/airflow:2.6.2
          imagePullPolicy: IfNotPresent
          args:
            - bash
            - -c
            - exec airflow scheduler
          env:
            - name: AIRFLOW__CORE__FERNET_KEY
              valueFrom:
                secretKeyRef:
                  name: airflow-fernet-key
                  key: fernet-key
            …
            - name: AIRFLOW__CORE__LOAD_EXAMPLES
              value: "True"
          livenessProbe:
            initialDelaySeconds: 10
            timeoutSeconds: 20
            failureThreshold: 5
            periodSeconds: 60
            exec:
              command:
                - sh
                - -c
                - |
                  CONNECTION_CHECK_MAX_COUNT=0 AIRFLOW__LOGGING__LOGGING_LEVEL=ERROR exec /entrypoint \
                  airflow jobs check --job-type SchedulerJob --local
          volumeMounts:
            - name: logs
              mountPath: "/opt/airflow/logs"
            - name: config
              mountPath: "/opt/airflow/airflow.cfg"
              subPath: airflow.cfg
              readOnly: true
      volumes:
        - name: config
          configMap:
            name: airflow-airflow-config
        - name: logs
          emptyDir: {}

Первое что нам попадается на глаза, это тип ресурса:
kind: Deployment

Чтобы понять, что такое Deployment, начнем погружение с более простой сущности - Pod. Это несколько контейнеров, которые запускаются вместе на одном узле, с общей сетью и хранилищем. В нашем случае их 2: контейнер с приложением и контейнер, который с приложением, которое ожидает миграций (об этом далее)

Схематично обозначим Pod таким образом (рис. 2) :

Рис.2 Схематичное изображение Pod
Рис.2 Схематичное изображение Pod

Чтобы запустить несколько реплик одного и того же Pod , нам необходим ресурс ReplicaSet. (рис. 3), который следит за тем, чтобы кол-во Pods соответствовало заявленному кол-ву в манифесте.

Рис. 3 Схематичное изображение ReplicaSet
Рис. 3 Схематичное изображение ReplicaSet

Обычно не используют в продакшене отдельно Pod и ReplicaSet. Используют их родительский объект - Deployment (рис. 4). Т.к. Deployment контролирует обновления Pods. Существует 2 стратегии обновления. Одна из них - Rolling Update, позволяет обновлять Pods один за другим, что полезнее в ряде случаях, чем сначала убить все Pods, а затем запустить новые с новой версией приложения.

Рис.4 Схематичное изображение Deployment
Рис.4 Схематичное изображение Deployment

Именно Deployment, который используется для Airflow scheduler, мы будем подробно разбирать:

apiVersion: apps/v1
Задаем версию API, которую будем использовать для нашего ресурса. Чтобы посмотреть поддерживаемые версии для кластера, можно использовать команду: 

kubectl api-versions

metadata 
Тут мы задаем имя Deployment, а также задаем labels. Это опциональная информация о нашем Deployment. Но благодаря им мы можем фильтровать выборку ресурсов. Например, команда

kubectl get pods -n airflow -l component=scheduler

вернет только поды с labels component=scheduler

replicas
Это кол-во реплик, которое будет развернуто

selector
Тут мы описываем каким образом ReplicaSet будет понимать, какими Pods управлять. Мы используем matchLabels где указываем labels. Точно такие же labels мы должны указать в template.

template
Здесь мы описываем шаблон Pod. Также задаем metadata, в которой укажем labels (которые использует selector)

spec
спецификация нашего пода, включает в себя информацию о контейнерах, переменных окружения, подключаемых томах и другую информацию, о которой поговорим ниже:

initContainers
Это контейнер, который будет запускаться перед запуском основного приложения. Мы используем его, чтобы проверить накатились ли уже миграции. Делаем это с помощью команды:

airflow db check-migrations

Описание initContainers похоже на описание Containers, которое рассмотрим подробнее:

containers

  • image: название образа, который будем использовать

  • imagePullPolicy: политика загрузки image. В нашем случае Kubernetes сравнивает digest указанного контейнера и тех, что находятся на машине. Если не совпадают, то скачивает.

  • command и args: задают команду, которую выполняем при запуске контейнера. 

  • env: хранит переменные окружения, которые будут доступны внутри пода, кроме обычных, с названием и значением, сюда можно добавить переменные, значение которых будет приходить из Secret, еще один ресурс в Kubernetes, который обсудим ниже.

livenessProbe
Необходима, если что-то идет не так с приложением запущенным в Pod, то Kubernetes поймет это и перезапустит сломанный Pod.
Внутри содержатся дополнительные метрики:

  • initDelaySeconds: сколько ждать секунд перед запуском первой проверки. Если Pod долго запускается, то стоит поставить более высокое значение, чтобы Kubernetes не убил Pod, прежде, чем приложение запустится.

  • periodSeconds: как часто запускать проверки.

  • timeoutSeconds: время, в течении которого Kubernetes ждет ответа от приложения. Если в течении этого времени приложении не ответило, значит проверка завалилась.

  • failureThreshold: кол-во неудачных попыток, которое произведет Kubernetes , прежде чем убить Pod.

  • exec: описывает способ, которым мы проверяем состояние Pod. В нашем случае запускаем shell скрипт.

volumeMounts
определяет по какому пути подключать тома, которые описаны в volumes

volumes
В нашем случае их 2: config, в который мы монтируем configMap (этот ресурс обсудим ниже). А также logs. Для него создаем том типа emptyDir. Это хранилище будет доступно, пока жив Pod. После рестарта Pod, директория  logs будет пустой.

StatefulSet (Worker)

Для запуска приложений с сохранением состояния, redis и worker в нашем случае, используется StatefulSet

В отличие от Deployment он запускает Pods с предсказуемым именем, например, worker-0, worker-1. Также масштабирование Pods происходит по порядку. Также у StatefulSet другая политика по управлению подключаемыми хранилищ, которую обсудим в следующей части. Причины, по которым для worker использует StatefulSet можно посмотреть здесь.
Шаблон описания почти такой же, как у Deployment, поэтому не будем на нем останавливаться.

ConfigMap (Config)

При описании scheduler мы упоминали ресурс ConfigMap, который монтируем по пути '/opt/airflow/airflow.cfg'. Давайте рассмотрим его.

apiVersion: v1
kind: ConfigMap
metadata:
  name: airflow-airflow-config
  labels:
    tier: airflow
    component: config
    release: airflow
    chart: "airflow-1"
    heritage: Helm
data:
  airflow.cfg: |-
    [celery]
    worker_concurrency = 16

    [core]
    dags_folder = /opt/airflow/dags
    executor = CeleryExecutor
    load_examples = True

Большинство полей для нас знакомы, кроме data. Тут описываем название и содержимое файла, который будет примонтирован.

Secret (Airflow-fernet-key)

При рассмотрении конструкции env мы встретили такой ресурс, как secret. Рассмотрим его на примере секрета для webserver:

apiVersion: v1
kind: Secret
metadata:
  name: airflow-fernet-key
  labels:
    tier: airflow
    release: airflow
    chart: airflow
    heritage: Helm
  annotations:
    "helm.sh/hook": "pre-install"
    "helm.sh/hook-delete-policy": "before-hook-creation"
    "helm.sh/hook-weight": "0"
type: Opaque
data:
  fernet-key: "TTAxc05IQlBNakZsWVdwclEzSklXbFI2VkRWU01XUjFUM0JZVVV4aFV6ST0="

Описание почти такое же как у ConfigMap, однако есть дополнительное поле type. В Kubernetes есть несколько типов секретов. Мы используем тип Opaque,который содержит произвольные пользовательские данные. В data содержится сам секрет закодированный в формат base64. Создание ключа и кодирование производится с помощью helm.

Также в annotations можно увидеть helm hooks, которые позволяют управлять порядком деплоя приложения. Т.к. нам необходимо, чтобы Secret был создан до того, как будет разворачиваться scheduler, мы используем hook  "pre-install".

Job (Airflow-run-airflow-migrations)

Для scheduler нам необходим Pod, который постоянно работает. Но что, если нам необходимо разово выполнить какое-то действие, например, накатить миграции? Для этого подходит такой Kubernetes ресурс как Job
Помните, что при описании scheduler Deployment мы описывали InitContainer, который проверяет, были ли уже произведена миграция? Сама миграция как раз описана в Job:

apiVersion: batch/v1
kind: Job
metadata:
  name: airflow-run-airflow-migrations
  labels:
    tier: airflow
    component: run-airflow-migrations
    release: airflow
    chart: "airflow-1"
    heritage: Helm
spec:
  template:
    metadata:
      labels:
        tier: airflow
        component: run-airflow-migrations
        release: airflow
    spec:
      restartPolicy: OnFailure
      containers:
        - name: run-airflow-migrations
          image: apache/airflow:2.6.2
          imagePullPolicy: IfNotPresent
          args:
            - bash
            - -c
            - |-
              exec \
              airflow db upgrade
          env:
            - name: PYTHONUNBUFFERED
              value: "1"
            - name: AIRFLOW__CORE__FERNET_KEY
              valueFrom:
                secretKeyRef:
                  name: airflow-fernet-key
                  key: fernet-key
            …
            - name: AIRFLOW__CORE__LOAD_EXAMPLES
              value: "True"
          volumeMounts:
            - name: config
              mountPath: "/opt/airflow/airflow.cfg"
              subPath: airflow.cfg
              readOnly: true
      volumes:
        - name: config
          configMap:
            name: airflow-airflow-config

Новое поле для нас это restartPolicy, значение OnFailure говорит о том, что Job будет перезапускаться в случае неудачи.

Service (Airflow-webserver)

Осталось рассмотреть такой ресурс как Service.
Этот ресурс необходим для предоставления доступа к нашему приложению. На данный момент мы будем использовать его, чтобы у нас был доступ к UI Airflow из сети. Это не продовое решение, но для того, чтобы убедится, что все работает корректно, в самый раз:

apiVersion: v1
kind: Service
metadata:
  name: airflow-webserver
  labels:
    tier: airflow
    component: webserver
    release: airflow
    chart: "airflow-1"
    heritage: Helm
spec:
  type: NodePort
  selector:
    tier: airflow
    component: webserver
    release: airflow
  ports:
    - name: airflow-ui
      port: 8080

Существует несколько типов сервисов. Мы используем NodePort, который предоставит доступ к Pod по ip адресу узла, на котором крутится Pod. Чтобы узнать этот ip можно зайти в консоль Yandex Cloud и посмотреть его.

Также необходимо указать порт, на котором крутится webserver, он должен быть такой же, как в настройках самого Airflow.

Namespace

Перед деплоем приложения нам нужно разобрать еще один ресурс - Namespace. Он позволяет разбить кластер на логические группы. Мы будем деплоить все компоненты Airflow в одном Namespace. Также на Namespace можно наложить различные политики доступа.

Проверяем, что все работает

Отлично, мы описали все наши ресурсы, давайте взглянем на схему (рис. 5), как это все теперь выглядит:

Рис. 5 Изображение ресурсов Kubernetes используемых Airflow
Рис. 5 Изображение ресурсов Kubernetes используемых Airflow

Давайте теперь попробуем запустить наш chart.
Сначала клонируем репозиторий:

git clone git@github.com:Siplatov/dn-airflow.git

Далее необходимо заменить значения host, user и pass для postgresql на свои. Эти значения находятся в values.yaml.
После этого можно делать релиз:

helm upgrade --install -n airflow --create-namespace airflow dn-airflow/part1.

Через несколько минут можно проверить, запустились ли уже pods (Рис. 6):

kubectl get pods -n airflow
Рис. 6 Проверка работоспособности Pods
Рис. 6 Проверка работоспособности Pods

Видим, что Jobs успешно отработали, а все остальные Pods в статусе running. Значит все хорошо. Однако видно, что webserver перезапускался несколько раз. Чтобы понять почему, необходимо выполнить команду:

kubectl describe pod airflow-webserver-dc54c9884-g7x5s -n airflow

В конце увидим следующую картину (рис. 7):

Рис. 7 Вывод команды describe pod
Рис. 7 Вывод команды describe pod

Webserver не успел запуститься за время указанное в initDelaySeconds livenessProbe и Kubernetes сделал несколько проверок, посчитал, что Webserver мертв и решил его перезапустить. Поэтому стоит увеличить initDelaySeconds.

Чтобы проверить UI airflow, давайте посмотрим на services (рис. 8):

kubectl get services -n airflow
Рис. 8 Вывод команды get services
Рис. 8 Вывод команды get services

Видим, что проверять нужно на порту 32462

Если при открытии ссылки увидим панель с регистрацией (рис. 9), значит все запустилось успешно:

Рис. 9 Окно с авторизацией Airflow
Рис. 9 Окно с авторизацией Airflow

Логин и пароль мы указали в values.yaml, в пункте defaultUser.

Заключение

Мы рассмотрели минимальный набор сущностей, необходимый для запуска Airflow. Однако некоторые моменты требуют доработки: секреты лежат в yaml файле, логи будут удаляться после передеплоя, нет синхронизации с репозиторием с DAG, доступ не по https  и т.д. Как это исправить, обсудим в следующей части.

Комментарии (2)


  1. rebel_web
    22.10.2023 13:11

    Сейчас все обсуждают как съехать с Airflow на Prefect или Dagster. Статья интересная если учишься на data engineer но надо и на более современные вещи смотреть.


    1. Taragolis
      22.10.2023 13:11

      Прям все-все-все-все? Просто статистика скачиваний говорит немного о другом:

      Ни в коем случае это не умоляет другие решения отличные от Airflow, но не стоит выдавать желаемое за действительное.