Привет, Хабр! Меня зовут Рустем, являюсь Senior DevOps Engineer в компании IBM.

Сегодня я хотел бы познакомить вас с важным инструментом в методологии DataOps, а именно Apache Airflow и проектирование Data Pipelines (Конвейеры данных).

Эта статья будет посвящена краткому введению в Airflow и шагов по созданию и настройке конвейеров данных (Data Pipelines). Сначала мы установим и сконфигурируем Airflow. Затем рассмотрим практический пример создания и запуска DAG в Airflow. Сегодняшней нашей целью является практическое понимание развертывания Airflow и базовой разработки DAG.

Немного про DataOps

Методология DataOps предназначена для того, чтобы позволить организации использовать повторяющийся процесс для создания и развертывания аналитики и конвейеров данных. Следуя методам управления данными и моделями, они могут предоставлять высококачественные корпоративные данные для применения их AI.

Иными словами, практика DataOps позволяет перенести опыт DevOps на управление данными и аналитику. Практика показывает, что эффективное развертывание DataOps ускоряет вывод аналитических решений на рынок, повышает качество данных и их соответствие нормативным требованиям, а также сокращает затраты на управление данными.

Немного про AirFlow

На этом шаге мы установим пакет Apache Airflow Python в своей среде и инициализируем конфигурацию.

Установим пакет Airflow, выполнив следующую команду в терминале:

pip install "apache-airflow==2.3.0" --ignore-installed

Обратите внимание, что мы закрепляем конкретную версию Airflow и используем здесь флаг --ignore-installed, чтобы избежать некоторых конфликтов версий с зависимостями пакетов.

Инициализация базы данных Airflow

Airflow использует реляционную базу данных в качестве серверной части для хранения данных конфигурации. По умолчанию это база данных SQLite, которая будет храниться в ~/airflow/airflow.db. Мы инициализируем базу данных в своей среде, выполнив в терминале следующую команду:

airflow db init

Создайте пользователя-администратора.

Затем нам нужно создать пользователя, который может войти в пользовательский интерфейс Airflow. Введем в терминале следующее, чтобы создать пользователя с именем admin с правами администратора:

airflow users create \

    --username admin \

    --firstname Firstname \

    --lastname Lastname \

    --role Admin \

    --email admin@example.org \

    --password password

В случае успеха мы увидим следующий вывод:

Запуск веб-сервера и планировщика

Чтобы убедиться, что конфигурация работает правильно, мы можете запустить веб-сервер и планировщик Airflow и войти в пользовательский интерфейс. Выполним следующие команды в терминале, чтобы запустить веб-сервер и планировщик:

airflow webserver --port 8080 -D
airflow scheduler
airflow scheduler

Это запускает веб-интерфейс Airflow на порту 8080

Логинимся в Airflow
Логинимся в Airflow

После загрузки пользовательского интерфейса мы увидим страницу входа. Введем учетные данные пользователя, созданные на предыдущем шаге:

     Username: admin

     Password: password

Если все настроено успешно, то мы увидим веб-интерфейс Airflow со списком примеров DAG:



Теперь можно вернуться к нашему терминалу и выйти из процесса планировщика (опционально, но я рекомендую это сделать). Вводим: ctrl+C

Открываем новый терминал и запускаем команду: airflow scheduler

Конфигурируем AirFlow
Конфигурируем AirFlow

Давайте настроим AirFlow, используя некоторые best-practices.

Давайте сперва выведем список DAG, вернемся в наш первый терминал и выполним команду: airflow dags list

Теперь отредактируем файл конфигураций, который должен быть в /root/airflow/airflow.cfg. Этот файл содержит информацию обо всех настройках нашего Airflow.

Эти настройки представлены в виде пар ключ/значение, которые выглядят как
setting_name = setting_value

Изменим локацию папки DAG

Мы собираемся использовать новую папку для нашего кода Python DAG. Сначала мы должны обновить настройку папки DAG на новый путь. Для наших целей мы будем использовать путь /root/airflow_demo/dags.

Изменим настройку dags_folder в строке 4, чтобы она выглядела следующим образом:

Это указывает Airflow на каталог проекта, где будут хранится и создаваться новые сущности DAG.

Отключим примеры DAG

Есть много примеров DAG, которые автоматически доступны нам, как Вы можете видеть в пользовательском интерфейсе. Это немного затрудняет просмотр созданных нами DAG, поэтому давайте скроем эти примеры DAG.

Кроме того, изменим параметр load_examples в строке 51, чтобы он выглядел следующим образом:

Это не позволяет Airflow загружать примеры DAG.

Изменим цвет панели навигации

Наш Airflow в настоящее время работает, как наша производственная среда. Любой, кто работает в DevOps, скажет вам, что разделение производственной и промежуточной сред невероятно важно.

В общем, визуальная дифференциация лучше всего, и, к счастью, Airflow предоставляет настройку для очевидного визуального изменения. Airflow позволяет нам контролировать цвет заголовка.

Давайте обновим настройку заголовка в airflow.cfg, чтобы каждый в этой среде знал, что нужно быть особенно осторожным:

navbar_color = #ffc0cb

Принудительная перезагрузим Airflow

Мы собираемся принудительно перезагрузить Airflow, чтобы нам не приходилось ждать, пока он перезагрузится автоматически.

В терминале еще раз выполним следующую команду:

airflow dags list

Теперь наш список DAG должен быть пустым (поскольку в /root/airflow_demo/dags пока ничего нет).

Нам нужно будет повторно инициализировать базу данных Airflow, чтобы некоторые из этих настроек были выбраны. Выполним следующую команду:

airflow db init

Перезапустим веб-сервер

Наконец, нам нужно будет остановить/перезапустить текущий процесс веб-сервера (вы можете просто убить pid). Чтобы найти pid, выполним следующую команду:

cat /root/airflow/airflow-webserver.pid

И мы можем убить процесс с помощью этой команды:

kill $(cat /root/airflow/airflow-webserver.pid)

После того, как веб-сервер Airflow был остановлен, мы можем запустить следующую команду, чтобы запустить его еще раз:

airflow webserver --port 8080 -D

Также перезапустим планировщик.

Сначала мы остановим планировщик. Перейдем в терминал, где запущен планировщик, и выполним следующее:

Ctrl +C и airflow scheduler

Протестируем наш AirFlow

Создадим DAG-файл по пути /root/airflow_demo/dags с именем prove-things-work.py и следующим кодом:

from airflow import DAG

from airflow.operators.bash import BashOperator

from datetime import datetime, timedelta

default_args = {

        'owner' : 'Vinoo',

        'depends_on_past' :False,

        'email' :['email@example.com'],

        'email_on_failure': False,

        'email_on_retry': False,

        'catchup': False,

        'retries': 1,

        'retry_delay': timedelta(minutes=5)

        }

dag = DAG(

    'CreateFile',

    default_args=default_args,

    start_date=datetime(2022,1,1,0,0),

    schedule_interval=timedelta(minutes=500))

task1 = BashOperator(

        task_id='prove_things_work',

        bash_command='echo "hello, world!" > /root/create-this-file.txt',

        dag=dag)

Теперь проверим синтаксис нашего DAG, давайте вернемся в наш первый терминал и запустим команду:

python3 airflow_demo/dags/prove-things-work.py

Это проверит наличие синтаксических ошибок Python в файле. В случае успеха вывода не будет.

В качестве нашего последнего шага, вместо того, чтобы ждать, пока планировщик подберет его, давайте обновим базу данных Airflow, чтобы наш DAG был инициализирован Airflow:

airflow db init

Как мы видим, AirFlow инициализировал наш DAG

Давайте запустим его, для этого нажмем кнопку play -> trigger DAG

На панели древовидного представления теперь вы увидите квадратные значки рядом с каждой задачей в группе обеспечения доступности баз данных, меняющей цвет по мере того, как задачи ставятся в очередь и выполняются. Если задачи выполняются успешно, эти значки будут отмечены темно-зеленым цветом в пользовательском интерфейсе. Если задачи не пройдены, квадраты будут отмечены красным цветом.

Сработал? Давайте убедимся, что DAG отработался успешно. Выполним следующую команду: cat /root/create-this-file.txt

Если вы видите фразу “Hello world” — ваш DAG сработал!

Теперь создадим двух-узловой DAG

Создадим файл с именем two-node-dag.py со следующим кодом:

from airflow import DAG

from airflow.operators.bash import BashOperator

from datetime import datetime

# Default settings applied to all tasks

default_args = {

    'owner': 'airflow',

    'depends_on_past': False,

    'email_on_failure': False,

    'email_on_retry': False,

    'retries': 0,

    'catchup': False,

    'start_date': datetime(2022, 1, 1)

}

with DAG(

    dag_id='two-node-dag',

    description='An example Airflow DAG',

    schedule_interval=None,

    default_args=default_args

    ) as dag:

    t0 = BashOperator(

        task_id='bash_task_0',

        bash_command='echo "Hi there, this is the first Airflow task!"'

    )

    t1 = BashOperator(

        task_id='bash_task_1',

        bash_command='echo "Sleeping..." && sleep 5s && date'

    )

    t0 >> t1

Протестируем синтаксис:

python3 airflow_demo/dags/two-node-dag.py

Вернемся к нашему UI

Если планировщик запущен, то АirFlow сам подберет новый DAG спустя какое-то время. Однако, если вы не хотите ждать, вы можете запустить следующую команду для принудительной синхронизации:

airflow db init

Видим, что наш DAG виден нашему Airflow.

Однако в этот раз мы запустим его с терминала:

airflow dags trigger two-node-dag

И под конец заглянем в логи нашего DAG, для этого вернемся в наш UI.

Нажмем на наш DAG:

Перейдем в Graph

Выбираем Таск

И в попап окне выбираем log

Мы успешно настроили и запустили несколько DAG в Airflow. Подведем итоги:

  • Развертывание и конфигурация AIrflow — это несложно.

  • Создание DAG в Airflow — это легкий и простой процесс.

  • DAG определяются кодом.

  • DAG сущности достаточно гибкие в использовании.


Скоро в OTUS состоится открытое занятие на тему «MapReduce: алгоритм обработки больших данных». На нем подробно разберем универсальный алгоритм, с помощью которого обрабатываются большие данные на распределённых системах без общего хранилища (Hadoop, Spark). Поговорим об «узких местах» и потенциальных операционных проблемах. Посмотрим, как это выглядит на практике в Яндекс.Облаке. Регистрация доступна для всех желающих по ссылке.

Комментарии (0)