Примерно год назад в Neoflex стартовал большой проект в одной из крупнейших строительных компаний по созданию ЕХД, в которое предполагалось мигрировать 100+ систем‑источников. Для этого мы выбрали Airflow в качестве оркестратора, но с учетом предполагаемого объема выполняемых задач, необходимо было установить кластерную версию, чтобы мы могли распределять нагрузку и при необходимости легко добавлять новые мощности. Очень часто на практике приходится сталкиваться с тем, что отлично задокументированный продукт не «заводится» за пять минут, либо доступен быстро, но в ограниченной конфигурации. Airflow не стал исключением: подробной инструкции фактически нигде не нашлось и, кроме того, мы столкнулись с некоторыми неочевидными вещами. В этой статье поделились свои опытом и деталями пошаговой установки Airflow.

Исходно у нас было три машины с OS Centos 7 и желание установить Airflow 2.3.2. Официальные требования от Airflow:

Python: 3.7, 3.8, 3.9, 3.10

PostgreSQL: 10, 11, 12, 13, 14

У нас использовались Python 3.9 и PostgreSQL 14. Заранее стоит создать в PostgreSQL схему данных для метаданных Airflow (далее будет описан пример) с полным к ней доступом пользователя (запись, чтение, создание объектов, т.д.) и установить Python 3.9 на каждой машине кластера.

Отдельно стоит остановится на том, как работает кластерная версия Airflow.

В Airflow есть возможность распределить нагрузку по выполнению задач на несколько машин. Для этого в Airflow есть плагин для Celery, который позволяет обрабатывать очередь задач и распределять их выполнение на узлах. Передачу сообщений между процессами обеспечивает брокер сообщений RabbitMQ. Celery устанавливается как плагин Airflow, RabbitMQ устанавливается отдельно на Main Node. Опишем шаги по установке всего необходимого на Main Node. Для Worker Node установка будет очень похожей, за исключением некоторых этапов.

Создаем на Main Node пользователя Airflow:

useradd airflow
passwd airflow

Далее все действия делаем из-под этого пользователя (за исключением создания базы в PostgreSQL) и все процессы, связанные с Airflow, также должны запускаться под данным пользователем. Если необходимо добавить пользователя в sudo, используем visudo. В нашей установке права sudo пользователю airflow были выданы.

По умолчанию в OS используется библиотека sqlite устаревшей для Airflow версии. Эту проблему можно обойти (инструкция: https://github.com/apache/airflow/issues/14208) следующим образом:

Скачиваем дистрибутив библиотеки

wget https://www.sqlite.org/2021/sqlite-autoconf-3340100.tar.gz

Распаковываем, собираем и устанавливаем

tar xvf sqlite-autoconf-3340100.tar.gz
cd sqlite-autoconf-3340100
./configure
make
sudo make install
ls /usr/local/lib – мы должны увидеть наши новые библиотеки

Для пользователя Airflow в .bash_profile прописываем

export LD_LIBRARY_PATH=/usr/local/lib:$LD_LIBRARY_PATH

и используем source .bash_profile, либо снова заходим под пользователем airflow.

Также необходимо (в случае возникновения ошибки после установки airflow - ImportError: No module named _sqlite3) сделать следующее:

sudo yum install sqlite-devel
Для python сделать обновление (из папки с исходниками python)
./configure
make
make altinstall

Еще один метод решения проблемы – найти файл _sqlite3.cpython-36m-x86_64-linux-gnu.so и скопировать его в /usr/local/lib/python3.9/lib-dynload

Устанавливаем RabbitMQ (инструкция взята из https://habr.com/ru/post/539006/)

Устанавливаем пакеты:

sudo yum install epel-release
sudo yum install rabbitmq-server

Регистрируем и запускаем службы:

sudo systemctl enable rabbitmq-server.service
sudo systemctl start rabbitmq-server.service

Включаем интерфейс web-консоли управления RabbitMQ. По умолчанию адрес консоли будет MainNodeIP:15672. Также автоматически будет создан пользователь guest\guest (его мы будем потом использовать в настройках airflow):

sudo rabbitmq-plugins enable rabbitmq_management

Устанавливаем протокол транспорта pyamqp для RabbitMQ и адаптера PostgreSQL:

pip3.9 install pyamqp

Кроме того, необходимо добавить адаптер PostgreSQL для Python (ставим сразу собранную версию):

pip3.9 install psycopg2-binary

Теперь можем перейти к установке самого Airflow.

Переходим в папку, в которой должен быть установлен Airflow и выполняем:

pip3.9 install apache-airflow==2.3.2
pip3.9 install apache-airflow[celery]

Должна появиться папка Airflow, в которой будут необходимые файлы, а также основной файл настроек airflow.cfg. Если файл настроек и папки не появились, можно скачать их вручную с GIT-проекта Airflow и выложить в корень папки Airflow с названиями webserver_config.py и airflow.cfg соответственно.

Затем создаем пустые папки dags, logs, plugins: если ничего не менять в конфигурационном файле, они будут использоваться по умолчанию:

https://github.com/apache/airflow/blob/v2-3-stable/airflow/config_templates/default_webserver_config.py

https://github.com/apache/airflow/blob/v2-3-stable/airflow/config_templates/default_airflow.cfg

После установки выполняем следующую команду, которая должна вывести информацию без отметок об ошибках:

airflow info

Для конфигурационных файлов, взятых из GIT, может возникнуть ошибка: ValueError: Unable to configure handler 'processor'. Она возникает из-за формата заполнения параметров log_filename_template и log_processor_filename_template. Данные параметры необходимо закомментировать или изменить на работающий вариант:

log_filename_template = dag_id={{ ti.dag_id }}/run_id={{ ti.run_id }}/task_id={{ ti.task_id }}/{%% if ti.map_index >= 0 %%}map_index={{ ti.map_index }}/{%% endif %%}attempt={{ try_number }}.log
log_processor_filename_template = {{ filename }}.log

Создаем базу для Airflow (расширенная инструкция: https://airflow.apache.org/docs/apache-airflow/stable/howto/set-up-database.html). База должна быть создана в UTF-8:

CREATE DATABASE airflow_db;
CREATE USER airflow_user WITH PASSWORD 'airflow_user_pass';
GRANT ALL PRIVILEGES ON DATABASE airflow_db TO airflow_user;
ALTER USER airflow_user SET search_path = public.

Проверяем доступ к PostgreSQL для пользователя Airflow извне. При необходимости редактируем pg_hba.conf.

Далее настраиваем airflow.cfg:

executor = CeleryExecutor
sql_alchemy_conn=postgresql+psycopg2://airflow_user:airflow_user_pass@MainNodeIP:5432/airflow_db
broker_url = pyamqp://guest:guest@MainNodeIP:5672/
result_backend = db+postgresql://airflow_user:airflow_user_pass@MainNodeIP:5432/airflow_db
dags_are_paused_at_creation = True
load_examples = False

Проверяем через airflow info, что наши настройки подтянулись.

Отдельно отметим настройки (при установке их указывать не обязательно, так как значения будут использованы по умолчанию), но при промышленной эксплуатации они наверняка понадобятся.

worker_concurrency – количество задач, которые могут единовременно выполняться на одной ноде.

worker_autoscale – диапазон количества задач, которые могут единовременно выполняться на одной ноде (в случае указания данного параметра нагрузка будет определяться автоматически самим airflow).

Оба параметра нужны для балансировки нагрузки в зависимости от имеющихся ресурсов.

Подключаем базу:

airflow db init

Заходим в базу и проверяем, что таблицы airflow появились.

Стартуем Web-сервер, планировщик задач, celery:

airflow webserver -p 8000

Если настроечный файл airflow.cfg взят из GIT, необходимо закомментировать параметр secret_key, иначе приложение не запустится:

airflow scheduler
airflow celery worker
airflow celery flower #консоль для отслеживания работы celery workers, которая  доступна по адресу: MainNodeIP:5555.

Если все команды выполнены успешно, то по адресу MainNodeIP:5555 мы увидим зарегистрированный worker, а по адресу MainNodeIP:8000 откроется приложение с формой для аутентификации.

До подключения доменной аутентификации добавляем пользователя для Airflow:

airflow users  create --role Admin --username admin --email admin --firstname admin --lastname admin --password admin

После всех перечисленных шагов мы получили один Airflow на Main Node с одним Worker-ом. Для подключения Worker Node повторяем все шаги по установке Airflow (исключая шаг airflow db init), конфигурационный файл airflow.cfg можно и нужно взять с Main Node. Также не нужно ставить RabbitMQ. На Worker Node запускаем worker:

airflow celery worker

Через консоль Celery (адрес MainNodeIP:5555 ) мы должны увидеть, что он появился.

Нам необходимо добавить службы. Для этого скачиваем следующие файлы:

wget https://raw.githubusercontent.com/apache/airflow/master/scripts/systemd/airflow-scheduler.service;
wget https://raw.githubusercontent.com/apache/airflow/master/scripts/systemd/airflow-webserver.service;
wget https://raw.githubusercontent.com/apache/airflow/master/scripts/systemd/airflow-worker.service;
wget https://raw.githubusercontent.com/apache/airflow/master/scripts/systemd/airflow-flower.service.

Редактируем файлы (согласно нашим настройкам) и регистрируем службы.

Приводим пример для одного файла:

sudo cp airflow/run/airflow-worker.service /usr/lib/systemd/system
sudo systemctl enable airflow-worker.service
sudo systemctl start airflow-worker.service
sudo systemctl -l status airflow-worker.service

На Worker Node не будет служб airflow-scheduler.service и airflow-webserver.service.

Мы закончили основную установку Airflow на кластере, но нам потребовалось еще подключить LDAP-аутентификации в WEB-приложении Airflow.

Устанавливаем необходимые пакеты на Main Node:

sudo yum install python-devel openldap-devel
pip3.9 install python-ldap
pip3.9 install apache-airflow[ldap]

Далее необходимо скорректировать файл webserver_config.py до вида:

import os
from airflow.www.fab_security.manager import AUTH_DB
from airflow import configuration as conf
from flask_appbuilder.security.manager import AUTH_LDAP
basedir = os.path.abspath(os.path.dirname(__file__))
CSRF_ENABLED = True
AUTH_TYPE = AUTH_LDAP
AUTH_LDAP_SERVER = "ldap://ip-ldap-server:389"

Эта группа полей позволяет при первом логине в web-приложение автоматически добавить пользователя в базу, чтобы не добавлять его вручную. При этом ему будет назначена роль Public.

AUTH_USER_REGISTRATION = True
AUTH_USER_REGISTRATION_ROLE = "Public"

Здесь необходимо прописать свои уникальные значения:

AUTH_LDAP_SEARCH = "DC=,DC="
AUTH_LDAP_UID_FIELD = ""
AUTH_LDAP_BIND_USER = "CN=,OU=,OU=,DC=,DC="
AUTH_LDAP_BIND_PASSWORD = ""

Соответствие групп в AD и групп в Airflow:

AUTH_ROLES_MAPPING = {
#Разные группы из домена могут быть отнесены к группе Admin:
"CN=,OU=,DC=": ["Admin"],
 "CN=,OU=,DC=": ["Admin"],
}

Права перезаписываются согласно AUTH_ROLES_MAPPING при каждом логине:

AUTH_ROLES_SYNC_AT_LOGIN = True

Время жизни одной сессии после логина:

PERMANENT_SESSION_LIFETIME = 1800
AUTH_LDAP_USE_TLS = False
AUTH_LDAP_ALLOW_SELF_SIGNED = False
AUTH_LDAP_TLS_CACERTFILE = '/etc/ssl/certs/ca-certificates.crt'

В такой конфигурации Airflow успешно справляется со своими задачами, проходит запуск 200+ DAG (ов) ежедневно, добавляются новые DAG (и) без каких-либо изменений в конфигурации кластера.

Мы постарались детально описать установку Airflow, отметив некоторые лайфхаки. Такой вариант установки займет пару часов, что значительно меньше того времени, которое было потрачено на изучение вопроса и разбор встречающихся сложностей при первой установке. Надеемся, что материал оказался полезным и прикладным. Делитесь в комментариях своим опытом реализации подобной задачи.

Комментарии (4)


  1. rakhinskiy
    22.05.2023 10:51

    Странно что вы собираете sqlite из исходников, а devel берете из пакета
    Проще взять например https://src.fedoraproject.org/rpms/sqlite/blob/rawhide/f/sqlite.spec
    Поправить версию на свою и собрать rpm, тогда не надо будет править LD_LIBRARY_PATH и это облегчит повторные установки
    Так же не рекомендуется использовать pyscopg2-binary в production, цитата
    "The binary package is a practical choice for development and testing but in production it is advised to use the package built from sources."


    1. neoflex Автор
      22.05.2023 10:51

      Добрый день! Предложенный вариант был рабочим, но и другие варианты также имеют место быть.


  1. nikn24
    22.05.2023 10:51
    +1

    Здравствуйте! А как решали вопрос администрирования python-зависимостей в данной схеме при наличии N физических/виртуальных нод? Например, появилась новая библиотека, которую нужно раскатить на все ноды воркеров для корректной работы DAG.


    1. neoflex Автор
      22.05.2023 10:51

      Добрый день! На текущий момент в ручном режиме