Меня зовут Дмитрий и я вляпался в Airflow. Цель этой статьи — помочь начинающим пользователям Airflow ознакомиться с таблицами PostgreSQL. Время узнать насколько глубока аирфлоуольная нора.
Общая информация про Airflow
Airflow - это инструмент планирования и управления рабочими процессами, он использует базу данных для хранения метаданных о задачах, DAGs (Directed Acyclic Graphs), запусках и результатах.
База данных в Airflow:
SQLAlchemy
: Airflow использует библиотеку SQLAlchemy для взаимодействия с различными системами управления базами данных;SQLite
: По умолчанию Airflow использует встроенную базу данных SQLite, которая хранится в файле airflow.db. Этот вариант подходит для маленьких проектов, но не рекомендуется для продуктовой среды;Postgres, MySQL, MSSQL
: Для более крупных проектов и продуктовых сред рекомендуется использовать реляционные базы данных (Postgres, MySQL, MSSQL).
Как было сказано выше основной инструмент это SQLAlchemy
посредством которой проиcходит взаимодействие с БД. Рассказывать про то, как её развернуть и подключить я не буду, но есть замечательная статья https://habr.com/ru/articles/860900/. Упомяну, что настройка БД осуществляется в файле ~/airflow/airflow.cfg
в строчке sql_alchemy_conn
, где можно настроить подключение именно на постгрес. Сам постгрес настраивается отдельно.
Метаданные в Airflow:
Задачи: Информация о каждой задаче (имя, тип, зависимости, параметры и т.д.);
DAGs: Описание графика зависимостей между задачами;
Запуски: Информация о каждом запуске DAGs (дата и время запуска, статус и т.д.);
Результаты: Информация о результатах выполнения задач (успешно, с ошибкой, время выполнения и т.д.).
Таблицы и их содержимое
Далее коротко расскажу про некоторые таблицы и их поля.
Dag_code
fileloc_hash
- хеш дага;fileloc
- расположение файла дага в системе;last_updated
- дата последнего обновления дага;source_code
- весь код дага с управляющими символами.
Из данной таблички можно вытянуть весь код DAG и сохранить в файл для переиспользования или корректировки, а также использовать в системе контроля версий.
Dag_pickle
id
- идентификатор дага;pickle
- основное поле, хранит сериализованные данные дага;created_dttm
- информация о времени создания или обновления записи;pickle_hash
- хеш сериализованного объекта дага, позволяет Airflow быстро проверять изменился ли даг с последнего сохранения.
Отслеживать изменения дагов при их выполнении.
Log
id
- идентификатор записи;dttm
- дата и время записи события;dag_id
- идентификатор DAG, к которому относится событие;task_id
- идентификатор задачи внутри DAG;map_index
- если задач внутри DAG несколько, то можно так получить инстансы задачи;event
- тип события (например, success, failed, skipped, и т.д.). Это более высокоуровневая информация, чем подробные сообщения в log.execution_date
- дата и время выполнения задачи;owner
- автор DAG или группа пользователей, отвечающих за него;extra
- дополнительная информация в формате JSON, может выводить автор DAG, хост, полный путь выполнения задачи и т.п.
По необходимости проанализировать события для конкретных пользователей или для определенных задач. Стоит так же отметить, что место хранения логов определяется в вышеупомянтом конфигурационном файле airflow.cfg
в разделе [core], параметр base_log_folder
(если у вас версия 2.0 и выше, то иначе – в разделе [logging]), и по умолчанию Airflow сохраняет логи задач в файлах на локальной файловой системе сервера Airflow. Поэтому если своевременно не ухаживать за своими логами, то есть все шансы потратить кучу времени на очистку в будущем. Либо использовать удаленное хранилище для логов. Но никто не запрещает вам испытать на себе роль Нео. Тук-тук-тук, так сказать.
Slot_pool
id
: уникальный идентификатор записи в таблице;pool
: имя пула слотов. Это текстовая строка, которая используется для идентификации пула в Airflow;slots
: максимальное количество слотов, доступных в пуле. Это целое число, которое определяет, сколько задач могут выполняться одновременно из этого пула;description
: Дополнительное описание пула слотов (необязательное поле).
Из этой таблички мы можем узнать сколько можно в целом запускать DAGs одновременно. То есть таким образом мы можем распределять задачи в зависимости от потребностей и потребляемых ресурсов по разных пулам. Нет времени объяснять – просто запускай!
Connection
conn_id
: уникальный идентификатор соединения;conn_type
: тип соединения. Это определяет, к какой системе подключается соединение Postgres, MySQL, S3, Hive, OpenAI Cloud Storage и т.п.;description
: описание соединения. Полезно для документирования и понимания назначения соединения;host
: хост или адрес сервера, к которому подключается соединение. Например, это может быть имя хоста базы данных, URL-адрес веб-сервиса или имя файла;schema
: схема (или база данных) в пределах сервера, если это применимо. Это поле используется, например, при работе с базами данных, такими как PostgreSQL или MySQL;login
: имя пользователя для аутентификации на сервере;password
: пароль для аутентификации на сервере. Важно: В производственных средах не рекомендуется хранить пароли в открытом виде в таблице;port
: порт сервера, к которому подключается соединение. Обычно используется для баз данных или других сетевых служб;Если вы используете шифрование, то могут быть дополнительные поля
is_encrypted
иis_extra_encrypted
. Первое указывает зашифрован ли пароль, а второе зашифрованы ли дополнительные параметры;extra
: дополнительные параметры соединения в формате JSON.
Таблица connections в Airflow хранит информацию о соединениях с внешними системами, которые используются DAGs для доступа к данным или выполнения других операций.
Dag
dag_id
: уникальный идентификатор DAG. Да, это может быть и текст;root_dag_id
: идентификатор корневого DAG, если это под-DAG;is_paused
: DAG приостановлен? ЕслиTRUE
, DAG не будет запускаться автоматически;is_subdag
: DAG является под-DAGом?is_active
: DAG активен?last_parsed_time
: время последней успешной парсинга DAG-файла. Когда Airflow загружает DAG из файла, он записывает время в это поле;last_pickled
: сериализованный (pickled) объект DAG. Это позволяет Airflow быстро загружать DAG из базы данных, минуя повторный парсинг кода. В современных версиях Airflow это поле используется реже из-за перехода на более эффективные методы сериализации.last_expired
: время когда DAG получал сигнал об обновлении;scheduler_lock
: время установки блокировки планировщиком. Это предотвращает одновременный доступ к DAG нескольким планировщикам;pickle_id
: Идентификатор записи в таблице dag_pickle.Fileloc
: путь к файлу DAG в файловой системе;processor_subdir
: подкаталог для обработки DAG.owners
: владельцы DAG (обычно в формате JSON).description
: описание DAG.default_view
: предпочтительный способ просмотра DAG в интерфейсе Airflow.schedule_interval
: интервал планирования DAG (например, @daily, 0 0 * * *).timetable_description
: описание расписания запусков.max_active_tasks
: максимальное количество одновременно выполняемых задач в DAG.max_active_runs
: максимальное количество одновременно выполняемых запусков DAG.has_task_concurrency_limits
: есть ли ограничения на параллелизм задач?has_import_errors
: есть ли ошибки импорта в DAG-файле?next_dagrun
: планируемое время следующего запуска DAG.next_dagrun_data_interval_start
: начало интервала данных для следующего запуска.next_dagrun_data_interval_end
: конец интервала данных для следующего запуска.next_dagrun_create_after
: время, после которого можно создать следующий запуск DAG.
Таблица dag
(или dag_model
в более новых версиях Airflow) в Airflow хранит метаданные о ваших DAGs (Directed Acyclic Graphs). Это центральное хранилище информации о всех ваших DAG-файлах, их состоянии и истории выполнения.
Какая статья без кода. Посмотрим, сколько активных дагов, на паузе, активных на паузе, всего дагов, и какую долю составляют вышеупомянутые.
WITH dags AS (
SELECT
(SELECT COUNT(dag_id) FROM dag WHERE is_active = True) AS active,
(SELECT COUNT(dag_id) FROM dag WHERE is_paused = True) AS paused,
(SELECT COUNT(dag_id) FROM dag WHERE is_active = True AND is_paused = True) AS a_n_p,
(SELECT COUNT(dag_id) FROM dag) AS all_dags
)
SELECT
active,
paused,
a_n_p,
all_dags,
CAST(active AS FLOAT) * 100 / all_dags AS active_ratio,
CAST(paused AS FLOAT) * 100 / all_dags AS paused_ratio,
CAST(a_n_p AS FLOAT) * 100 / all_dags AS a_n_p_ratio
FROM dags;
Вы сможете это протестить самостоятельно(при некоторых условиях), а у меня получилось следующее:
Активных |
Паузанутых |
Активно паузанутых |
Всего |
Доля активных |
Доля паузанутых |
Доля активно паузанутых |
360 |
278 |
166 |
574 |
45% |
35% |
20% |
И всё в целом хорошо, но как могут быть активно паузанутые?
А это вопрос для самоизучения. Мало ли на практике нужно будет проанализировать DAGs, а тут такая подстава.
Log_template
id
: уникальный идентификатор записи лога;filename
: путь к файлу лога;elasticsearch_id
: ID записи в Elasticsearch (если используется Elasticsearch для хранения логов). Это поле будет NULL, если Elasticsearch не используется;created_at
: время создания записи лога.
Таблица log_template
в Airflow используется для хранения шаблонов имен файлов логов. Эти шаблоны определяют, как Airflow формирует имена файлов для логов задач. Она не так часто используется напрямую разработчиками DAG, а больше управляется самой системой Airflow. Так же в таблице может присутствовать поле template, в котором мы можем задать шаблон для записи логов. Как и определить это самостоятельно новым классом/методом и т.п. Это довольно сложный и специфический процесс под конкретные цели и задачи, со своими плюсами и минусами. Главное - оно работает из коробки...
Ab_user
id
: уникальный идентификатор пользователя;first_name
: имя пользователя;last_name
: фамилия пользователя;username
: имя пользователя для входа, иногда известно как логин, но это уже совсем другая история;password
: хешированный пароль;active
: указывает, активен ли аккаунт пользователя;email
: адрес электронной почты пользователя;last_login
: время последнего(если кому-то это очень важно, то крайнего) входа пользователя;login_count
: количество успешных входов пользователя;fail_login_count
: количество неудачных попыток входа пользователя;created_on
: время создания аккаунта пользователя;changed_on
: время последнего изменения данных пользователя;created_by_fk
: идентификатор пользователя, который создал данного;changed_by_fk
: идентификатор пользователя, который последний раз изменил данные этого пользователя.
Таблица ab_user содержит базовую информацию о пользователях Airflow, необходимую для управления доступом и аутентификации. В зависимости от вашей конфигурации, она может содержать дополнительные поля для более сложного управления доступом и пользовательскими профилями.
В этой статье мы рассмотрели структуру нескольких ключевых таблиц в базе данных Airflow, включая dag, connections, ab_user и таблицы логов. Понимание структуры этих таблиц необходимо для эффективного управления и мониторинга рабочих процессов в Airflow. Более подробную информацию можно найти в официальной документации Airflow.