Мониторинг
Продолжаем разговор о том, что в NiFi делать можно и нужно, а что можно, но лучше не стоит. Если вы пропустили первую часть разговора, то вам сюда. Там про улучшение читаемости схем и повышение производительности (ну почти). Здесь же пойдет речь о том, как проводить мониторинг бизнес-части схемы, чтобы всем было хорошо (ну или чтобы не было плохо), ну и немного о переносимости процессоров.
Есть мнение, что хуже всего — не вести мониторинг бизнес-части схемы совсем, используя популярный подход «и так сойдет!». Но если подумать, есть одна вещь хуже отсутствия мониторинга — неправильный мониторинг. Типовой «временный костыль»: добавляем отключенную\остановленную группу процессоров, переносим туда поток ошибок, дебажим…. и забываем. Что случается потом? Да-да, очередь переполнилась, backpressure остановил процессор и «Все пропало, шеф!!!». Не надо так. Если уж делаем, то делаем правильно. А как это — правильно? Не претендуя на премию Нобеля в сфере мониторинга, приведу с десяток непрошенных советов.
Выставляем FlowFile Expiration на «разумно достаточное» время и получаем простой и удобный механизм оперативной отладки, не сильно рискуя остановить все и насовсем. См. Рис. 1.
Коли речь зашла об очередях, попробуем быстро и просто наладить мониторинг очередей самым простым способом. Выгружаем состояние всех очередей, в случае возникновения backpressure в любой из них – бежим разбираться, что именно сломалось. Для передачи данных воспользуемся встроенным в NiFi Site-to-Site Reporting Task, результат работы которого передадим самим себе через Remote Process Group и уже там обработаем – см. Рис. 2.
Обратите внимание, что хотя в настройках destinationUrl и указан :8080/nifi (Интерфейс), он используется только для согласования соединения, сама передача идет по порту 10000.
Для парсинга используем процессор QueryRecord (никакого split-evaluate-route!). А с результатом в виде [{"sourceId":"cbffbfdc-017d-1000-9942-e797792a97a9","sourceName":"GenerateFlowFile"}], указывающем, что-где сломалось, уже можно работать.
3. Тем же образом можно настроить SiteToSiteBulletinReportTask – он будет направлять нам КАЖДОЕ сообщение об ошибках обработки (помните эти красные квадратики в углу, которые вечно не успеваешь заскриншотить?) с атрибутами flow-файла. Это на нагруженных (кривых) инстансах генерирует изрядно трафика, но тем больше у авторов flow поводов все быстрее починить.
Но все это, разумеется, «мониторинг с высоты птичьего полета», т. е. кластера целиком. А хотелось бы обработки ошибок на уровне самой группы процессоров. Ведь прежде чем сообщать об ошибке, необходимо попробовать её обработать на месте. Т. е. тихо дропнуть failure-ветку — таки нехорошо, да и сразу бить тревогу – не совсем правильно. А как правильно? Смотрим пункт 4.
4. Тот же InvokeHTTP помимо Failure предоставляет отдельный результат Retry. Это так называемый «восстановимый» класс ошибок. Конечно, если сервер вернул тебе 400 – «кривой запрос», то сам по себе он «прямым» не станет, а в случае ответа «50(0|2|3)» через N-секунд (перезапуск сервиса, запрос другому инстансу и пр.) результат может быть уже совсем другим. Проще (но не правильней) завернуть очередь Retry на сам процессор, но в случае со «сбоем по нагрузке» результат бесконечного числа обращений к сервису может оказаться далеким от желаемого. Тут на помощь приходит процессор ControlRate – смотрим на Рис. 4.
Ждем N-секунд и повторяем отправку N flow-файлов. Для того, чтобы избежать бесконечного зацикливания кривых запросов, задаем параметр “FlowFile Expiration” для очереди. Минимум накладных расходов + условно приличный результат. Почему «условно»? Потому что если упало 100500 кривых запросов, возможен вариант, при котором вся очередь ошибок не успевает обработаться с учетом времени жизни flow-файлов и задержки. Т.е. при MaximumRate=10, Schedule=10 sec и FlowFile Expiration=60 sec мы обработаем 60 ошибок, все остальное тихо умрет. Разумеется, эти параметры можно и нужно двигать, но при данном подходе указанный риск будет всегда. Плюс в этой схеме мы никак не узнаем о том, что выполнение действия закончилось ошибкой. Если вам нужна более строгая обработка, то смотрим пункт 5.
5. В UpdateAttribute ставим счетчик вида ${counter:replaceNull(0):plus(1)}, в RouteOnAttribute условие ${counter:ge(5)}. На выходе получаем фиксированное количество повторов для каждого flow-файла с генерацией аларма по неуспешности. Усложнение достаточно значительное (меньше – лучше, да?) но с учетом (в норме!) небольшого количества ошибок, оправданное. В случае большого потока, чтобы избежать backpressure со стороны потока обработки ошибок, возможно, имеет смысл увеличить размер соответствующих очередей.
А, да. Важно: 100% покрытие обработкой ошибок, как правило, не требуется примерно нигде. Делать 10 повторов через 10 секунд в случае, если пользователь ждет ответа на свой запрос в течение дефолтного таймаута в 60 секунд, не надо ????.
6. Еще один важный механизм – это контроль работы при условии явного «отсутствия ошибок». Ситуации вида «проблем нет, но данных тоже нет» бывают не сказать чтобы редко. Иногда косвенным признаком зависания процесса является рост очереди, но если, например, завис ConsumeKafka на входе группы – получается совсем нехорошо. Для решения этой проблемы на помощь приходит процессор MonitorActivity. Он умеет генерировать сообщение в том случае, если данные не приходят дольше, чем задано в Threshold Duration. Вешаем на выход ConsumeKafka параллельно основной ветке MonitorActivity с параметром минут в 15, на выход делаем logmessage|putemail примерно таким образом, как на Рис. 6, … и наслаждаемся результатом.
Понятно, что актуальные пороги задаются вашим потоком данных, и ложноположительные срабатывания в этом случае неизбежны. Но это определенно лучше, чем ничего. В качестве некоторой альтернативы возможна реализация с heartbeat-пробингом интересующего сервиса. При этом надо понимать, что то, что сервис работоспособен, и то, что у вас идут данные, – не одно и то же. Хорошо бы подходы совмещать и для ключевых схем настраивать и то и другое примерно таким образом, как на Рис. 7.
По расписанию отправляем тестовые запросы. В случае возникновения единичных проблем — пишем сообщения в лог, если проблема «сама не решилась» — с помощью того же MonitorActivity отправляем сообщение администратору. Вообще, если говорить о ZIIoT-платформе, то этим должен заниматься не NiFi, а «Мониторинг платформы». Но, например, в применении к системам заказчика и\или при необходимости мониторинга на более высоком уровне (делаем актуальный запрос и проверяем не наличие, а корректность ответа) – это вполне себе вариант. При реализации имеет смысл помнить, что данный метод несколько «инвазивен», и кривыми руками (например, не задав Scheduling для GenerateFlowFile) можно сломать все, что угодно.
7. Итак, на текущий момент мы мониторим состояние инстанса целиком и обрабатываем те ошибки, которые имеет смысл обрабатывать. Осталось отсигнализировать о том, что обработать не получилось. Тут у нас есть два штатных инструмента – logmessage и logattribute. Первый логичным образом позволяет записать сообщение в лог типа «Я упал!» с указанием loglevel и logprefix, по которому очень удобно grep’ать логи. Второй помимо этого позволяет отправить в лог содержимое flow-файла и заданный регуляркой набор атрибутов. Его стоит использовать с осторожностью – есть риск заспамить лог вдребезги.
Помимо «встроенных» средств записи в лог для последующего анализа в ключевых местах имеет смысл явным образом сигнализировать об ошибках администратору flow, например, с помощью процессора PutEmail. По опыту, 100500 писем о 100500 одинаковых ошибок никому не нужны, так что ControlRate + FlowFile Expiration в помощь.
Кстати в ZIIoT-платформе помимо инструментов NiFi есть еще один канал информирования – встроенный механизм событий zif-events. Если вдруг кому будет интересно, расскажем о нем подробнее. Я же пока пошел писать третью часть саги о NiFi, в которой изложу свои мысли о том, как обеспечить переносимость процессоров, а также разберу несколько шаблонных решений популярных на платформе ZIIoT задач.