Добрый день. Меня зовут Иван, я дата‑инженер, основной профиль — доставка данных от источников до целевых систем DWH. Чаще всего работаю с Apache NIFI.
В сообществе NIFI в Telegram часто возникает вопрос — как мне запустить мой поток тогда, когда мне надо. Я решил систематизировать свой опыт, информацию от коллег и участников нашего крутейшего сообщества в отношении старта потоков обработки. Данная статья будет полезна в первую очередь новичкам.
В Apache NiFi поток обработки подразумевает формирование FlowFile и прохождение его по всему тракту (PipeLine) обработки. Таким образом, задача старта потока обработки сводится, в основном, к генерации начального FlowFile в необходимый момент.
В Apache NiFi есть генерирующие процессоры и обрабатывающие. Под генерирующими я понимаю процессоры, после работы которых формируется новый FlowFile. Примерами этих процессоров могут быть GenerateFlowFile, ConsumeKafkaRecord, ListFile. Обрабатывающие процессоры требуют наличие входящей очереди и выполняет свою функцию на основании атрибутов и контента поступившего FlowFile.
Это разделение условное, так как существуют процессоры, относящиеся к двум группам, например ExecuteSql. т. е. такие процессоры могут самостоятельно генерировать данные в случае отсутствия входящего соединение, так и обрабатывать входную очередь.
Опишу идеальную задачу: у вас есть поток обработки данных, есть входной порт, поступление FlowFile на который инициализирует некий процесс ETL. Расположим рядом абстрактную группу, обозначим ее «Schedulers», определим выходной порт, соединим его с потоком обработки. Итак, мы имеем отдельно сам поток, и отдельно систему его инициализации.
Рассмотрим варианты, как можно запустить поток.
Интервал
В каждом процессоре есть возможность запускать его по интервалу. Это значит, что с момента последнего запуска процессора повторяй запуск выполнится через указанное время. В интервале можно задать секунды, минуты, дни и даже годы (если у вас ну очень медленный источник данных).
Достоинства этого метода очевидны — пайплайн будет работать через заданный интервал времени.
Недостатки тоже ясны и понятны:
Если выбрать небольшой интервал, есть вероятность, что в вашем потоке будут копится триггеры вызовов. Особенно это актуально, если есть какой‑то тяжелый запрос к источнику. Он может и не вернуть данные, но будет выполняться долго. В результате будем иметь накопление очереди, постоянно работающий пайплайн.
Если выбрать большой интервал, есть вероятность, что на источнике произойдет много изменений, и ваш пайплайн потратит много ресурсов на их обработку. Т.о. в возможна периодическая пиковая нагрузка на сервер.
К вам пришли заказчики и говоря — мы хотим видеть данные раз в три часа, по субботам не надо, ночью тоже не надо, а вот 1 числа каждого месяца надо в 8 утра начинать. И первое, к чему мы приходим — добавим триггер с расписаниям по крону.
Cron
Каждый процессор может быть запущен по расписанию Cron. То есть, в указанное время будет сгенерирован управляющий триггер.
Достоинства:
в указанное время будет сгенерирован управляющий триггер
Недостатки:
каким бы сложным не было ваше выражение cron, есть вероятность, что надо его дополнить.
Казалось бы, применяя вместе интервал и cron можно закрыть все потребности. Однако ваш инициализатор со временем начнет походить примерно на такое:
В принципе, иных вариантов генерации FlowFile нет. И из этого надо сделать удобную систему инициализации. Далее речь пойдет об организации системы запуска и инициализации потока.
Сочетание Cron и интервалов
Для этого кейса удобно объединить генераторы в группы, и определить атрибут, который будет отражать признак расписания. Например schedule. Каждый генератор пишет свое значение в атрибут:
Конфигурации потоков удобно сохранить в БД, или в ином внешнем сервисе. Например, так выглядит список расписаний в базе. Отдельно у меня работает GenerateFlowFile с интервалом в 1 минуту, где в атрибут пишется время в формате HH:mm. Это позволяет запускать потоки в любое удобное время, причем в разное, не ограничиваясь одним.
Из базы потоки выбираются запросом:
SELECT * from ${paramtable}
WHERE
active = 1 AND
'${schedule}' in (
SELECT value
FROM string_split(schedule, ';')
)
ORDER BY priority
Запрос выберет все активные конфигурации, для которых совпадет пришедшее расписание. Далее потребуется перевести конфигурацию в атрибуты, и применить их в потоке обработки.
Внешнее управляющее событие
Под этим термином я понимаю формирование управляющего триггера извне, а внутри Nifi потребуется создать механизм получения этого события. Рассмотрим самые простые примеры.
Управление по HTTP/HTTPS
Слушаем HHTP/HTTPS на определённом порту. Разбираем контент, передаем в PipeLine.
При получении разбираем два варианта - либо пришло расписание, либо готовая конфигурация.
Такой кейс я применяю для связки Airflow + Nifi. Есть источник, для которого инкремент не определяется простым условием, требуется сложный запрос для вычисления новой порции данных. Разработан DAG, в нем определяется необходимость получения новых данных, вычисляются граничные значения из ранее полученных данных, формируется специфичный запрос и готовая конфигурация для потока загрузки. Конфигурация в виде JSON передается на Nifi.
Консьюминг данных
Получение данных от Kafka
Слушаем топики Kafka. На основании топика, и также данных, которые пришли, определяется что делать с сообщением. Если это управление (содержит или расписание, или готовую конфигурацию) - перевод в группу инициализации. Если это готовое сообщение - то перевод в PipiLine обработки.
Получать примеры можно от разных систем — управляющий Email, событие в RabbitMq... В Nifi достаточно большой список систем, откуда можно получить данные.
Пример консьюмеров в NIFI
Включение процессора через API
Apache NiFi имеет API. Само описание доступно в документации. Существует отличная реализация API на Python — NiPyApi. Подробности в документации. Вам понадобится идентификатор процессора, и метод Shedule_Processor.
Заключение
Конечно, это не все возможные варианты инициализации PipeLine. Можно применять создание файла в файловой системе, наличие записи в таблице базы данных, или записи к кеше. Не рассмотрел я и Wait/Notify процессоры, которые позволяют ждать наступление события внутри самого Nifi. Не рассмотрел прием данных от другого Nifi/MiNifi. Все зависит от вашего окружения и ваших потребностей.
Apache NiFi является самодостаточным инструментом, имеющим собственные средства оркестрации. При этом он легко интегрируется с другими системами, откуда можно получать управляющие события, и нужно рассматривать как часть совокупности сервисов интеграции данных.
Полезные ссылки:
velipre_xella
Лайк поставить не могу, просто спасибо. Зарядил вчера запуск хранимки постгреса по крону из NiFi, всё ок отработало.