Что делает инженер, если DAG не выполняется? Проверяет Airflow 50 раз, а потом вспоминает, что забыл поставить @dag над функцией.

Развертывание Apache Airflow в промышленной среде — это сложная задача, требующая учета множества аспектов: от обеспечения безопасности конфиденциальных данных до эффективного управления ресурсами. Одной из ключевых проблем, с которыми сталкиваются команды, является безопасное управление секретами, оптимизация конфигураций и наблюдаемость.

В этой статье мы рассмотрим, как использовать инструменты, такие как Sops и YAML-якоря, для упрощения управления конфиденциальными данными и улучшения читаемости конфигураций. А так же как обеспечить полную наблюдаемость инсталляции Apache Airflow

Коллеги, здарова!

 

Мне часто стали попадаться новые статьи о том, как деплоят Apache Airflow разные ребята у себя в организациях. Кто-то использует Docker Compose, кто-то показывает, как запустить это приложение на выделенных нодах, кто-то даже компилирует для этого свою необходимую версию Python (хотя, наверное, статья от 2022 года уже немножко устарела).

Сегодня я хочу вам рассказать о том, как мы деплоем промышленную версию Apache Airflow в Kubernetes.

Что мы сейчас рассмотрим:

1. Соберем свой Helm Chart который будет включать все необходимые компоненты.
2. Покроем Airflow мониторингом и логированием.
3. Отдельно поговорим про логирование, есть парочка интересных моментов.
4. И настроим правильное выделение ресурсов, для запуска подов.

Маленькая правочка от меня по поводу русификации:
Секрет = какая-нибудь тайна, которую лучше не рассказывать жене или любовнице :) .
Сикрет = Это поле в манифесте K8s, которое указывает тип ресурса Secret
Я так и не смог определится с написанием терминов на русском или английском языке. По этому вы увидите в тексте о том, как я пишу что-то на английском и тоже самое на русском языке.

Перед началом, я вам расскажу про способ шифрования Secret в k8s. Ведь обычно когда мы создаем манифест с kind: Secret , то все частные данные мы указываем в открытом виде, что не есть хорошо, особенно есть это потом попадет в какой-нибудь репозиторий Git.

Мои коллеги своевременно рассказали мне про такой инструмент, как Sops https://github.com/getsops/sops

Основные преимущества Sops:

  • Шифрование на уровне файлов.

  • Интеграция с популярными инструментами, такими как Helm и Terraform.

  • Поддержка различных провайдеров ключей (AWS KMS, GCP KMS, Azure Key Vault).

Пример использования Sops

Создайте файл secrets.yaml, содержащий необходимые секреты:

secrets:
  mysql-root-password: ENC[AES256_GCM,data:2nLud5nDZEmm/sCW,...]
  mysql-password: ENC[AES256_GCM,data:e4m5nwmy+zknWtWJ,...]
  airflow-user-username: ENC[AES256_GCM,data:8RiizFAMKh+9LWM=,...]
  airflow-user-password: ENC[AES256_GCM,data:lkjasd98adf7...]

Все значения зашифрованы и будут расшифрованы автоматически при деплое. Для шифрования используйте команду sops -e secrets.yaml.

Теперь в файле values.yaml подключите секреты:
secrets: {}

Большим плюсом являет то, что он интегрирован с Helm и позволяет деплоить приложения с зашифрованными сикретами и хранить из в вашей системе контроля версий, если вы по какой-то причине не используете другие популярные решения.

Давайте начинать

Мы будем собирать свой чарт на основе  следующего чарта:
https://github.com/airflow-helm/charts

Это чарт которые поддерживается комьюнити и был выбран мной, потому что, как мне показалось, он более наглядный и исчерпывающий, чем оригинальный чарт Apache Airflow.

apiVersion: v2
name: airflow
description: A Helm chart for Kubernetes with Apache Airflow
type: application
sources:
  - https://github.com/airflow-helm/charts/tree/main/charts/airflow
version: 8.8.0
dependencies:
  - condition: mysql.enabled
    name: mysql
    repository: bitnamicharts
    version: 9.14.1
  - condition: prometheus-statsd-exporter.enable
    name: prometheus-statsd-exporter
    repository: https://prometheus-community.github.io/helm-charts
    version: 0.10.1
  - condition: airflow.enable
    name: airflow
    repository: https://airflow-helm.github.io/charts
    version: 8.8.0
  - condition: fluent-bit.enabled
    name: fluent-bit
    repository: https://fluent.github.io/helm-charts
    version: 0.39.0
  - condition: minio.enable
    name: minio
    repository: bitnamicharts
    version: 12.8.17

Для начала подключим необходимые чарты и сделаем helm upgrade

Давайте соберем для начала чарт, в котором будет только Airflow и дальше по ходу рассказа будем подключать остальные библиотеки. В конце статьи я прикреплю полностью готовый, который можно будет скопировать и запустить.

airflow:
  airflow:
    legacyCommands: false
    image:
      repository: apache/airflow
      tag: v2.7.1

Так как мы используем Kubernetes, то нам нужно выбрать Executor для выполнения. Kubernetes Executor позволяет выполнять таски внутри дагов в отдельном поде, который будет создаваться под каждую таску.

executor: KubernetesExecutor

Далее разберемся с конфигами и переменными. Мы настраиваем следующие параметры. Детально остановимся только на нескольких.

config:
      AIRFLOW__WEBSERVER__EXPOSE_CONFIG: "True"
      AIRFLOW__CORE__LOAD_EXAMPLES: "False"
      AIRFLOW__METRICS__STATSD_ON: "True"
      AIRFLOW__METRICS__STATSD_HOST: airflow-cluster-prometheus-statsd-exporter
      AIRFLOW__METRICS__STATSD_PORT: "9125"
      AIRFLOW__LOGGING__LOGGING_LEVEL: INFO
      AIRFLOW__LOGGING__REMOTE_LOGGING: True
      AIRFLOW__LOGGING__REMOTE_LOG_CONN_ID: "minio_S3"
      AIRFLOW__LOGGING__REMOTE_BASE_LOG_FOLDER: s3://airflow-logs/
      AIRFLOW__WEBSERVER__BASE_URL: "https://airflow.hramoff.local"
      AIRFLOW__WEBSERVER__SHOW_TRIGGER_FORM_IF_NO_PARAMS: True

Указываем адрес, по которому будет обслуживаться Airflow.

AIRFLOW__WEBSERVER__BASE_URL: "https://airflow.hramoff.local"

Данная переменная позволит нам не загружать Даги, которые разработчики оставили нам для примера.

AIRFLOW__CORE__LOAD_EXAMPLES: "False"

Данная переменная позволит нам скрывать вс. Конфиденциальную информацию звездочками, которую мы будем задавать далее, данная настройка относится к веб-интерфейсу.

AIRFLOW__WEBSERVER__EXPOSE_CONFIG: "True"

С остальными переменными мы будем разбираться по ходу действия.

Далее у нас есть такая сущность как KubernetesPodTemplate, которую мы настроим дальше. Нам в ней, в первую очереди интересуют ресурсы, выдаваемы подам. Каким конкретно подам? А тем подам, внутри которых идет выполнения дагов и их тасок, то есть, по задумке, эти значения не применяются к подам, которые отвечают за веб-интерфейс, планировщик и так далее. Мы пришли к выводу, что эти поды нам лучше запускать на выделенных нодах, поэтому укажем еще селекторы  и tolerations.

Важное уточнение. Как вы знаете Apache Airflow написан на Python. А рабочая среда для выполнения дагов однопоточная. Это значит, что мы не можем использовать более одного ядра по умолчанию, если конечно написанный вами даг, этого не поддерживает, но такое решение будет не правильным, в контексте Airflow. Поэтому в лимитах мы максимум указывает cpu: 1, так как больше мы использовать не можем и резервировать выше этого лимита не имеет смысла.

kubernetesPodTemplate:
      resources:
        requests:
          memory: 512Mi
          cpu: 100m
        limits:
          memory: 16Gi
          cpu: 1
      nodeSelector:
        node-role/airflowKubernetesExecutor: ""
      tolerations:
        - key: node-role/airflowKubernetesExecutor
          operator: Exists

Дальше у нас идет компонент scheduler. Это планировщик, который отслеживает все таски и даги, так же отвечает за выполнение и состояние.

scheduler:
    replicas: 1
    resources:
      limits:
        memory: 8Gi
        cpu: 2
      requests:
        memory: 8Gi
        cpu: 2
    logCleanup:
      enabled: false
    livenessProbe:
      enabled: true
      initialDelaySeconds: 30

Внутри этого компонента мы так же указываем необходимые для этого ресурсы и пробы. Так же мы настроем LogCleanup для того, чтобы не хранить логи пода.

Дальше у нас идет настройка компонента, который отвечает за веб-интерфейс.

web:
    replicas: 1
    resources:
      limits:
        memory: 8Gi
        cpu: 2
      requests:
        memory: 8Gi
        cpu: 2
    livenessProbe:
      enabled: true
      initialDelaySeconds: 30
    readinessProbe:
      enabled: true
      initialDelaySeconds: 30
    webserverConfig:
      existingSecret: *basic
    service:
      type: ClusterIP
      externalPort: 8080

Этому компоненты мы так же задаем ресурсы и настраиваем пробы.

Здесь вы сможете увидеть такой ключ и значение как existingSecret: *basic

Знатоки Yaml-программирования уже знают, что этот механизм называется якорем, а для остальных я поясню. Если в нашем чарте или любом другом yaml документе есть повторяющиеся разделы, то мы сможем использовать, называемое мной, наследование. Работает это следующим образом: мы можем объявить некоторую структуру yaml и несколько раз ссылаться на нее и использовать значения уже в ней. Что нам это даст? Вместо того, чтобы указывать одинаковые значения везде, мы можем заранее определить нужный нам ключ и наследоваться от него.

YAML-якоря позволяют избежать дублирования конфигураций, упрощая управление и внесение изменений. Рассмотрим пример с использованием секретов.

В начале файла определите ключ:

basic_secret: &basic airflow-secret

А в различных частях конфигурации ссылайтесь на этот якорь:

web:
  webserverConfig:
    existingSecret: *basic

externalDatabase:
  passwordSecret: *basic
  passwordSecretKey: mysql-password

gitSync:
  httpSecret: *basic
  httpSecretUsernameKey: airflow-gitcync-username
  httpSecretPasswordKey: airflow-gitcync-user-token

Преимущества якорей YAML:

  • Централизованное управление повторяющимися параметрами.

  • Легкость в обновлении данных (изменение в одном месте распространяется на весь документ).

  • Упрощение чтения и поддержки конфигурации.

Далее создадим файл, допустим, secrets.yaml и внутри него будем указывать нужные значения:

secrets:
  airflow-user-username: ENC[AES256_GCM,data:...]
  airflow-user-password: ENC[AES256_GCM,data:...]
  AWS_KEY_ID: ENC[AES256_GCM,data:...]
  AWS_ACCESS_KEY: ENC[AES256_GCM,data:...]

А деплоить мы будем следующим образом:

helm secrets upgrade airflow-cluster . --namespace apache-airflow --install -f values.yaml -f secrets.yaml

Таким образом у нас автоматически будут подставляться и расшифровываться сикреты и другая конфиденциальная информация.

Далее у нас есть такой компонент как workers. Мы его авансом отключаем, потому что будем использовать KubernetesExecutor. Если мне не изменяет память, то данный компонент будет создавать заранее работающие поды, на которых будут выполнятся необходимые нам даги.

  workers:
    enabled: false

Есть еще компонент с триггерами. В Apache Airflow компонент Trigger используется для определения условий, при которых определенные таски могут быть выполнены. Это позволяет создавать более сложные и динамичные рабочие процессы, основанные на событиях или условиях, а не только на временных интервалах или последовательности выполнения.

triggerer:
    enabled: true
    replicas: 1
    resources:
      limits:
        memory: 6Gi
        cpu: 2
      requests:
        memory: 6Gi
        cpu: 2
    capacity: 1000
    livenessProbe:
      enabled: true
      initialDelaySeconds: 30

У него мы так же задаем ресурсы и пробы.

Есть еще компонент flower, который используется как веб-интерфейс для мониторинга и управления тасками в apache airflow. Его использование остается под вопросом, вроде инструмент хороший, умеет визуализировать много полезной информации, но пару раз натыкались в нем на баги, да и какой-то особо важной информации он нам не дает, по этому решили его отключить, но я вам приложу фотокарточку из статьи с medium, чтобы было примерно понятно, что это за компонент такой.

Веб-интерфейс Flower
Веб-интерфейс Flower
  flower:
    enabled: false

Далее у нас идет настройка логов. Мы указываем путь к ним и отключаем хранение.

logs:
    path: /opt/airflow/logs
    persistence:
      enabled: false

Далее конфигурируем среду выполнения дагов.

dags:
    path: /opt/airflow/dags
    persistence:
      enabled: false

    gitSync:
      enabled: true
      image:
        repository: /git-sync/git-sync
      repo: http://git.hramoff.local/airflow/airflow.git
      branch: main
      syncWait: 30
      httpSecret: *basic
      httpSecretUsernameKey: airflow-gitcync-username
      httpSecretPasswordKey: airflow-gitcync-user-token

Нам нужно указать путь по которому airflow будет искать даги и отключить их хранение. Делаем это мы для того, чтобы у нас работало приложение git-sync, которое будет синхронизировать необходимые нам даги из нашего репозитория.

У него мы указываем образ, который будет использоваться, а так же параметры для его работы, например интервал, ветку из которой будем клонировать и реквизиты пользователя.

Далее самым стандартным образом настраиваем ингрес.

ingress:
    enabled: true
    web:
      enabled: true
      host: airflow.hramoff.local
      ingressClassName: nginx

Далее включим Service Account.

serviceAccount:
    create: true
    name: ""
    annotations: { }

По умолчанию Apache Airflow использует PostgreSQL, но мы можем задать другую базу данных для хранения. Я использую MySQL потому что он мне роднее и более знаком :).

pgbouncer:
    enabled: false

  postgresql:
    enabled: false

  externalDatabase:
    type: mysql
    host: airflow-cluster-mysql
    port: 3306
    database: airflow
    user: airflow
    passwordSecret: *basic
    passwordSecretKey: mysql-password
    properties: ""

И отключаем использование Redis, как брокера сообщений, потому что мы работаем внутри Kubernetes и KubernetesExecutor закрывает потребность в дополнительном брокере.

redis:
    enabled: false

Далее настроим MySQL.

mysql:
  image:
    registry: ""
    repository: bitnami/mysql
  enabled: true
  auth:
    username: airflow
    database: airflow
  existingSecret: *basic
  defaultAuthPlugin: caching_sha2_password

  primary:
    persistence:
      storageClass: "vsphere-csi-sc"
      size: 50Gi

  secondary:
    replicaCount: 1
    persistence:
      storageClass: "vsphere-csi-sc"
      size: 50Gi

Используем отказоустойчивую конфигурацию mysql с репликой.

Далее мы включим ServiceMonitor для того чтобы забирать метрики от Apache Airflow.

serviceMonitor:
  enabled: true
  interval: 25s
  path: /metrics
  port: http
  selector:
    component: prometheus-statsd-exporter

А загружать мы будем в Prometheus StatsD exporter.

prometheus-statsd-exporter:
  image:
    repository: prom/statsd-exporter
  enable: true
  statsd:
    mappingConfigMapName: airflow-cluster-statsd
    mappingConfigMapKey: mappingConf

Каким образом это работает. Для начала в документации apache airflow есть вся необходимая информация.

https://airflow.apache.org/docs/apache-airflow/stable/administration-and-deployment/logging-monitoring/metrics.html

Теперь давайте вернемся к инструкции повыше и включим метрики в apache airflow.

AIRFLOW__METRICS__STATSD_ON: "True"
AIRFLOW__METRICS__STATSD_HOST: airflow-cluster-prometheus-statsd-exporter
AIRFLOW__METRICS__STATSD_PORT: "9125"

Использовать для этого мы будем следующую конфигурацию:
https://github.com/data-burst/airflow-monitoring-and-alerting

В ней ней нам нужно использовать маппинг для StatsD. Для этого мы возьмем соответствующий код и положим его в ConfigMap.

Выглядеть это будет так

---
apiVersion: v1
kind: ConfigMap
metadata:
  name: {{ include "airflow.fullname" . }}-statsd
data:
  mappingConf: |
    mappings:
    # Airflow StatsD metrics mappings (https://airflow.apache.org/docs/apache-airflow/stable/logging-monitoring/metrics.html)
    # === Counters ===
    - match: "(.+)\\.(.+)_start$"
      match_metric_type: counter
      name: "af_agg_job_start"
      match_type: regex
      labels:
        airflow_id: "$1"
        job_name: "$2"

это не полный пример конфига

Думаю не стоит говорить о том, что ваш сервис монитор должен быть натравлен на Prometheus, а как загрузить дашборы в Grafana вы и так знаете. Вот ссылка на Dashboards - https://github.com/data-burst/airflow-monitoring-and-alerting/tree/main/config_files/grafana/var/lib/grafana/dashboards

Далее настроим логирование подов, в которых выполняются даги.
Использовать для этого будем FluentBit со следующим конфигом.

fluent-bit:
  image:
    repository: fluent/fluent-bit
    tag: 2.1.10
  extraVolumes:
    - name: statedir
      hostPath:
        path: /opt/data/airflow/fluentbit
  extraVolumeMounts:
    - name: statedir
      mountPath: /opt/data/

  nodeSelector:
    node-role/airflowKubernetesExecutor: ""
  tolerations:
    - key: node-role/airflowKubernetesExecutor
      operator: Exists

  config:
    inputs: |
      [INPUT]
          Name tail
          Path /var/log/containers/*_apache-airflow_base*.log
          multiline.parser docker, cri
          Tag kube.*
          Mem_Buf_Limit 5MB
          Skip_Long_Lines On
          Refresh_Interval    1
          read_from_head  true
          DB /opt/data/airflow_state.db

    filters: |
      [FILTER]
          Name grep
          Match kube.*
          Exclude log /.*MYSQL_OPT_RECONNECT.*/
          Exclude log /\s200\s\d.*kube-probe/
          Exclude log /input:tail:tail/

      [FILTER]
          Name kubernetes
          Match kube.*
          Merge_Log On
          Keep_Log Off
          K8S-Logging.Parser On
          K8S-Logging.Exclude On

      [FILTER]
          Name nest
          Match kube.*
          Operation lift
          Nested_under kubernetes
          Add_prefix k8s:
      
      [FILTER]
          Name nest
          Match kube.*
          Operation lift
          Nested_under k8s:labels
          Add_prefix k8s:labels:
      
      [FILTER]
          Name nest
          Match kube.*
          Operation lift
          Nested_under k8s:annotations
          Add_prefix k8s:annotations:
      
      [FILTER]
          Name           record_modifier
          Match          kube.*
          Allowlist_key  k8s:annotations:dag_id
          Allowlist_key  k8s:labels:task_id
          Allowlist_key  log
          Allowlist_key  stream
          Allowlist_key  message

    outputs: |
      [OUTPUT]
          Name                    gelf
          Match                   kube.*
          Host                    graylog.hramoff.local
          Port                    12205
          Mode                    UDP
          Gelf_Short_Message_Key  log

Давайте разберем что мы тут видим. Начнем с секции Input. Мы будем читать файлы с логами контейнера, которые запускаются. Нас интересуют файлы по такому пути и с такой маской /var/log/containers/*_apache-airflow_base*.log

Далее рассмотрим фильтры. Во первых мы избавляемся от логов в которых есть MYSQL_OPT_RECONNECT

Хоть мы и используем последнюю версию, но в логах постоянно есть такие сообщения о несовместимости.

Логи пода с веб-интерфейсом Apache Airflow
Логи пода с веб-интерфейсом Apache Airflow

Не понятно еще, когда уже исправят такую штучку

А так же избавимся от логов, связанных с пробами Kubernetes.
Рассмотрим следующие фильтры. По умолчанию поля с labels и annotations имеют очень не удобный для чтения вид. Всеми остальными фильтрами мы приводим в читаемый вид логи, чтобы можно было удобно отфильтровать записи по названию DAG или таски которая там выполняется.

Так выглядит лог в Graylog
Так выглядит лог в Graylog

Таким образом мы получим сообщение, в котором удобно производить поиск по полям: название дага и название таски.

Ну и наконец нам потребуется Minio.

minio:
  enabled: true

  persistence:
    enabled: true
    existingClaim: "minio"

  podAnnotations:
    prometheus.io/scrape: "true"
    prometheus.io/path: "/minio/v2/metrics/cluster"
    prometheus.io/port: "9000"

  metrics:
    serviceMonitor:
      enabled: true
      interval: 25s
      paths:
        - /minio/v2/metrics/cluster
        - /minio/v2/metrics/node

  resources:
    limits:
      cpu: 1
      memory: 8Gi
    requests:
      cpu: 300m
      memory: 512Mi

  ingress:
      enabled: true
      ingressClassName: "nginx"
      hostname: airflow-minio.hramoff.local

Зачем в данной конфигурации нам потребуется минио? А для того чтобы хранить логи. Но не те логи о которых мы говорили выше.

Ранее мы хранили логи пода, в котором запускался Airflow. А теперь мы храним логи выполнения Дага.

Лог выполнения таски
Лог выполнения таски

Ознакомится с документацией вы можете здесь.
https://airflow.apache.org/docs/apache-airflow-providers-amazon/stable/logging/s3-task-handler.html

Так как мы не храним все логи в Persistence Volume, то нам нужно какое-то отдельное хранилище. Airflow умеет класть логи в Amazon S3.
Для этого мы еще раз вернемся к переменным окружения и настроем хранилище логов.

AIRFLOW__LOGGING__LOGGING_LEVEL: INFO
AIRFLOW__LOGGING__REMOTE_LOGGING: True
AIRFLOW__LOGGING__REMOTE_LOG_CONN_ID: "minio_S3"
AIRFLOW__LOGGING__REMOTE_BASE_LOG_FOLDER: s3://airflow-logs/

Подключаться мы будем к коннекшену minio_s3, которое создадим далее. И класть логи мы будем в отдельный бакет.

Теперь я вам покажу как создавать коннекшены в Airflow, при условии, что некоторые переменные являются конфиденциальными и зашифрованными.

Мы можем использовать переменную connections

connections:
      - id: minio_S3
        type: aws
        description: Minio_S3_connection
        extra: |-
          { "aws_access_key_id": "${AWS_KEY_ID}",
          "aws_secret_access_key": "${AWS_ACCESS_KEY}",
          "endpoint_url": "http://airflow-cluster-minio:9000" }

И задать следующие значения. Как вы видите ключи для подключения в основном чарте у нас скрыты за переменную.

Для того чтобы задать им конкретное значение будем использовать ConnectionsTemplates

connectionsTemplates:
  AWS_KEY_ID:
    kind: secret
    name: *basic
    key: AWS_KEY_ID
  AWS_ACCESS_KEY:
    kind: secret
    name: *basic
    key: AWS_ACCESS_KEY

Здесь мы объявляем название эти переменных, указываем им тип как сикрет и опять встречаем якоря.

Для того чтобы задать значения этим ключам мы переходим в отдельный файл с сикретами и указываем таком образом.

secrets:
  mysql-root-password: mysql-root-password
  mysql-password: mysql-password
  airflowUser-username: airflowUser-username
  airflowUser-password: airflowUser-password
  airflow-gitcync-username: airflow-git-user
  airflow-gitcync-user-token: airflow-git-user-token
  webserver_config.py: sample-webserver-config
  AWS_KEY_ID: AWS_KEY_ID
  AWS_ACCESS_KEY: AWS_ACCESS_KEY

После этого мы можем задеплоить Apache Airflow

helm secrets upgrade airflow-cluster . --namespace apache-airflow --install -f values.yaml -f secrets.yaml

Что мы ожидаем увидеть.
1. Работающий Apache airflow

Apache Airflow
Apache Airflow

2. Мониторинг всех компонентов внутри Grafana.

Cluster Dashboard
Cluster Dashboard
Dag Dashboard
Dag Dashboard
MinIO Dashboard
MinIO Dashboard

3. Логирование подов.

Логи пода внутри GrayLog
Логи пода внутри GrayLog

4. Хранение логов в Minio

Хранение логов в MinIO
Хранение логов в MinIO

В заключение, успешное развертывание Apache Airflow в Kubernetes требует комплексного подхода, охватывающего все аспекты observability, управления ресурсами и безопасного обращения с данными. Применяя рассмотренные методы и практики, мы можем значительно повысить надежность и эффективность наших ETL-процессов, а также обеспечить безопасность и управляемость нашей инфраструктуры. Надеемся, что представленные решения будут полезны для вашей команды и помогут вам в реализации проектов на базе Apache Airflow в Kubernetes.

Прикладываю готовый helm чарт, который у нас получился.

Скрытый текст
existingSecret: &basic airflow-secret

airflow:
  airflow:
    legacyCommands: false
    image:
      repository: apache/airflow

    executor: KubernetesExecutor

    config:
      AIRFLOW__WEBSERVER__EXPOSE_CONFIG: "True"
      AIRFLOW__CORE__LOAD_EXAMPLES: "False"
      AIRFLOW__METRICS__STATSD_ON: "True"
      AIRFLOW__METRICS__STATSD_HOST: airflow-cluster-prometheus-statsd-exporter
      AIRFLOW__METRICS__STATSD_PORT: "9125"
      AIRFLOW__LOGGING__LOGGING_LEVEL: INFO
      AIRFLOW__LOGGING__REMOTE_LOGGING: True
      AIRFLOW__LOGGING__REMOTE_LOG_CONN_ID: "minio_S3"
      AIRFLOW__LOGGING__REMOTE_BASE_LOG_FOLDER: s3://airflow-logs/
      AIRFLOW__WEBSERVER__BASE_URL: "https://airflow.hramoff.local"
      AIRFLOW__WEBSERVER__SHOW_TRIGGER_FORM_IF_NO_PARAMS: True

    users: []

    connections:
      - id: minio_S3
        type: aws
        description: Minio_S3_connection
        extra: |-
          { "aws_access_key_id": "${AWS_KEY_ID}",
          "aws_secret_access_key": "${AWS_ACCESS_KEY}",
          "endpoint_url": "http://airflow-cluster-minio:9000" }


    connectionsTemplates:
      AWS_KEY_ID:
        kind: secret
        name: *basic
        key: AWS_KEY_ID
      AWS_ACCESS_KEY:
        kind: secret
        name: *basic
        key: AWS_ACCESS_KEY
      
    extraVolumeMounts: [ ]

    extraVolumes: [ ]

    kubernetesPodTemplate:
      resources:
        requests:
          memory: 512Mi
          cpu: 100m
        limits:
          memory: 16Gi
          cpu: 1
      nodeSelector:
        node-role/airflowKubernetesExecutor: ""
      tolerations:
        - key: node-role/airflowKubernetesExecutor
          operator: Exists

  scheduler:
    replicas: 1
    resources:
      limits:
        memory: 8Gi
        cpu: 2
      requests:
        memory: 8Gi
        cpu: 2
    logCleanup:
      enabled: false
      retentionMinutes: 21600
    livenessProbe:
      enabled: true
      initialDelaySeconds: 30

  web:
    replicas: 1
    resources:
      limits:
        memory: 8Gi
        cpu: 2
      requests:
        memory: 8Gi
        cpu: 2
    livenessProbe:
      enabled: true
      initialDelaySeconds: 30
    readinessProbe:
      enabled: true
      initialDelaySeconds: 30
    webserverConfig:
      existingSecret: *basic
    service:
      type: ClusterIP
      externalPort: 8080

  workers:
    enabled: false

  triggerer:
    enabled: true
    replicas: 1
    resources:
      limits:
        memory: 6Gi
        cpu: 2
      requests:
        memory: 6Gi
        cpu: 2
    capacity: 1000
    livenessProbe:
      enabled: true
      initialDelaySeconds: 30

  flower:
    enabled: false

  logs:
    path: /opt/airflow/logs
    persistence:
      enabled: false

  dags:
    path: /opt/airflow/dags
    persistence:
      enabled: false

    gitSync:
      enabled: true
      image:
        repository: git-sync/git-sync
      repo: http://git.hramoff.local/airflow/airflow.git
      branch: main
      syncWait: 30
      httpSecret: *basic
      httpSecretUsernameKey: airflow-gitcync-username
      httpSecretPasswordKey: airflow-gitcync-user-token

  ingress:
    enabled: true
    web:
      enabled: true
      host: airflow.hramoff.local
      ingressClassName: nginx

  serviceAccount:
    create: true
    name: ""
    annotations: { }

  extraManifests: [ ]

  pgbouncer:
    enabled: false

  postgresql:
    enabled: false

  externalDatabase:
    type: mysql
    host: airflow-cluster-mysql
    port: 3306
    database: airflow
    user: airflow
    passwordSecret: *basic
    passwordSecretKey: mysql-password
    properties: ""

  redis:
    enabled: false

mysql:
  image:
    registry: ""
    repository: bitnami/mysql
  enabled: true
  auth:
    username: airflow
    database: airflow
  existingSecret: *basic
  defaultAuthPlugin: caching_sha2_password

  primary:
    persistence:
      storageClass: "vsphere-csi-sc"
      size: 50Gi

  secondary:
    replicaCount: 1
    persistence:
      storageClass: "vsphere-csi-sc"
      size: 50Gi

serviceMonitor:
  enabled: true
  interval: 25s
  path: /metrics
  port: http
  selector:
    component: prometheus-statsd-exporter

prometheus-statsd-exporter:
  image:
    repository: prom/statsd-exporter
  enable: true
  statsd:
    mappingConfigMapName: airflow-cluster-statsd
    mappingConfigMapKey: mappingConf

fluent-bit:
  image:
    repository: fluent/fluent-bit
    tag: 2.1.10
  extraVolumes:
    - name: statedir
      hostPath:
        path: /opt/data/airflow/fluentbit
  extraVolumeMounts:
    - name: statedir
      mountPath: /opt/data/

  nodeSelector:
    node-role/airflowKubernetesExecutor: ""
  tolerations:
    - key: node-role/airflowKubernetesExecutor
      operator: Exists

  config:
    inputs: |
      [INPUT]
          Name tail
          Path /var/log/containers/*_apache-airflow_base*.log
          multiline.parser docker, cri
          Tag kube.*
          Mem_Buf_Limit 5MB
          Skip_Long_Lines On
          Refresh_Interval    1
          read_from_head  true
          DB /opt/data/airflow_state.db

    filters: |
      [FILTER]
          Name grep
          Match kube.*
          Exclude log /.*MYSQL_OPT_RECONNECT.*/
          Exclude log /\s200\s\d.*kube-probe/
          Exclude log /input:tail:tail/

      [FILTER]
          Name kubernetes
          Match kube.*
          Merge_Log On
          Keep_Log Off
          K8S-Logging.Parser On
          K8S-Logging.Exclude On

      [FILTER]
          Name nest
          Match kube.*
          Operation lift
          Nested_under kubernetes
          Add_prefix k8s:
      
      [FILTER]
          Name nest
          Match kube.*
          Operation lift
          Nested_under k8s:labels
          Add_prefix k8s:labels:
      
      [FILTER]
          Name nest
          Match kube.*
          Operation lift
          Nested_under k8s:annotations
          Add_prefix k8s:annotations:
      
      [FILTER]
          Name           record_modifier
          Match          kube.*
          Allowlist_key  k8s:annotations:dag_id
          Allowlist_key  k8s:labels:task_id
          Allowlist_key  log
          Allowlist_key  stream
          Allowlist_key  message

    outputs: |
      [OUTPUT]
          Name                    gelf
          Match                   kube.*
          Host                    graylog.hramoff.local
          Port                    12205
          Mode                    UDP
          Gelf_Short_Message_Key  log

minio:
  enabled: true

  persistence:
    enabled: true
    existingClaim: "minio"

  podAnnotations:
    prometheus.io/scrape: "true"
    prometheus.io/path: "/minio/v2/metrics/cluster"
    prometheus.io/port: "9000"

  metrics:
    serviceMonitor:
      enabled: true
      interval: 25s
      paths:
        - /minio/v2/metrics/cluster
        - /minio/v2/metrics/node

  resources:
    limits:
      cpu: 1
      memory: 8Gi
    requests:
      cpu: 300m
      memory: 512Mi

  ingress:
    enabled: true
    ingressClassName: "nginx"
    hostname: airflow-minio.hramoff.local

secrets: { }


Талант — это прежде всего труд

Антон Павлович Чехов русский писатель, драматург, врач 1860–1904

Комментарии (3)


  1. dpvpro
    04.01.2025 23:23

    В заголовке должно быть "деплоим"


    1. Hramoff Автор
      04.01.2025 23:23

      Спасибо!


  1. erley
    04.01.2025 23:23

    Спасибо за хорошую статью!