Привет, Хабр! Меня зовут Рустем, являюсь Senior DevOps Engineer в компании IBM.
Сегодня я хотел бы познакомить вас с важным инструментом в методологии DataOps, а именно Apache Airflow и проектирование Data Pipelines (Конвейеры данных).
Эта статья будет посвящена краткому введению в Airflow и шагов по созданию и настройке конвейеров данных (Data Pipelines). Сначала мы установим и сконфигурируем Airflow. Затем рассмотрим практический пример создания и запуска DAG в Airflow. Сегодняшней нашей целью является практическое понимание развертывания Airflow и базовой разработки DAG.
Немного про DataOps
Методология DataOps предназначена для того, чтобы позволить организации использовать повторяющийся процесс для создания и развертывания аналитики и конвейеров данных. Следуя методам управления данными и моделями, они могут предоставлять высококачественные корпоративные данные для применения их AI.
Иными словами, практика DataOps позволяет перенести опыт DevOps на управление данными и аналитику. Практика показывает, что эффективное развертывание DataOps ускоряет вывод аналитических решений на рынок, повышает качество данных и их соответствие нормативным требованиям, а также сокращает затраты на управление данными.
Немного про AirFlow
На этом шаге мы установим пакет Apache Airflow Python в своей среде и инициализируем конфигурацию.
Установим пакет Airflow, выполнив следующую команду в терминале:
pip install "apache-airflow==2.3.0" --ignore-installed
Обратите внимание, что мы закрепляем конкретную версию Airflow и используем здесь флаг --ignore-installed
, чтобы избежать некоторых конфликтов версий с зависимостями пакетов.
Инициализация базы данных Airflow
Airflow использует реляционную базу данных в качестве серверной части для хранения данных конфигурации. По умолчанию это база данных SQLite, которая будет храниться в ~/airflow/airflow.db. Мы инициализируем базу данных в своей среде, выполнив в терминале следующую команду:
airflow db init
Создайте пользователя-администратора.
Затем нам нужно создать пользователя, который может войти в пользовательский интерфейс Airflow. Введем в терминале следующее, чтобы создать пользователя с именем admin с правами администратора:
airflow users create \
--username admin \
--firstname Firstname \
--lastname Lastname \
--role Admin \
--email admin@example.org \
--password password
В случае успеха мы увидим следующий вывод:
Запуск веб-сервера и планировщика
Чтобы убедиться, что конфигурация работает правильно, мы можете запустить веб-сервер и планировщик Airflow и войти в пользовательский интерфейс. Выполним следующие команды в терминале, чтобы запустить веб-сервер и планировщик:
airflow webserver --port 8080 -D
Это запускает веб-интерфейс Airflow на порту 8080
После загрузки пользовательского интерфейса мы увидим страницу входа. Введем учетные данные пользователя, созданные на предыдущем шаге:
Username: admin
Password: password
Если все настроено успешно, то мы увидим веб-интерфейс Airflow со списком примеров DAG:
Теперь можно вернуться к нашему терминалу и выйти из процесса планировщика (опционально, но я рекомендую это сделать). Вводим: ctrl+C
Открываем новый терминал и запускаем команду: airflow scheduler
Давайте настроим AirFlow, используя некоторые best-practices.
Давайте сперва выведем список DAG, вернемся в наш первый терминал и выполним команду: airflow dags list
Теперь отредактируем файл конфигураций, который должен быть в /root/airflow/airflow.cfg
. Этот файл содержит информацию обо всех настройках нашего Airflow.
Эти настройки представлены в виде пар ключ/значение, которые выглядят какsetting_name = setting_value
Изменим локацию папки DAG
Мы собираемся использовать новую папку для нашего кода Python DAG. Сначала мы должны обновить настройку папки DAG на новый путь. Для наших целей мы будем использовать путь /root/airflow_demo/dags
.
Изменим настройку dags_folder в строке 4, чтобы она выглядела следующим образом:
Это указывает Airflow на каталог проекта, где будут хранится и создаваться новые сущности DAG.
Отключим примеры DAG
Есть много примеров DAG, которые автоматически доступны нам, как Вы можете видеть в пользовательском интерфейсе. Это немного затрудняет просмотр созданных нами DAG, поэтому давайте скроем эти примеры DAG.
Кроме того, изменим параметр load_examples
в строке 51, чтобы он выглядел следующим образом:
Это не позволяет Airflow загружать примеры DAG.
Изменим цвет панели навигации
Наш Airflow в настоящее время работает, как наша производственная среда. Любой, кто работает в DevOps, скажет вам, что разделение производственной и промежуточной сред невероятно важно.
В общем, визуальная дифференциация лучше всего, и, к счастью, Airflow предоставляет настройку для очевидного визуального изменения. Airflow позволяет нам контролировать цвет заголовка.
Давайте обновим настройку заголовка в airflow.cfg, чтобы каждый в этой среде знал, что нужно быть особенно осторожным:navbar_color = #ffc0cb
Принудительная перезагрузим Airflow
Мы собираемся принудительно перезагрузить Airflow, чтобы нам не приходилось ждать, пока он перезагрузится автоматически.
В терминале еще раз выполним следующую команду:
airflow dags list
Теперь наш список DAG должен быть пустым (поскольку в /root/airflow_demo/dags
пока ничего нет).
Нам нужно будет повторно инициализировать базу данных Airflow, чтобы некоторые из этих настроек были выбраны. Выполним следующую команду:
airflow db init
Перезапустим веб-сервер
Наконец, нам нужно будет остановить/перезапустить текущий процесс веб-сервера (вы можете просто убить pid). Чтобы найти pid, выполним следующую команду:
cat /root/airflow/airflow-webserver.pid
И мы можем убить процесс с помощью этой команды:
kill $(cat /root/airflow/airflow-webserver.pid)
После того, как веб-сервер Airflow был остановлен, мы можем запустить следующую команду, чтобы запустить его еще раз:
airflow webserver --port 8080 -D
Также перезапустим планировщик.
Сначала мы остановим планировщик. Перейдем в терминал, где запущен планировщик, и выполним следующее:
Ctrl +C и airflow scheduler
Протестируем наш AirFlow
Создадим DAG-файл по пути /root/airflow_demo/dags
с именем prove-things-work.py и следующим кодом:
from airflow import DAG
from airflow.operators.bash import BashOperator
from datetime import datetime, timedelta
default_args = {
'owner' : 'Vinoo',
'depends_on_past' :False,
'email' :['email@example.com'],
'email_on_failure': False,
'email_on_retry': False,
'catchup': False,
'retries': 1,
'retry_delay': timedelta(minutes=5)
}
dag = DAG(
'CreateFile',
default_args=default_args,
start_date=datetime(2022,1,1,0,0),
schedule_interval=timedelta(minutes=500))
task1 = BashOperator(
task_id='prove_things_work',
bash_command='echo "hello, world!" > /root/create-this-file.txt',
dag=dag)
Теперь проверим синтаксис нашего DAG, давайте вернемся в наш первый терминал и запустим команду:
python3 airflow_demo/dags/prove-things-work.py
Это проверит наличие синтаксических ошибок Python в файле. В случае успеха вывода не будет.
В качестве нашего последнего шага, вместо того, чтобы ждать, пока планировщик подберет его, давайте обновим базу данных Airflow, чтобы наш DAG был инициализирован Airflow:
airflow db init
Как мы видим, AirFlow инициализировал наш DAG
Давайте запустим его, для этого нажмем кнопку play -> trigger DAG
На панели древовидного представления теперь вы увидите квадратные значки рядом с каждой задачей в группе обеспечения доступности баз данных, меняющей цвет по мере того, как задачи ставятся в очередь и выполняются. Если задачи выполняются успешно, эти значки будут отмечены темно-зеленым цветом в пользовательском интерфейсе. Если задачи не пройдены, квадраты будут отмечены красным цветом.
Сработал? Давайте убедимся, что DAG отработался успешно. Выполним следующую команду: cat /root/create-this-file.txt
Если вы видите фразу “Hello world” — ваш DAG сработал!
Теперь создадим двух-узловой DAG
Создадим файл с именем two-node-dag.py со следующим кодом:
from airflow import DAG
from airflow.operators.bash import BashOperator
from datetime import datetime
# Default settings applied to all tasks
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'email_on_failure': False,
'email_on_retry': False,
'retries': 0,
'catchup': False,
'start_date': datetime(2022, 1, 1)
}
with DAG(
dag_id='two-node-dag',
description='An example Airflow DAG',
schedule_interval=None,
default_args=default_args
) as dag:
t0 = BashOperator(
task_id='bash_task_0',
bash_command='echo "Hi there, this is the first Airflow task!"'
)
t1 = BashOperator(
task_id='bash_task_1',
bash_command='echo "Sleeping..." && sleep 5s && date'
)
t0 >> t1
Протестируем синтаксис:
python3 airflow_demo/dags/two-node-dag.py
Вернемся к нашему UI
Если планировщик запущен, то АirFlow сам подберет новый DAG спустя какое-то время. Однако, если вы не хотите ждать, вы можете запустить следующую команду для принудительной синхронизации:
airflow db init
Видим, что наш DAG виден нашему Airflow.
Однако в этот раз мы запустим его с терминала:
airflow dags trigger two-node-dag
И под конец заглянем в логи нашего DAG, для этого вернемся в наш UI.
Нажмем на наш DAG:
Перейдем в Graph
Выбираем Таск
И в попап окне выбираем log
Мы успешно настроили и запустили несколько DAG в Airflow. Подведем итоги:
Развертывание и конфигурация AIrflow — это несложно.
Создание DAG в Airflow — это легкий и простой процесс.
DAG определяются кодом.
DAG сущности достаточно гибкие в использовании.
Скоро в OTUS состоится открытое занятие на тему «MapReduce: алгоритм обработки больших данных». На нем подробно разберем универсальный алгоритм, с помощью которого обрабатываются большие данные на распределённых системах без общего хранилища (Hadoop, Spark). Поговорим об «узких местах» и потенциальных операционных проблемах. Посмотрим, как это выглядит на практике в Яндекс.Облаке. Регистрация доступна для всех желающих по ссылке.