Меня зовут Дмитрий и я вляпался в Airflow. Цель этой статьи — помочь начинающим пользователям Airflow ознакомиться с таблицами PostgreSQL. Время узнать насколько глубока аирфлоуольная нора.

Джун наблюдает за тем, как глубоко заберется мидл.
Джун наблюдает за тем, как глубоко заберется мидл.

Общая информация про Airflow

Airflow - это инструмент планирования и управления рабочими процессами, он использует базу данных для хранения метаданных о задачах, DAGs (Directed Acyclic Graphs), запусках и результатах.

База данных в Airflow:

  • SQLAlchemy: Airflow использует библиотеку SQLAlchemy для взаимодействия с различными системами управления базами данных;

  • SQLite: По умолчанию Airflow использует встроенную базу данных SQLite, которая хранится в файле airflow.db. Этот вариант подходит для маленьких проектов, но не рекомендуется для продуктовой среды;

  • Postgres, MySQL, MSSQL: Для более крупных проектов и продуктовых сред рекомендуется использовать реляционные базы данных (Postgres, MySQL, MSSQL).

Как было сказано выше основной инструмент это SQLAlchemy посредством которой проиcходит взаимодействие с БД. Рассказывать про то, как её развернуть и подключить я не буду, но есть замечательная статья https://habr.com/ru/articles/860900/. Упомяну, что настройка БД осуществляется в файле ~/airflow/airflow.cfg в строчке sql_alchemy_conn, где можно настроить подключение именно на постгрес. Сам постгрес настраивается отдельно.

Метаданные в Airflow:

  • Задачи: Информация о каждой задаче (имя, тип, зависимости, параметры и т.д.);

  • DAGs: Описание графика зависимостей между задачами;

  • Запуски: Информация о каждом запуске DAGs (дата и время запуска, статус и т.д.);

  • Результаты: Информация о результатах выполнения задач (успешно, с ошибкой, время выполнения и т.д.).

Таблицы и их содержимое

Далее коротко расскажу про некоторые таблицы и их поля.

Dag_code

  • fileloc_hash - хеш дага;

  • fileloc - расположение файла дага в системе;

  • last_updated - дата последнего обновления дага;

  • source_code - весь код дага с управляющими символами.

Из данной таблички можно вытянуть весь код DAG и сохранить в файл для переиспользования или корректировки, а также использовать в системе контроля версий.

Dag_pickle

  • id - идентификатор дага;

  • pickle - основное поле, хранит сериализованные данные дага;

  • created_dttm - информация о времени создания или обновления записи;

  • pickle_hash - хеш сериализованного объекта дага, позволяет Airflow быстро проверять изменился ли даг с последнего сохранения.

Отслеживать изменения дагов при их выполнении.

Log

  • id - идентификатор записи;

  • dttm - дата и время записи события;

  • dag_id - идентификатор DAG, к которому относится событие;

  • task_id - идентификатор задачи внутри DAG;

  • map_index - если задач внутри DAG несколько, то можно так получить инстансы задачи;

  • event - тип события (например, success, failed, skipped, и т.д.). Это более высокоуровневая информация, чем подробные сообщения в log.

  • execution_date - дата и время выполнения задачи;

  • owner - автор DAG или группа пользователей, отвечающих за него;

  • extra - дополнительная информация в формате JSON, может выводить автор DAG, хост, полный путь выполнения задачи и т.п.

По необходимости проанализировать события для конкретных пользователей или для определенных задач. Стоит так же отметить, что место хранения логов определяется в вышеупомянтом конфигурационном файле airflow.cfg в разделе [core], параметр base_log_folder(если у вас версия 2.0 и выше, то иначе – в разделе [logging]), и по умолчанию Airflow сохраняет логи задач в файлах на локальной файловой системе сервера Airflow. Поэтому если своевременно не ухаживать за своими логами, то есть все шансы потратить кучу времени на очистку в будущем. Либо использовать удаленное хранилище для логов. Но никто не запрещает вам испытать на себе роль Нео. Тук-тук-тук, так сказать.

Астанавитесь!
Астанавитесь!

Slot_pool

  • id: уникальный идентификатор записи в таблице;

  • pool: имя пула слотов. Это текстовая строка, которая используется для идентификации пула в Airflow;

  • slots: максимальное количество слотов, доступных в пуле. Это целое число, которое определяет, сколько задач могут выполняться одновременно из этого пула;

  • description: Дополнительное описание пула слотов (необязательное поле).

Из этой таблички мы можем узнать сколько можно в целом запускать DAGs одновременно. То есть таким образом мы можем распределять задачи в зависимости от потребностей и потребляемых ресурсов по разных пулам. Нет времени объяснять – просто запускай!

Connection

  • conn_id: уникальный идентификатор соединения;

  • conn_type: тип соединения. Это определяет, к какой системе подключается соединение Postgres, MySQL, S3, Hive, OpenAI Cloud Storage и т.п.;

  • description: описание соединения. Полезно для документирования и понимания назначения соединения;

  • host: хост или адрес сервера, к которому подключается соединение. Например, это может быть имя хоста базы данных, URL-адрес веб-сервиса или имя файла;

  • schema: схема (или база данных) в пределах сервера, если это применимо. Это поле используется, например, при работе с базами данных, такими как PostgreSQL или MySQL;

  • login: имя пользователя для аутентификации на сервере;

  • password: пароль для аутентификации на сервере. Важно: В производственных средах не рекомендуется хранить пароли в открытом виде в таблице;

  • port: порт сервера, к которому подключается соединение. Обычно используется для баз данных или других сетевых служб;

  • Если вы используете шифрование, то могут быть дополнительные поля is_encrypted и is_extra_encrypted. Первое указывает зашифрован ли пароль, а второе зашифрованы ли дополнительные параметры;

  • extra: дополнительные параметры соединения в формате JSON.

Таблица connections в Airflow хранит информацию о соединениях с внешними системами, которые используются DAGs для доступа к данным или выполнения других операций.

Dag

  • dag_id: уникальный идентификатор DAG. Да, это может быть и текст;

  • root_dag_id: идентификатор корневого DAG, если это под-DAG;

  • is_paused: DAG приостановлен? Если TRUE, DAG не будет запускаться автоматически;

  • is_subdag: DAG является под-DAGом?

  • is_active: DAG активен?

  • last_parsed_time: время последней успешной парсинга DAG-файла. Когда Airflow загружает DAG из файла, он записывает время в это поле;

  • last_pickled: сериализованный (pickled) объект DAG. Это позволяет Airflow быстро загружать DAG из базы данных, минуя повторный парсинг кода. В современных версиях Airflow это поле используется реже из-за перехода на более эффективные методы сериализации.

  • last_expired: время когда DAG получал сигнал об обновлении;

  • scheduler_lock: время установки блокировки планировщиком. Это предотвращает одновременный доступ к DAG нескольким планировщикам;

  • pickle_id: Идентификатор записи в таблице dag_pickle.

  • Fileloc: путь к файлу DAG в файловой системе;

  • processor_subdir: подкаталог для обработки DAG.

  • owners: владельцы DAG (обычно в формате JSON).

  • description: описание DAG.

  • default_view: предпочтительный способ просмотра DAG в интерфейсе Airflow.

  • schedule_interval: интервал планирования DAG (например, @daily, 0 0 * * *).

  • timetable_description: описание расписания запусков.

  • max_active_tasks: максимальное количество одновременно выполняемых задач в DAG.

  • max_active_runs: максимальное количество одновременно выполняемых запусков DAG.

  • has_task_concurrency_limits: есть ли ограничения на параллелизм задач?

  • has_import_errors: есть ли ошибки импорта в DAG-файле?

  • next_dagrun: планируемое время следующего запуска DAG.

  • next_dagrun_data_interval_start: начало интервала данных для следующего запуска.

  • next_dagrun_data_interval_end: конец интервала данных для следующего запуска.

  • next_dagrun_create_after: время, после которого можно создать следующий запуск DAG.

Таблица dag (или dag_model в более новых версиях Airflow) в Airflow хранит метаданные о ваших DAGs (Directed Acyclic Graphs). Это центральное хранилище информации о всех ваших DAG-файлах, их состоянии и истории выполнения.

Какая статья без кода. Посмотрим, сколько активных дагов, на паузе, активных на паузе, всего дагов, и какую долю составляют вышеупомянутые.

WITH dags AS (
    SELECT 
        (SELECT COUNT(dag_id) FROM dag WHERE is_active = True) AS active,
        (SELECT COUNT(dag_id) FROM dag WHERE is_paused = True) AS paused,
        (SELECT COUNT(dag_id) FROM dag WHERE is_active = True AND is_paused = True) AS a_n_p,
        (SELECT COUNT(dag_id) FROM dag) AS all_dags
)
SELECT 
    active, 
    paused, 
    a_n_p, 
    all_dags,
    CAST(active AS FLOAT) * 100 / all_dags AS active_ratio,
    CAST(paused AS FLOAT) * 100 / all_dags AS paused_ratio,
    CAST(a_n_p AS FLOAT) * 100  / all_dags AS a_n_p_ratio
FROM dags;

Вы сможете это протестить самостоятельно(при некоторых условиях), а у меня получилось следующее:

Активных

Паузанутых

Активно паузанутых

Всего

Доля активных

Доля паузанутых

Доля активно паузанутых

360

278

166

574

45%

35%

20%

И всё в целом хорошо, но как могут быть активно паузанутые?

А это вопрос для самоизучения. Мало ли на практике нужно будет проанализировать DAGs, а тут такая подстава.

Log_template

  • id: уникальный идентификатор записи лога;

  • filename: путь к файлу лога;

  • elasticsearch_id: ID записи в Elasticsearch (если используется Elasticsearch для хранения логов). Это поле будет NULL, если Elasticsearch не используется;

  • created_at: время создания записи лога.

Таблица log_template в Airflow используется для хранения шаблонов имен файлов логов. Эти шаблоны определяют, как Airflow формирует имена файлов для логов задач. Она не так часто используется напрямую разработчиками DAG, а больше управляется самой системой Airflow. Так же в таблице может присутствовать поле template, в котором мы можем задать шаблон для записи логов. Как и определить это самостоятельно новым классом/методом и т.п. Это довольно сложный и специфический процесс под конкретные цели и задачи, со своими плюсами и минусами. Главное - оно работает из коробки...

Ab_user

  • id: уникальный идентификатор пользователя;

  • first_name: имя пользователя;

  • last_name: фамилия пользователя;

  • username: имя пользователя для входа, иногда известно как логин, но это уже совсем другая история;

  • password: хешированный пароль;

  • active: указывает, активен ли аккаунт пользователя;

  • email: адрес электронной почты пользователя;

  • last_login: время последнего(если кому-то это очень важно, то крайнего) входа пользователя;

  • login_count: количество успешных входов пользователя;

  • fail_login_count: количество неудачных попыток входа пользователя;

  • created_on: время создания аккаунта пользователя;

  • changed_on: время последнего изменения данных пользователя;

  • created_by_fk: идентификатор пользователя, который создал данного;

  • changed_by_fk: идентификатор пользователя, который последний раз изменил данные этого пользователя.

Таблица ab_user содержит базовую информацию о пользователях Airflow, необходимую для управления доступом и аутентификации. В зависимости от вашей конфигурации, она может содержать дополнительные поля для более сложного управления доступом и пользовательскими профилями.

В этой статье мы рассмотрели структуру нескольких ключевых таблиц в базе данных Airflow, включая dag, connections, ab_user и таблицы логов. Понимание структуры этих таблиц необходимо для эффективного управления и мониторинга рабочих процессов в Airflow. Более подробную информацию можно найти в официальной документации Airflow.

Комментарии (0)