Разработчики потоков обработки данных на сервисе Apache NiFi знают, что все интересующие события процессоров можно вывести в интерфейс в раздел "Bulletin Board". Однако, если вы в текущий момент не следите за интерфейсом, увидеть сообщение становится невозможным.

Apache NiFi хранит логи (по умолчанию) в папке "./logs". Процессоры пишут свои события в файл "nifi-app.log". И если у вас есть доступ к каталогу с логами, то никаких трудностей нет - открываем файл, читаем, фиксим поток. А если у вас нет доступа? Или есть желание оперативно получить сведения о событии, чтобы понять контекст его возникновения? Ситуации бывают разные. Рассмотрим простой способ получить информацию из лога.

Иные варианты получения данных из логов
  1. Применяем filebeat - ставим, настраиваем на папку, шлем логи в ELK.

  2. Настраиваем logback.xml - применяем логгер, отправляющий данные по сети на LogStash, например стандартный или кастомный.

Для чтения файла лога воспользуемся процессором TailFile, далее конвертируем полученные данные, разобьём на отдельные записи, извлечем данные в атрибуты и отправим себе в канал Slack:

Общий вид потока, читающего лог
Общий вид потока, читающего лог

Настройки TailFile - читаем один файл, указываем его расположение, начальную позицию и периодичность:

Параметры процессора TailFile
Параметры процессора TailFile

Далее преобразуем полученные данные с помощью ConvertRecord:

Параметры ConvertRecord
Параметры ConvertRecord

Чтение данных выполняется с помощью GrokReader, позволяющего структурировать текстовые данные. О Grok есть много публикаций, например тут от @chemtech.

Параметры GrokReader
Параметры GrokReader

В настройках указываем шаблон, и выставляем параметр, добавляющий данные к предыдущему сообщению, если они не подходят под шаблон.

Структуре лога соответствует выражение:

%{TIMESTAMP_ISO8601:date} %{LOGLEVEL:Level} \[%{DATA:thread}] %{DATA:logger} %{GREEDYDATA:message}

В итоге имеем Json вида:

{ 
  "date" : "2022-06-01 23:34:02,905",
  "Level" : "INFO",
  "thread" : "pool-10-thread-1",
  "logger" : "o.a.n.c.r.WriteAheadFlowFileRepository",
  "message" : "Initiating checkpoint of FlowFile Repository",
  "stackTrace" : null,
  "_raw" : "ds (Stop-the-world time = 2 milliseconds, ..."
}

Извлекаем данные из контента. в атрибуты с помощью EvaluateJsonPath:

Параметры EvaluateJsonPath
Параметры EvaluateJsonPath

Далее шлем сообщения в канал Slack:

Разбиваем сообщения по уровню, и отправляем в Slack
Разбиваем сообщения по уровню, и отправляем в Slack

Пример отправки в Slack:

Добавляю имя хоста, чтобы понимать, с какого инстанса идет сообщение
Добавляю имя хоста, чтобы понимать, с какого инстанса идет сообщение

В заключение скажу, что этот подход не является моей личной разработкой, известен давно, и можно найти различные примеры потоков Nifi, читающих логи. Как вариант, можно слать логи не в Slack, а в телеграмм или Elastic.

Такая структура не является панацеей, и не претендует на уровень production. По хорошему, все логи надо обрабатывать через инфраструктуру, для этого предназначенную - ELK, Zabbix и т.д.

Каким образом я использую такой вариант:

  1. При разработке потока создаю свой уровень логирования, и отлавливаю определенные события через LogEvent / LogAttribute.

  2. При долговременом тестировании потоков (test / stage) отлавливаю ошибки.

  3. На проде ловлю ошибки загрузки, чтобы успеть быстро загрузить вручную данные и успеть до старта загрузки DWH.

Полезные ссылки:

  1. Документация Apache Nifi.

  2. Полезный YouTube канал, где я и увидел принцип построения такого потока.

  3. Сообщество в телеграмм

Всем добра.

Комментарии (0)