Небольшое отступление
Работая работу, вдруг, появилась необходимость познакомиться с таким инструментом, как Apache Airflow. Задачу дали простую - нет никаких уведомлений в DAG'ах, при сбое необходимо уведомлять. Так как про этот сервис я только "слышал", уверенных знаний я показать даже сейчас, боюсь, не смогу. Зато смогу поделиться с вами простым кодом оповещения, который поможет вам не придумывать велосипед и воспользоваться (а то и улучшить) текущим. За основу я взял статью на Хабре, само собой официальная документация и другие открытые источники. Так же отдельное спасибо моему наставнику, который ревьювил всю работу.

Данный alerting предназначен для, скорее всего, для любых версий, но если вы хотите использовать преимущества Apache Airflow на полную, то при версии >= 2.6.0 рекомендуется читать статью выше.

Для аннотаций типов используется пакет typing (as tp). Для отправки уведомлений в telegram вам понадобится установить пакет python-telegram-bot .

Если вы здесь, скорее всего, вы уже примерно представляете что такое Apache Airflow, таски (джобы) и даги, поэтому не буду вдаваться в подробности и сразу перейду к сути.
Для начала я советую вам повторить (а в будущем и превзойти, если нужно!) текущую конфигурацию.

Для того чтобы понять работают ли уведомления напишем простейший dag со сломанной таской:

def failing_task():
    raise Exception("Пример ошибки")


with DAG(
    "telegram_notification_dag",
    default_args={
        "on_failure_callback": #что-то должно вызываться при сломанной таске
    },
    description="Отправка уведомлений через Telegram бот",
    schedule_interval="@daily",
    start_date=datetime(2022, 1, 1),
    catchup=False,
) as dag:

    failing_task = PythonOperator(
        task_id="failing_task", python_callable=failing_task, dag=dag
    )

    (failing_task)

Для людей, которым нужно только уведомление, но никто не хочет в этом разбираться поясню: DAG - просто структура, которая определяет порядок выполнения задач, их взаимодействие и последовательности. Task (job) - это задание или шаг в рамках DAG, которое выполняет какое-либо действие.

В наш DAG передали его идентификатор, словарь аргументов (все аргументы можно посмотреть а официальной доке), интервал запуска и т.д.

Далее определим нужные нам таски, нам нужен только один (который будет сломан).

Существует несколько операторов для выполнения задач, в нашем случае используется PythonOperator, который будет вызывать (какой-то) python код.

Далее не буду описывать всё по шагам и просто скопирую и объясню. Класс TelegramNotification отвечает за отправку сообщения пользователю.

class TelegramNotification:
    """intervals - интервалы переотправки оповещения, 
    если не получается отправить оповещение"""

    def __init__(
        self,
        chat_id: str,
        token: str,
        message_template: str,
        responsible_users: tp.List[str] = [],
        intervals: tp.List[int] = [1, 60, 600],
    ):
        self._chat_id = chat_id
        self._messageTemplate = MessageTemplate(message_template, responsible_users)
        self._intervals = intervals
        self._token = token

    def send_telegram_notification(self, context: tp.Dict[str, tp.Any]) -> None:

        message = self._messageTemplate.create_message_template(context)

        for interval in self._intervals:
            try:
                bot = Bot(token=self._token)
                bot.send_message(chat_id=self._chat_id, text=message)
                break
            except Exception as e:
                logger.info(f"Error sending message to Telegram: {e}")
                time.sleep(interval)

Отправка сообщения боту происходит в цикле конструкции try except, из всего, что связано с телеграмом, тут только создание бота и далее строчка с вызовом метода send_message у нашего созданного бота. Почему всё это обёрнуто непонятно как будет объяснено в конце статьи. Наше сообщение должно формироваться в зависимости от того, что нужно человеку, т.к. это противоречит цели этого класса, а у каждого класса своя зона ответственности! То сам модуль состоит из ещё одного класса.

class MessageTemplate:
    def __init__(self, message_template: str, responsible_users: tp.List[str]):
        self._message_template = message_template
        self._responsible_users = responsible_users

    def create_message_template(self, context: tp.Dict[str, tp.Any]) -> str:
        args = self._parse_context(context)
        message = self._message_template.format(**args)
        return message

    def _parse_context(self, context: tp.Dict[str, tp.Any]) -> tp.Dict[str, tp.Any]:
        """Доступные аргументы для message template
        DAG_NAME: название DAG
        TASK_ID: название задачи
        DATE: дата и время выполнения задачи
        TASK_LOG_URL: ссылка на лог выполнения задачи
        PARAMS: параметры, переданные в задачу
        CONF: глобальные параметры, переданные в DAG при его запуске
        PREV_EXEC_DATE: дата и время выполнения предыдущей задачи
        USERS: список пользоватлей, ответственных за выполнение задачи
        """
        return {
            "DAG_NAME": context.get("dag").dag_id,
            "TASK_ID": context.get("task_instance").task_id,
            "DATE": self._create_formatted_date(context.get("execution_date")),
            "TASK_LOG_URL": context.get("ti").log_url,
            "PARAMS": context.get("params"),
            "CONF": context.get("conf"),
            "PREV_EXEC_DATE": self._create_formatted_date(
                context.get("prev_execution_date")
            ),
            "USERS": self._create_users_string(),
        }

    def _create_formatted_date(self, date: datetime) -> str:
        return date.strftime("%Y-%m-%d %H:%M:%S") if date else ""

    def _create_users_string(self) -> str:
        return ", ".join([f"@{user_name}" for user_name in self._responsible_users])

Из всего, что здесь есть, думаю, стоит упомянуть объект context, который передаётся в callback функцию при успешном (можете сами дописать в класс функцию, которая отправляет что-то при success) или не успешном выполнении задачи. Этот объект - словарь, из которого можно вытянуть практически всю информацию, которая вам нужна. Аргументы в данном примере - не все. Их как минимум в два раза больше, если вам нужно что-то дополнительно, вы с лёгкостью сможете внедрить это сюда.

Теперь посмотрим, как же с этим работать. Далее весь код для нашего тестового DAG'а:

import time

from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from datetime import datetime
from loguru import logger

from dependencies.tg_notification import TelegramNotification, read_bot_secrets

PATH_TO_SECRETS = "путь_к_секретам"
SECRETS = read_bot_secrets(PATH_TO_SECRETS)

CHAT_ID_KEY = "chat_id"
TOKEN_KEY = "token"

NOTIFY_MESSAGE = """
Идентификатор DAG: {DAG_NAME}. 
Идентификатор задачи: ❌{TASK_ID}❌. 
Дата и время выполнения задачи: {DATE}. 
Ответственные лица: {USERS}
"""


telegram_notification = TelegramNotification(
    chat_id=SECRETS.get(CHAT_ID_KEY),
    token=SECRETS.get(TOKEN_KEY),
    message_template=NOTIFY_MESSAGE,
    responsible_users=[
        "your_username", 
    ],
    intervals=[1, 100, 1000],
)


def failing_task():
    raise Exception("Пример ошибки")


with DAG(
    "telegram_notification_dag",
    default_args={
        "on_failure_callback": telegram_notification.send_telegram_notification
    },
    description="Отправка уведомлений через Telegram бот",
    schedule_interval="@daily",
    start_date=datetime(2022, 1, 1),
    catchup=False,
) as dag:

    failing_task = PythonOperator(
        task_id="failing_task", python_callable=failing_task, dag=dag
    )

    (failing_task)

Конечно же, последуют объяснения. Для начала прочитаем наши секреты. Функция для чтения (read_bot_secrets) будет показана чуть ниже, она играет довольно важную роль для нас, чтобы всё не сломалось.

Так же создадим шаблон сообщения. Код с доступными аргументами (MessageTemplate) находится выше и вы можете составить какой только захотите.

Создаём объект класса TelegramNotification и передаём туда токен нашего бота, наш id чата, шаблон сообщения, а так же ответственных лиц для нужного вам DAG'а (без @).

Вся магия происходит вот тут:

"on_failure_callback": telegram_notification.send_telegram_notification

При сбое в таске вызывается функция send_telegram_notification, куда передаётся Airflow объект context, в котором уже содержится вся нужная нам информация.

Что же произойдёт, если у нас не получилось прочитать данные токена или id чата, или вообще не получилось прочитать наши секреты? Вернётся пустой словарь или словарь с нужными (правильными или нет) значениями. Так как в данном коде используется метод get в получении значений (token и chat_id), то если какого-либо ключа не будет, то нам просто вернётся None. В конце функции send_telegram_notification класса TelegramNotification так же используется конструкция try except, это гарантирует, что если у нас будут проблемы с сетью, или у нас не будет каких либо значений, или они будут неправильные - в логи выведется ошибка и мы сможем уже работать с ней дальше.

В моём случае я использую рабочий Apache Airflow, из всей доступной мне информации могу сказать, что стоит версия 2.5.3.

Результаты:

На этом у меня всё. Задавайте свои вопросы, пишите комментарии, оставляйте свои замечания. Надеюсь, моя статья хоть как-то помогла вам.

Комментарии (3)


  1. trabl
    07.07.2024 13:15

    Я сделал по упомянутой ссылке в статье оповещения, в сообщении так же присутствует ссылка на лог заваленного таска, что очень удобно, так как можно одним кликом перейти и посмотреть ошибку. Так же у меня есть дашборд в grafana, где видно в деталях количество успешных джоб и не успешных, и ещё куча полезной инфы. Я бы хотел увидеть как без дополнительной кастомизации настроить алертинг для airflow с помощью statsd_exporter и alertmanager/Prometheus. У меня так и не получилось выводить в сообщениях например ту же ссылку на лог, поэтому пошёл другим путём.


  1. tarasovlad
    07.07.2024 13:15

    Но вроде в Airflow есть импортируемый TelegramOperator https://airflow.apache.org/docs/apache-airflow-providers-telegram/stable/operators.html


    1. goraffin Автор
      07.07.2024 13:15

      Привет! Спасибо за комментарий! Я такой возможности не нашёл во второй версии. Возможно, плохо искал. Я считаю, что в любом случае сделал, как будто бы, более настраиваемый alerting. Попробую протестировать на 2 версии, спасибо, что поделился!