Всем привет! В данной статье мы рассмотрим как можно локально развернуть airflow с помощью docker desktop'a и PyCharm'a. Кроме него развернём ещё и следующее: rabbitMQ, postgreSQL, redis и MongoDB.
Первый делом, нужно установить себе на компьютер Docker desktop, PyCharm и PgAdmin4. Теперь нам нужен yaml файл, в котором будет прописано все образы, которые мы хотим поставить. Готовый файл для нашей задачи можете скачать по ссылке. Когда всё необходимое было скачано, можно приступить к развёртыванию airflow.
Создаём проект в PyCharm, и в корневую папку загружаем наш yaml файл. В терминале выполняем следующий код: docker compose up -d. Когда нажмёте enter, то у вас начнётся скачивание необходимых образов в docker desktop и сразу из них запустятся контейнеры, кроме этого, в проекте у вас создастся необходимая структура папок.
Когда всё скачается, у вас docker desktop должен выглядеть примерно так:
На данный момент, у нас уже локально развернулось всё, что я обещал, теперь будем устанавливать необходимые соединения.
Объединение airflow и postgres
Начнём с Postgres; зайдём в PgAdmin и подключимся к нашему серверу
Логин и пароль: airflow, airflow соответственно
После подключения у вас должно появиться в сервере две базы данных:
Для работы с airflow используется airflow. Для будущего написания ETL давайте сразу создадим таблицу:
Теперь займёмся установкой соединения между airflow и postgres. Для того, чтобы перейти в airflow, зайдём в docker desktop и нажмём на порт у airflow-webserver:
Нужно будет ввести логин и пароль: airflow и airflow
После этого, вы окажетесь на основной странице airflow:
Теперь, чтобы наш airflow мог взаимодействовать с postgres, необходимо создать соединение между ними в airflow:
Пароль указываем от нашей базы данных, в нашем случае airflow
Сохраняем соединение и теперь у нас налажена работа airflow и postgres.
Напишем самый простой dag, чтобы проверить, что всё работает. Для начала нужно установить дополнительные библиотеки в PyCharm: apache-airflow и apache-airflow-providers-postgres.
Напишем код:
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.postgres.hooks.postgres import PostgresHook
hook = PostgresHook(postgres_conn_id='Postgres')
request = "insert into public.data (name, birth_date, age) values ('Anton', '2004.11.14', 19)"
def insert():
hook.run(request)
with DAG(
dag_id="base_dag",
default_args={
"owner": "Kirill",
}
) as dag:
t = PythonOperator(
task_id='insert_postgres',
python_callable=insert
)
Как только мы закончим его писать, в airflow появится наш dag:
Запустим его и посмотрим, что получилось.
Как видим, dag отработал успешно и добавил новую строку в нашу таблицу.
Объединение airflow и rabbitmq
Начнём с настройки rabbitMQ. Для того, чтобы перейти в rabbitMQ, нужно нажать на его порт в docker desktop'е:
Логин и пароль: guest, guest. Перейдём в раздел "Queues" и создадим там новую очередь с названием queue.airflow:
Вернёмся в PyCharm и установим библиотеку airflow-provider-rabbitmq.
Перейдём в airflow в раздел с соединениями и добавим соединение с брокером, в поле с хостом нужно указать имя нашего контейнера с rabbit, то есть, rabbitmq_main:
После всего этого, настройка брокера закончена и мы можем приступать к взаимодействию питона, airflow и брокера. Напишем базовую программу:
Для этого в папке dags создадим папку tasks_broker. В ней создадим sensor.py с следующим кодом:
from rabbitmq_provider.sensors.rabbitmq import RabbitMQSensor
sensor = RabbitMQSensor(
task_id="sensor",
queue_name="queue.airflow",
rabbitmq_conn_id="RabbitMQ",
)
Данный код является таской, которая принимает данные из брокера и передаёт в другую таску.
Также в папке tasks_broker создадим get_data.py с следующим содержанием:
import json
from airflow.decorators import task
@task(task_id="get_data")
def get_data(**kwargs) -> None:
message = json.loads(kwargs['task_instance'].xcom_pull(task_ids='sensor'))
print("#########################################################################################")
print(message)
print("#########################################################################################")
После этого, в папке dags создадим python_broker.py:
from airflow import DAG
from tasks_broker.sensor import sensor
from tasks_broker.get_data import get_data
with DAG(
dag_id="python_broker",
schedule_interval=None,
) as dag:
sensor >> get_data()
И у нас в airflow появится новый dag python_broker. Запустим его и он будет ждать до тех пор, пока из брокера не поступит сообщение. Чтобы отправить сообщение из брокера, нужно перейти в раздел "queues" и выбрать необходимую очередь, в нашем случае queue.airflow. Там выбрать "publish message", ввести нужное сообщение и отправить.
Как мы отправим сообщение, наш dag выполнится и мы можем убедиться в том, что всё прошло успешно, зайдя в логи второй таски:
Решение задачи ETL. Объединение airflow, rabbitmq и postgres
Давайте теперь решим задачу ETL, будет три таски:
Получаем данные о человеке в формате json из брокера (имя, пол, дату рождения, возраст, город проживания) и передаём его во вторую таску для обработки
Вторая таска отвечает за обработку полученных полей и дальнейшую передачу результата в третью таску (будем из пяти полей передавать только три (имя, дату рождения и возраст))
Третья таска будет загружать данные в postgres
По сути, все соединения у нас установлены, необходимость лишь заключается в правильно написанном коде. В папке dags создадим папку tasks_broker_postgres. Там создадим три файла extract.py, transformation.py и load.py.
extract.py:
from rabbitmq_provider.sensors.rabbitmq import RabbitMQSensor
extract = RabbitMQSensor(
task_id="extract",
queue_name="queue.airflow",
rabbitmq_conn_id="RabbitMQ",
)
transformation.py:
import json
from airflow.decorators import task
@task(task_id="transformation")
def transformation(**kwargs) -> dict:
message = json.loads(kwargs['task_instance'].xcom_pull(task_ids='extract'))
new_message = {"name": message["name"],
"birth_date": message["birth_date"],
"age": message["age"]}
return new_message
load.py:
from airflow.decorators import task
from airflow.providers.postgres.hooks.postgres import PostgresHook
@task(task_id="load")
def load(**kwargs) -> None:
message = kwargs['task_instance'].xcom_pull(task_ids='transformation')
hook = PostgresHook(postgres_conn_id='Postgres')
request = ("insert into public.data (name, birth_date, age) values ('" + message["name"] + "','" +
message["birth_date"] + "'," +
str(message["age"]) + ")")
hook.run(request)
Теперь в папке dags создадим python_broker_postgres.py:
from airflow import DAG
from tasks_broker_postgres.extract import extract
from tasks_broker_postgres.transformation import transformation
from tasks_broker_postgres.load import load
with DAG(
dag_id="python_broker_postgres",
schedule_interval=None,
) as dag:
extract >> transformation() >> load()
Запустим dag в airflow, зайдём в брокер и отправим сообщение:
По логам смотрим, что все таски прошли успешно:
Зайдём теперь в базу данных:
Как видим, наше сообщение успешно обработалось и добавилось в базу данных.
Заключение
В данной статье мы рассмотрели взаимодействие python, airflow, postgres и rabbitmq.
Если вы не забыли, то в yaml файл входит ещё и MongoDB, работа с ней осуществляется аналогично работе с postgres. Для удобной работы с MongoDB можно скачать эту программу.
До новых публикаций!
Комментарии (4)
dreadangel
10.11.2024 23:31Статья из разряда "голопом по европам", но есть довольно интересного, Автору спасибо за это...
Уважаемые питонисты - помогите понять следующую конструкцию
sensor >> get_data()
я понимаю что это оператор сдвига вправо, но причем тут объект и метод?
Z1at Автор
10.11.2024 23:31В данном случае, это оператор, который устанавливает зависимости между тасками. То есть с помощью него мы даём понять airflow, что мы хотим, чтобы get_data() начал выполняться только после того, как закончит свою работу sensor.
Например, в написание ETL мы использовали эту конструкцию:
extract >> transformation() >> load().
Сделали мы это для того, чтобы сначала выполнился extract, после него transformation и в самом конце load. Если бы мы это не прописали, то у airflow не было бы чёткого порядка выполнения тасок и из-за этого, они могли бы начать работать одновременно, что привело бы к ошибкам.Подробнее можно почитать тут.
Kahelman
Что это было?
nik190397
Гайд по возможному использованию эйрфлоу
Достаточно полный и полезный, потому что я, когда искал инфу насчет Connections в интерфейсе эйрфлоу, не смог найти ничего ни на русском, ни на английском, а тут есть скрины, которые показывают, что надо в качестве host указывать название контейнера, а не просто localhost. Если ты видел инфу об этом где-то хотя бы на английском, то супер и поделись, пожалуйста, ссылкой