Всем привет! В данной статье мы рассмотрим как можно локально развернуть airflow с помощью docker desktop'a и PyCharm'a. Кроме него развернём ещё и следующее: rabbitMQ, postgreSQL, redis и MongoDB.

Первый делом, нужно установить себе на компьютер Docker desktop, PyCharm и PgAdmin4. Теперь нам нужен yaml файл, в котором будет прописано все образы, которые мы хотим поставить. Готовый файл для нашей задачи можете скачать по ссылке. Когда всё необходимое было скачано, можно приступить к развёртыванию airflow.

Создаём проект в PyCharm, и в корневую папку загружаем наш yaml файл. В терминале выполняем следующий код: docker compose up -d. Когда нажмёте enter, то у вас начнётся скачивание необходимых образов в docker desktop и сразу из них запустятся контейнеры, кроме этого, в проекте у вас создастся необходимая структура папок.

Когда всё скачается, у вас docker desktop должен выглядеть примерно так:

На данный момент, у нас уже локально развернулось всё, что я обещал, теперь будем устанавливать необходимые соединения.

Объединение airflow и postgres

Начнём с Postgres; зайдём в PgAdmin и подключимся к нашему серверу

Логин и пароль: airflow, airflow соответственно

После подключения у вас должно появиться в сервере две базы данных:

Для работы с airflow используется airflow. Для будущего написания ETL давайте сразу создадим таблицу:

Теперь займёмся установкой соединения между airflow и postgres. Для того, чтобы перейти в airflow, зайдём в docker desktop и нажмём на порт у airflow-webserver:

Нужно будет ввести логин и пароль: airflow и airflow

После этого, вы окажетесь на основной странице airflow:

Теперь, чтобы наш airflow мог взаимодействовать с postgres, необходимо создать соединение между ними в airflow:

Пароль указываем от нашей базы данных, в нашем случае airflow

Сохраняем соединение и теперь у нас налажена работа airflow и postgres.

Напишем самый простой dag, чтобы проверить, что всё работает. Для начала нужно установить дополнительные библиотеки в PyCharm: apache-airflow и apache-airflow-providers-postgres.

Напишем код:

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.postgres.hooks.postgres import PostgresHook


hook = PostgresHook(postgres_conn_id='Postgres')

request = "insert into public.data (name, birth_date, age) values ('Anton', '2004.11.14', 19)"


def insert():
    hook.run(request)


with DAG(
    dag_id="base_dag",
    default_args={
        "owner": "Kirill",
    }
) as dag:

    t = PythonOperator(
        task_id='insert_postgres',
        python_callable=insert
        )

Как только мы закончим его писать, в airflow появится наш dag:

Запустим его и посмотрим, что получилось.

Как видим, dag отработал успешно и добавил новую строку в нашу таблицу.

Объединение airflow и rabbitmq

Начнём с настройки rabbitMQ. Для того, чтобы перейти в rabbitMQ, нужно нажать на его порт в docker desktop'е:

Логин и пароль: guest, guest. Перейдём в раздел "Queues" и создадим там новую очередь с названием queue.airflow:

Вернёмся в PyCharm и установим библиотеку airflow-provider-rabbitmq.

Перейдём в airflow в раздел с соединениями и добавим соединение с брокером, в поле с хостом нужно указать имя нашего контейнера с rabbit, то есть, rabbitmq_main:

После всего этого, настройка брокера закончена и мы можем приступать к взаимодействию питона, airflow и брокера. Напишем базовую программу:

Для этого в папке dags создадим папку tasks_broker. В ней создадим sensor.py с следующим кодом:

from rabbitmq_provider.sensors.rabbitmq import RabbitMQSensor


sensor = RabbitMQSensor(
    task_id="sensor",
    queue_name="queue.airflow",
    rabbitmq_conn_id="RabbitMQ",
)

Данный код является таской, которая принимает данные из брокера и передаёт в другую таску.

Также в папке tasks_broker создадим get_data.py с следующим содержанием:

import json
from airflow.decorators import task


@task(task_id="get_data")
def get_data(**kwargs) -> None:
    message = json.loads(kwargs['task_instance'].xcom_pull(task_ids='sensor'))

    print("#########################################################################################")
    print(message)
    print("#########################################################################################")

После этого, в папке dags создадим python_broker.py:

from airflow import DAG
from tasks_broker.sensor import sensor
from tasks_broker.get_data import get_data


with DAG(
        dag_id="python_broker",
        schedule_interval=None,
) as dag:
    sensor >> get_data()

И у нас в airflow появится новый dag python_broker. Запустим его и он будет ждать до тех пор, пока из брокера не поступит сообщение. Чтобы отправить сообщение из брокера, нужно перейти в раздел "queues" и выбрать необходимую очередь, в нашем случае queue.airflow. Там выбрать "publish message", ввести нужное сообщение и отправить.

Как мы отправим сообщение, наш dag выполнится и мы можем убедиться в том, что всё прошло успешно, зайдя в логи второй таски:

Решение задачи ETL. Объединение airflow, rabbitmq и postgres

Давайте теперь решим задачу ETL, будет три таски:

  1. Получаем данные о человеке в формате json из брокера (имя, пол, дату рождения, возраст, город проживания) и передаём его во вторую таску для обработки

  2. Вторая таска отвечает за обработку полученных полей и дальнейшую передачу результата в третью таску (будем из пяти полей передавать только три (имя, дату рождения и возраст))

  3. Третья таска будет загружать данные в postgres

По сути, все соединения у нас установлены, необходимость лишь заключается в правильно написанном коде. В папке dags создадим папку tasks_broker_postgres. Там создадим три файла extract.py, transformation.py и load.py.

extract.py:

from rabbitmq_provider.sensors.rabbitmq import RabbitMQSensor


extract = RabbitMQSensor(
    task_id="extract",
    queue_name="queue.airflow",
    rabbitmq_conn_id="RabbitMQ",
)

transformation.py:

import json
from airflow.decorators import task


@task(task_id="transformation")
def transformation(**kwargs) -> dict:
    message = json.loads(kwargs['task_instance'].xcom_pull(task_ids='extract'))

    new_message = {"name": message["name"],
                   "birth_date": message["birth_date"],
                   "age": message["age"]}

    return new_message

load.py:

from airflow.decorators import task
from airflow.providers.postgres.hooks.postgres import PostgresHook


@task(task_id="load")
def load(**kwargs) -> None:
    message = kwargs['task_instance'].xcom_pull(task_ids='transformation')

    hook = PostgresHook(postgres_conn_id='Postgres')

    request = ("insert into public.data (name, birth_date, age) values ('" + message["name"] + "','" +
                                                                            message["birth_date"] + "'," +
                                                                            str(message["age"]) + ")")

    hook.run(request)

Теперь в папке dags создадим python_broker_postgres.py:

from airflow import DAG
from tasks_broker_postgres.extract import extract
from tasks_broker_postgres.transformation import transformation
from tasks_broker_postgres.load import load


with DAG(
        dag_id="python_broker_postgres",
        schedule_interval=None,
) as dag:
    extract >> transformation() >> load()

Запустим dag в airflow, зайдём в брокер и отправим сообщение:

По логам смотрим, что все таски прошли успешно:

Зайдём теперь в базу данных:

Как видим, наше сообщение успешно обработалось и добавилось в базу данных.

Заключение

В данной статье мы рассмотрели взаимодействие python, airflow, postgres и rabbitmq.

Если вы не забыли, то в yaml файл входит ещё и MongoDB, работа с ней осуществляется аналогично работе с postgres. Для удобной работы с MongoDB можно скачать эту программу.

До новых публикаций!

Комментарии (4)


  1. Kahelman
    10.11.2024 23:31

    Что это было?


    1. nik190397
      10.11.2024 23:31

      Гайд по возможному использованию эйрфлоу

      Достаточно полный и полезный, потому что я, когда искал инфу насчет Connections в интерфейсе эйрфлоу, не смог найти ничего ни на русском, ни на английском, а тут есть скрины, которые показывают, что надо в качестве host указывать название контейнера, а не просто localhost. Если ты видел инфу об этом где-то хотя бы на английском, то супер и поделись, пожалуйста, ссылкой


  1. dreadangel
    10.11.2024 23:31

    Статья из разряда "голопом по европам", но есть довольно интересного, Автору спасибо за это...

    Уважаемые питонисты - помогите понять следующую конструкцию

    sensor >> get_data()

    я понимаю что это оператор сдвига вправо, но причем тут объект и метод?


    1. Z1at Автор
      10.11.2024 23:31

      В данном случае, это оператор, который устанавливает зависимости между тасками. То есть с помощью него мы даём понять airflow, что мы хотим, чтобы get_data() начал выполняться только после того, как закончит свою работу sensor.

      Например, в написание ETL мы использовали эту конструкцию: extract >> transformation() >> load(). Сделали мы это для того, чтобы сначала выполнился extract, после него transformation и в самом конце load. Если бы мы это не прописали, то у airflow не было бы чёткого порядка выполнения тасок и из-за этого, они могли бы начать работать одновременно, что привело бы к ошибкам.

      Подробнее можно почитать тут.