Привет, Хабр! Мы, Юлия Лузганова HiJulia и Наталия Прудникова balzaant, аналитики в команде Business Intelligence Delivery Club. Наш департамент аналитики стремительно вырос за последние полтора года, сейчас в нем 50 человек и десятки различных проектов. Мы в группе BI-аналитики помогаем пользователям получать чистые и актуальные данные. Например, количество заказов, работающие рестораны и время доставки заказов — одни из главных сущностей. Наша основная задача — своевременная и бесперебойная поставка данных в аналитическое хранилище и их подготовка к дальнейшему использованию. Для этого нам необходимо оперативно выявлять проблемы с загрузкой и обработкой информации.

В этой статье мы хотели бы рассказать о создании мониторинга и системы “near real-time” оповещений. С технической точки зрения реализация простая, а вот нервных клеток разработчиков DWH, BI и пользователей после внедрения сохранено бесконечно много.

Почему мы вообще потратили время на создание системы оповещений для команды аналитики? Все просто: нам хотелось меньше заниматься поддержкой, а точнее, мы стремились минимизировать ручной мониторинг загрузки данных, состояния базы и отправки отчетов, чтобы автоматически оповещать пользователей базы данных об изменениях, неоптимальных запросах или активно растущих в объемах таблицах.

В статье вы найдете заметки, которые помогут вам реализовать подобную систему, а также идеи для автоматических оповещений. Также расскажем, как используя логи задач и немного статистики, мы выявляем аномалии в работе процессов.

Сразу к делу, или Как все устроено


Описанную систему оповещений можно переложить практически на любую инфраструктуру. Что касается стека проекта, то на момент написания статьи это база данных PostgreSQL, ClickHouse, Jenkins в качестве планировщика и Tableau как BI-инструмент. Сейчас мы переезжаем на новую инфраструктуру, в основе которой лежит Hadoop, и система оповещений будет развернута там с некоторыми изменениями. Мы обязательно расскажем об этом в блоге Delivery Club, поэтому подписывайтесь.

В качестве канала связи для оповещений мы выбрали Telegram, так как практика показала, что оперативнее всего команды реагируют именно там. Но также мы запускали оповещения в Slack, по почте, исследовали возможность реализации в корпоративном мессенджере Myteam — везде можно было внедрить это решение.

Схема реализации представлена на рисунке: приложение состоит из агентов, собирающих оповещения и сохраняющих их в управляющую таблицу DWH, и собственного Telegram-бота, отправляющего оповещения в чаты Telegram. Для более гибких настроек используется конфигурационный файл.



Агенты — это отдельные скрипты, которые собирают логи для отправки оповещений. Агенты написаны на SQL и Python. Результатом их работы являются записи оповещений в управляющей таблице в DWH. В строке, которую передает агент, есть все необходимое для отправки: сообщение, каналы, для которых оно предназначено, ключ сообщения (при его наличии, чтобы избежать повтора оповещения), флаг решения проблемы.

Управляющая таблица DWH — это место хранения логов для отправки, которые пишут агенты. Разработчик всегда может обратиться к этой таблице, чтобы понять природу оповещения: что за агент записал оповещение, какое пороговое значение превышено, кому предназначалось оповещение и во сколько оно было отправлено.

Telegram-бот на схеме — это самописный Python-скрипт с библиотекой python-telegram-bot для управления ботом. Каждые пять минут он обращается к таблице с логами и проверяет наличие новых строк-оповещений. Если новая строка найдена, то бот отправляет сообщение с указанными параметрами в строке. После отправки он проставляет в управляющей таблице флаг is_sent = 1.

Как сделать своего бота для отправки сообщений
  1. В Telegram зарегистрируйте нового бота, отправив в @BotFather команду /newbot. Далее придумайте название бота и уникальный username. В результате вы получите секретный токен для управления через API.
  2. Создайте группу в Telegram, добавьте в участники бота и команду. В настройках канала сделайте его приватным, а всем участникам ограничьте права на отправку сообщений в эту группу. Важно оставить боту возможность отправлять сообщения.
  3. С помощью команды https://api.telegram.org/bot<ваш token>h/getUpdates узнайте chat_id для отправки сообщений.
  4. Отправка сообщений с помощью библиотеки python-telegram-bot упрощенно выглядит так:

import telegram 
bot = telegram.Bot(token)
bot.send_message(chat_id, message)

где chat_id — это номер чата из пункта 3, а message — тело сообщения.

Полезные ссылки: Telegram Bot API, python-telegram-bot.

Как правило, единый чат для оповещений на 100+ человек — это нерабочий вариант с точки зрения потребителей этих оповещений. В какой-то момент сообщений становится слишком много, пользователи не читают канал или вообще архивируют его. Поэтому мы сделали сегментированные каналы для различных групп пользователей: разработчиков, аналитиков, продуктовой команды.

При реализации мы поняли, что нам нужен более гибкий инструмент конфигурации. Например, чтобы отключать оповещения для конкретной задачи или вручную изменить владельца объекта для агента. В качестве быстрого решения могут быть предложены Google-таблицы. Использовать их необязательно и при необходимости можно заменить на любой пользовательский интерфейс, интегрированный с хранилищем данных.

Типы оповещений от агентов


Пользовательские процессы в базе данных


Оповещение о недоступности данных к ожидаемому времени. Мы ушли от ручного оповещения. Если к определенному с бизнесом времени данные загружены не полностью или есть ошибки в расчетах, то агент автоматически передаст сообщение для канала аналитики. После загрузки всех данных агент также проинформирует об этом.

Оповещение о росте размера таблиц в DWH. Структура команд подразумевает наличие доступа в DWH у всех членов команды аналитики. Есть отдельная схема-песочница, где каждый может создавать любые сущности и производить вычисления. Мы создали агента, который следит за размерами таблиц на SSD и оповещает владельцев слишком больших таблиц. В этом случае пороговые значения установлены по своему усмотрению.

Оповещение о блокирующих процессах DWH. Опытным путем мы обнаружили, что нередко пользователи могут заблокировать своим запросом другие процессы. В некоторых случаях это критично, так как влияет на обновление данных. Агент отслеживает блокировки длительнее 30 минут и информирует об этом пользователей.

Оповещения о долгих пользовательских запросах в DWH. Мы установили такие пороговые значения: в дневное время пользовательский SQL-запрос не должен превышать 2 часа, а в ночное время — 30 минут. Если запрос длится дольше, то агент его принудительно остановит и оповестит владельца запроса.

Информационный блок


Оповещения об изменениях в DWH. Агент записывает в хранилище информацию обо всех изменениях: интегрировании новых источников данных, удалении или добавлении таблиц и колонок.

Продуктовый блок


Оповещения об аномалиях в продукте. Агент отлавливает аномалии в приложении под iOS и Android в реальном времени с помощью составных KPI. Примеры KPI: доля пользователей, обратившихся в поддержку после создания заказа за последний час, или доля пустых выдач поиска за последний час. Эти оповещения помогают команде в кратчайшие сроки реагировать на потенциальные неполадки.

Эксплуатация и мониторинг


Оповещения о неуспешном завершении задач. Если процесс завершился со статусом “failure” или “aborted”, то соответствующая запись будет передана в управляющую таблицу для последующей отправки оповещения. Агент собирает логи задач через API Jenkins.

Оповещения о неполноте данных. Агент отвечает за проверку полноты ключевых данных в хранилище. Заранее определяются KPI для каждой области, и если они не соответствуют пороговым значениям, то сообщение будет передано в канал BI. В первую очередь, это сигнал, что данные загружены не в полном объеме или имеются логические ошибки в расчетах.

Оповещение о свободном месте в DWH. Так как бизнес Delivery Club быстро растет и расширяется, объем данных значительно увеличивается ежедневно. На текущий момент мы в процессе переезда на новую инфраструктуру, но пока требуется мониторинг свободного места. Оповещение предусмотрены, если на HDD или SSD значительно сократилось место и требуется вовлечение команды BI.

Оповещения об аномальном времени выполнения задач. Если задача работает дольше обычного, то отправляется сообщение на команду BI. Агент обращается к API Jenkins, чтобы получить текущие состояния работающих процессов, затем разработанная модель обрабатывает полученные данные на предмет аномалий.

Модель по выявлению аномалий в работе процессов — это нетривиальная задача, ведь продолжительность работы каждой задачи зависит от множества факторов и может варьироваться ежедневно. Далее расскажем, как мы реализовали подобную модель, динамически рассчитывая допустимый интервал для каждого процесса.

Ловим аномалии на лету




Ежедневно наш планировщик Jenkins запускает около 700 различных задач. За сутки выполняется более 10 тысяч сборок (“сборкой” в контексте статьи мы называем каждый отдельный запуск задачи планировщиком). Конечно, в его интерфейсе есть инструменты, позволяющие мониторить процесс сборки задач без доработок, но для этого нужно посадить отдельного оператора Jenkins (и не одного, ведь мониторинг нам нужен круглосуточно).

Мы хотели получить инструмент, позволяющий автоматически отслеживать все сбои и аномалии в наших процессах с минимальным участием человека. На рынке есть готовые решения для мониторинга процессов Jenkins (например, Datadog plugin), но они в основном являются платными и не покрывают всех необходимых для нас задач, в том числе по настройке оповещений. Кроме того, у нас уже были наработки по получению данных из Jenkins. Поэтому решили создать собственный мониторинг на стороне аналитического DWH.

С чего все началось


Ранее мы разработали процесс регулярной загрузки через Python API Jenkins в DWH следующих данных:

  • список существующих задач;
  • информацию об изменениях в конфигурациях задач;
  • время запуска и статус завершения всех сборок.

Эта информация собирается в нашем хранилище и используется для анализа и разрешения инцидентов.

Выше описано, как мы наладили мониторинг результатов сборок и стали узнавать о падениях в реальном времени с помощью системы оповещений в нашем Telegram-канале. Это сильно ускорило реагирование на проблемы, но подходит только для завершившихся сборок. А ведь часто возникают ситуации, когда нам нужно узнавать о возникших проблемах еще до того, как задача упадет:

  • Блокировка процессов. Данные загружаются в DWH из десятка сервисов с различной частотой и расписанием. Параллельно собираются слои DWH и витрины данных. В итоге мы получаем много одновременных процессов, и часть из них конкурирует между собой. Немного не уследили за расписанием, — и задачи уже блокируют другу друга, таблицы не формируются вовремя, а пользователи данных грустят.
  • Зависания задач. Сбои в подключениях, проблемы в источниках данных — и в результате задача может зависнуть на неопределенное время. Мы не получаем сообщение о проблеме (ведь процесс не упал). А данных нет.
  • Постоянное обновление задач. Запуск новых или изменения в существующих задачах иногда приводят к резкому замедлению выполнения других процессов. Конечно, мы стараемся отлавливать такие проблемы на этапе проверки кода, но иногда не удается сразу учесть влияние изменений на все остальные задачи. Особенно если изменения касаются настроек задачи, например, расписания сборок.

Если вовремя не отслеживать подобные проблемы, то они накапливаются, приводят к серьезным сбоям и невозможности оперативно получить критически важные данные. Можно было бы решить вопрос, поставив для всех задач общее ограничение по времени работы, например, три часа. Но такой подход выглядит сомнительным, потому что с учетом роста обрабатываемых данных маловероятно подобрать разумное единое ограничение для всех задач и актуализировать его со временем. В перспективе это создаст много ручной работы и может подвести в самый неподходящий момент.

В итоге мы пришли к динамическому расчету допустимой длительности выполнения каждой задачи. Мы регулярно в реальном времени отслеживаем время работы запущенных сборок и оповещаем в случае превышения верхней границы. Как мы это реализовали и о какие подводные камни споткнулись, читайте дальше.

Подход к решению


Мы определились, что модуль мониторинга продолжительности сборок должен быть прост в создании и поддержке. С учетом текущего стека и экспертизы решили разрабатывать на Python + PostgreSQL. Для выявления аномалий в длительности задач использовали медианное абсолютное отклонение (MAD). Этот подход аналогичен методу трех сигм, но последний предполагает нормальное распределение данных, и в результате нам пришлось от него отказаться. Кроме того, нам требовалась устойчивость к выбросам, поэтому остановились на использовании медианы. Такой подход к выявлению аномалий подробно описан в статье Leys, C., et al., Detecting outliers: Do not use standard deviation around the mean, use absolute deviation around the median.

Подробнее про MAD
Медианное абсолютное отклонение — это устойчивая мера рассеяния данных. Получить ее значение можно рассчитав медиану выборки и вычислив расхождения между каждым значением и медианой. MAD является медианой полученных расхождений:

$MAD = Median(|Xi - Median(X)|)$


MAD похоже на стандартное отклонение, но поскольку медиана меньше зависит от крайних значений, MAD более устойчиво к выбросам.

Как работает модуль


1. Расчет интервала

Выделили список задач, которые попадут под мониторинг.
Для начала мы включили все критически важные процессы загрузки данных, без которых встанет работа отдела, и исключили технические задачи, которые не влияют на остальные процессы. А остальные отфильтровали по параметрам:

  • количество успешных сборок больше 30 — для отсечения новых задач, у которых накоплено недостаточно информации для корректного расчета;
  • среднее время работы успешных сборок больше одной минуты — потому что задачи с очень маленьким временем работы слишком чувствительны;
  • последний успешный запуск был в течение 90 предыдущих дней — убрали неактуальные задачи.

Далее для каждой задачи мы рассматриваем заданное число последних успешных сборок. Это число управляется параметром job_quantity (по дефолту берем его равным 30). Вычисляем медиану продолжительности выбранных сборок и рассчитываем для каждой сборки отклонение медианы от абсолютного значения. После этого мы вычисляем медиану всех отклонений и получаем показатель MAD. Таким образом, для половины всех сборок продолжительность должна находиться в пределах одного MAD от медианы в обоих направлениях. Для анализа выбросов сборок устанавливается коэффициент outlier_mult, который определяет сколько MAD от медианы отступить для границ допустимого интервала. Коэффициент outlier_mult обычно берут равным 2, 2.5, 3 в зависимости от необходимой точности.

В нашей системе параметры job_quantity и outlier_mult управляются через конфиг с учетом специфики задачи, времени работы и частоты запусков. Пример расчета верхней границы допустимого интервала для одной задачи на основе тридцати сборок показан ниже.

# Получаем датафрейм с информацией о сборках из DWH
import pandas as pd
from sqlalchemy import create_engine, text
 
job_quantity = 30
psql_engine = create_engine('адрес базы')
query = '''sql запрос к базе, получающий данные о времени запуска и продолжительности тридцати последних сборок'''
 
job_stata = pd.read_sql_query(text(query), con=psql_engine)
 
# Вычисление медианы продолжительности сборок
med_duration = job_stata['s_build_duration'].median(axis = 0, skipna = True)
# Вычисление отклонений реальных значений от медианы
job_stata['dev_median'] = abs(job_stata['s_build_duration'] - med_duration)
# Расчет медианы отклонений
med_deviation = job_stata['dev_median'].median()
# Вычисление верхней границы допустимого интервала
upper_boundary = float(med_duration + (med_deviation * outlier_mult))

Рассчитанные пороговые значения записываются в DWH для дальнейших проверок работающих сборок. Границы интервалов регулярно актуализируются в зависимости от частоты запуска. Например, для задач, которые собираются один-два раза в сутки, пересчет границ происходит каждый день, а для задач, собирающихся раз в час и чаще, пересчет выполняется каждый час.

2. Анализ запущенных задач

К уже имеющимся данным из Jenkins мы добавили определение запущенных сборок в данный момент. Каждые 5 минут при помощи методов get_running_builds и get_build_info из питоновской библиотеки python-jenkins мы получаем информацию о запущенных на данный момент сборках и их длительности, сравнивая его с допустимыми интервалами. Если текущая продолжительность сборки превышает верхнюю границу, то записываем информацию в DWH и исключаем сборку из дальнейших проверок. То есть по каждому инциденту в DWH сохраняем только одну запись, чтобы не дублировать оповещения для мониторинга.

3. Оповещение о проблемах

После выявления аномально долгих сборок необходимо оповестить команду о наличии проблемы. Следом за мониторингом запускается процесс, который проверяет наличие новых записей об аномальных сборках в DWH и отправляет в Telegram-канал уведомление с названием задачи и ссылкой на сборку в Jenkins для проверки.

Изначально мы понимали, что на первых порах нам все равно потребуется ручная проверка каждого случая оповещений. Тем не менее запуск модуля существенно ускорил реагирование на проблемы и позволил выявить несколько проблем в начальной стадии: мы смогли отловить и блокировки процессов, и зависания источников данных.

Какие возникают проблемы


На текущий момент основная проблема модуля — ложные срабатывания. В основном эта проблема актуальна для очень чувствительных задач, которые собираются, например, 0,5 минуты. Для таких задач отклонение в пару секунд будет существенно. Здесь помогает увеличение параметра outlier_mult. Но для некоторых задач мы решили ограничить применение модуля и задали им фиксированную длительность. В дальнейшем планируем дорабатывать систему для анализа таких ситуаций.

Помимо этого для некоторых задач со временем возникают особенности распределения продолжительности сборок: тенденция к росту с увеличением количества обрабатываемых данных или сезонность. Под сезонностью мы понимаем зависимость времени работы сборки, например, от дня недели, поскольку в выходные требуется обработать значительно больше записей, чем в будни. В таких ситуациях использование метода MAD будет не очень удачным и требуются дополнительные меры для корректной оценки выбросов. Для сезонных задач имеет смысл рассчитывать интервалы за соответствующие периоды (например, оценивать сборки отдельно за понедельник, вторник и т.д.). Минус такого подхода в том, что он предполагает достаточно большую историю наблюдений. Для применения MAD к задачам с тенденцией роста или уменьшения продолжительности времени сборок можно использовать один из методов сглаживания (например, скользящее среднее или метод Хольта).

Немного о наших планах


Запуск модулей помог нам ускорить получение данных в хранилище и сократить количество инцидентов с блокировками и зависшими процессами. Нам пока не удалось до конца устранить ложные оповещения об аномально долгом времени работы. В дальнейшем мы планируем развивать подход к выявлению аномалий в ситуациях сезонности или тенденции роста продолжительности работы, а также для слишком чувствительных задач. В том числе мы будем тестировать другие модели выявления аномалий в подобных задачах. Задача определения тренда также важна, так как позволит автоматически выявлять случаи, требующие оптимизации.

После усовершенствования подхода к распознаванию аномалий и устранения ложных оповещений мы планируем доработать модуль для автоматической остановки аномально долгих сборок через API Jenkins. Это поможет нам еще больше ускорить загрузку данных в хранилище.

Также мы планируем доработать мониторинг для распознавания сборок, которые завершаются раньше нижней границы допустимого интервала. Слишком низкая продолжительность может быть сигналом, что не все данные были доступны на момент запуска сборки или не отработали некоторые шаги в задаче.

В качестве идеи для последующего развития системы оповещений можно рассмотреть оповещения, связанные с BI-системой. А именно, управление лицензиями, появление новых дашбордов и источников данных.

Учитывая, что наш отдел сейчас активно переезжает на новую инфраструктуру, в основе которой будет Hadoop, Vertica и Airflow, в ней тоже потребуется прорабатывать системы мониторинга и оповещений и адаптировать имеющиеся подходы под новый стек.

Комментарии (1)


  1. shalomman
    04.09.2021 18:49

    Какая-то сумбурщина. Кто, зачем, почему?