Меня зовут Дмитрий и я вляпался в Airflow. Цель этой статьи — помочь начинающим пользователям Airflow ознакомиться с таблицами PostgreSQL. Время узнать насколько глубока аирфлоуольная нора.

Общая информация про Airflow
Airflow - это инструмент планирования и управления рабочими процессами, он использует базу данных для хранения метаданных о задачах, DAGs (Directed Acyclic Graphs), запусках и результатах.
База данных в Airflow:
SQLAlchemy: Airflow использует библиотеку SQLAlchemy для взаимодействия с различными системами управления базами данных;SQLite: По умолчанию Airflow использует встроенную базу данных SQLite, которая хранится в файле airflow.db. Этот вариант подходит для маленьких проектов, но не рекомендуется для продуктовой среды;Postgres, MySQL, MSSQL: Для более крупных проектов и продуктовых сред рекомендуется использовать реляционные базы данных (Postgres, MySQL, MSSQL).
Как было сказано выше основной инструмент это SQLAlchemy посредством которой проиcходит взаимодействие с БД. Рассказывать про то, как её развернуть и подключить я не буду, но есть замечательная статья https://habr.com/ru/articles/860900/. Упомяну, что настройка БД осуществляется в файле ~/airflow/airflow.cfg в строчке sql_alchemy_conn, где можно настроить подключение именно на постгрес. Сам постгрес настраивается отдельно.
Метаданные в Airflow:
Задачи: Информация о каждой задаче (имя, тип, зависимости, параметры и т.д.);
DAGs: Описание графика зависимостей между задачами;
Запуски: Информация о каждом запуске DAGs (дата и время запуска, статус и т.д.);
Результаты: Информация о результатах выполнения задач (успешно, с ошибкой, время выполнения и т.д.).
Таблицы и их содержимое
Далее коротко расскажу про некоторые таблицы и их поля.
Dag_code
fileloc_hash- хеш дага;fileloc- расположение файла дага в системе;last_updated- дата последнего обновления дага;source_code- весь код дага с управляющими символами.
Из данной таблички можно вытянуть весь код DAG и сохранить в файл для переиспользования или корректировки, а также использовать в системе контроля версий.
Dag_pickle
id- идентификатор дага;pickle- основное поле, хранит сериализованные данные дага;created_dttm- информация о времени создания или обновления записи;pickle_hash- хеш сериализованного объекта дага, позволяет Airflow быстро проверять изменился ли даг с последнего сохранения.
Отслеживать изменения дагов при их выполнении.
Log
id- идентификатор записи;dttm- дата и время записи события;dag_id- идентификатор DAG, к которому относится событие;task_id- идентификатор задачи внутри DAG;map_index- если задач внутри DAG несколько, то можно так получить инстансы задачи;event- тип события (например, success, failed, skipped, и т.д.). Это более высокоуровневая информация, чем подробные сообщения в log.execution_date- дата и время выполнения задачи;owner- автор DAG или группа пользователей, отвечающих за него;extra- дополнительная информация в формате JSON, может выводить автор DAG, хост, полный путь выполнения задачи и т.п.
По необходимости проанализировать события для конкретных пользователей или для определенных задач. Стоит так же отметить, что место хранения логов определяется в вышеупомянтом конфигурационном файле airflow.cfg в разделе [core], параметр base_log_folder(если у вас версия 2.0 и выше, то иначе – в разделе [logging]), и по умолчанию Airflow сохраняет логи задач в файлах на локальной файловой системе сервера Airflow. Поэтому если своевременно не ухаживать за своими логами, то есть все шансы потратить кучу времени на очистку в будущем. Либо использовать удаленное хранилище для логов. Но никто не запрещает вам испытать на себе роль Нео. Тук-тук-тук, так сказать.

Slot_pool
id: уникальный идентификатор записи в таблице;pool: имя пула слотов. Это текстовая строка, которая используется для идентификации пула в Airflow;slots: максимальное количество слотов, доступных в пуле. Это целое число, которое определяет, сколько задач могут выполняться одновременно из этого пула;description: Дополнительное описание пула слотов (необязательное поле).
Из этой таблички мы можем узнать сколько можно в целом запускать DAGs одновременно. То есть таким образом мы можем распределять задачи в зависимости от потребностей и потребляемых ресурсов по разных пулам. Нет времени объяснять – просто запускай!
Connection
conn_id: уникальный идентификатор соединения;conn_type: тип соединения. Это определяет, к какой системе подключается соединение Postgres, MySQL, S3, Hive, OpenAI Cloud Storage и т.п.;description: описание соединения. Полезно для документирования и понимания назначения соединения;host: хост или адрес сервера, к которому подключается соединение. Например, это может быть имя хоста базы данных, URL-адрес веб-сервиса или имя файла;schema: схема (или база данных) в пределах сервера, если это применимо. Это поле используется, например, при работе с базами данных, такими как PostgreSQL или MySQL;login: имя пользователя для аутентификации на сервере;password: пароль для аутентификации на сервере. Важно: В производственных средах не рекомендуется хранить пароли в открытом виде в таблице;port: порт сервера, к которому подключается соединение. Обычно используется для баз данных или других сетевых служб;Если вы используете шифрование, то могут быть дополнительные поля
is_encryptedиis_extra_encrypted. Первое указывает зашифрован ли пароль, а второе зашифрованы ли дополнительные параметры;extra: дополнительные параметры соединения в формате JSON.
Таблица connections в Airflow хранит информацию о соединениях с внешними системами, которые используются DAGs для доступа к данным или выполнения других операций.
Dag
dag_id: уникальный идентификатор DAG. Да, это может быть и текст;root_dag_id: идентификатор корневого DAG, если это под-DAG;is_paused: DAG приостановлен? ЕслиTRUE, DAG не будет запускаться автоматически;is_subdag: DAG является под-DAGом?is_active: DAG активен?last_parsed_time: время последней успешной парсинга DAG-файла. Когда Airflow загружает DAG из файла, он записывает время в это поле;last_pickled: сериализованный (pickled) объект DAG. Это позволяет Airflow быстро загружать DAG из базы данных, минуя повторный парсинг кода. В современных версиях Airflow это поле используется реже из-за перехода на более эффективные методы сериализации.last_expired: время когда DAG получал сигнал об обновлении;scheduler_lock: время установки блокировки планировщиком. Это предотвращает одновременный доступ к DAG нескольким планировщикам;pickle_id: Идентификатор записи в таблице dag_pickle.Fileloc: путь к файлу DAG в файловой системе;processor_subdir: подкаталог для обработки DAG.owners: владельцы DAG (обычно в формате JSON).description: описание DAG.default_view: предпочтительный способ просмотра DAG в интерфейсе Airflow.schedule_interval: интервал планирования DAG (например, @daily, 0 0 * * *).timetable_description: описание расписания запусков.max_active_tasks: максимальное количество одновременно выполняемых задач в DAG.max_active_runs: максимальное количество одновременно выполняемых запусков DAG.has_task_concurrency_limits: есть ли ограничения на параллелизм задач?has_import_errors: есть ли ошибки импорта в DAG-файле?next_dagrun: планируемое время следующего запуска DAG.next_dagrun_data_interval_start: начало интервала данных для следующего запуска.next_dagrun_data_interval_end: конец интервала данных для следующего запуска.next_dagrun_create_after: время, после которого можно создать следующий запуск DAG.
Таблица dag (или dag_model в более новых версиях Airflow) в Airflow хранит метаданные о ваших DAGs (Directed Acyclic Graphs). Это центральное хранилище информации о всех ваших DAG-файлах, их состоянии и истории выполнения.
Какая статья без кода. Посмотрим, сколько активных дагов, на паузе, активных на паузе, всего дагов, и какую долю составляют вышеупомянутые.
WITH dags AS (
SELECT
(SELECT COUNT(dag_id) FROM dag WHERE is_active = True) AS active,
(SELECT COUNT(dag_id) FROM dag WHERE is_paused = True) AS paused,
(SELECT COUNT(dag_id) FROM dag WHERE is_active = True AND is_paused = True) AS a_n_p,
(SELECT COUNT(dag_id) FROM dag) AS all_dags
)
SELECT
active,
paused,
a_n_p,
all_dags,
CAST(active AS FLOAT) * 100 / all_dags AS active_ratio,
CAST(paused AS FLOAT) * 100 / all_dags AS paused_ratio,
CAST(a_n_p AS FLOAT) * 100 / all_dags AS a_n_p_ratio
FROM dags;
Вы сможете это протестить самостоятельно(при некоторых условиях), а у меня получилось следующее:
Активных |
Паузанутых |
Активно паузанутых |
Всего |
Доля активных |
Доля паузанутых |
Доля активно паузанутых |
360 |
278 |
166 |
574 |
45% |
35% |
20% |

И всё в целом хорошо, но как могут быть активно паузанутые?
А это вопрос для самоизучения. Мало ли на практике нужно будет проанализировать DAGs, а тут такая подстава.
Log_template
id: уникальный идентификатор записи лога;filename: путь к файлу лога;elasticsearch_id: ID записи в Elasticsearch (если используется Elasticsearch для хранения логов). Это поле будет NULL, если Elasticsearch не используется;created_at: время создания записи лога.
Таблица log_template в Airflow используется для хранения шаблонов имен файлов логов. Эти шаблоны определяют, как Airflow формирует имена файлов для логов задач. Она не так часто используется напрямую разработчиками DAG, а больше управляется самой системой Airflow. Так же в таблице может присутствовать поле template, в котором мы можем задать шаблон для записи логов. Как и определить это самостоятельно новым классом/методом и т.п. Это довольно сложный и специфический процесс под конкретные цели и задачи, со своими плюсами и минусами. Главное - оно работает из коробки...

Ab_user
id: уникальный идентификатор пользователя;first_name: имя пользователя;last_name: фамилия пользователя;username: имя пользователя для входа, иногда известно как логин, но это уже совсем другая история;password: хешированный пароль;active: указывает, активен ли аккаунт пользователя;email: адрес электронной почты пользователя;last_login: время последнего(если кому-то это очень важно, то крайнего) входа пользователя;login_count: количество успешных входов пользователя;fail_login_count: количество неудачных попыток входа пользователя;created_on: время создания аккаунта пользователя;changed_on: время последнего изменения данных пользователя;created_by_fk: идентификатор пользователя, который создал данного;changed_by_fk: идентификатор пользователя, который последний раз изменил данные этого пользователя.
Таблица ab_user содержит базовую информацию о пользователях Airflow, необходимую для управления доступом и аутентификации. В зависимости от вашей конфигурации, она может содержать дополнительные поля для более сложного управления доступом и пользовательскими профилями.
В этой статье мы рассмотрели структуру нескольких ключевых таблиц в базе данных Airflow, включая dag, connections, ab_user и таблицы логов. Понимание структуры этих таблиц необходимо для эффективного управления и мониторинга рабочих процессов в Airflow. Более подробную информацию можно найти в официальной документации Airflow.