Добрый день. Меня зовут Иван, я дата‑инженер, основной профиль — доставка данных от источников до целевых систем DWH. Чаще всего работаю с Apache NIFI.

В сообществе NIFI в Telegram часто возникает вопрос — как мне запустить мой поток тогда, когда мне надо. Я решил систематизировать свой опыт, информацию от коллег и участников нашего крутейшего сообщества в отношении старта потоков обработки. Данная статья будет полезна в первую очередь новичкам.

В Apache NiFi поток обработки подразумевает формирование FlowFile и прохождение его по всему тракту (PipeLine) обработки. Таким образом, задача старта потока обработки сводится, в основном, к генерации начального FlowFile в необходимый момент.

В Apache NiFi есть генерирующие процессоры и обрабатывающие. Под генерирующими я понимаю процессоры, после работы которых формируется новый FlowFile. Примерами этих процессоров могут быть GenerateFlowFile, ConsumeKafkaRecord, ListFile. Обрабатывающие процессоры требуют наличие входящей очереди и выполняет свою функцию на основании атрибутов и контента поступившего FlowFile.

Это разделение условное, так как существуют процессоры, относящиеся к двум группам, например ExecuteSql. т. е. такие процессоры могут самостоятельно генерировать данные в случае отсутствия входящего соединение, так и обрабатывать входную очередь.

Опишу идеальную задачу: у вас есть поток обработки данных, есть входной порт, поступление FlowFile на который инициализирует некий процесс ETL. Расположим рядом абстрактную группу, обозначим ее «Schedulers», определим выходной порт, соединим его с потоком обработки. Итак, мы имеем отдельно сам поток, и отдельно систему его инициализации.

Пример выноса группы запуска отдельно от основного потока
Пример выноса группы запуска отдельно от основного потока

Рассмотрим варианты, как можно запустить поток.

Интервал

В каждом процессоре есть возможность запускать его по интервалу. Это значит, что с момента последнего запуска процессора повторяй запуск выполнится через указанное время. В интервале можно задать секунды, минуты, дни и даже годы (если у вас ну очень медленный источник данных).

Интервал запуска - раз в пять минут
Интервал запуска - раз в пять минут

Достоинства этого метода очевидны — пайплайн будет работать через заданный интервал времени.

Недостатки тоже ясны и понятны:

  • Если выбрать небольшой интервал, есть вероятность, что в вашем потоке будут копится триггеры вызовов. Особенно это актуально, если есть какой‑то тяжелый запрос к источнику. Он может и не вернуть данные, но будет выполняться долго. В результате будем иметь накопление очереди, постоянно работающий пайплайн.

  • Если выбрать большой интервал, есть вероятность, что на источнике произойдет много изменений, и ваш пайплайн потратит много ресурсов на их обработку. Т.о. в возможна периодическая пиковая нагрузка на сервер.

  • К вам пришли заказчики и говоря — мы хотим видеть данные раз в три часа, по субботам не надо, ночью тоже не надо, а вот 1 числа каждого месяца надо в 8 утра начинать. И первое, к чему мы приходим — добавим триггер с расписаниям по крону.

Cron

Каждый процессор может быть запущен по расписанию Cron. То есть, в указанное время будет сгенерирован управляющий триггер.

Запуск по субботам, 8:00
Запуск по субботам, 8:00

Достоинства:

  • в указанное время будет сгенерирован управляющий триггер

Недостатки:

  • каким бы сложным не было ваше выражение cron, есть вероятность, что надо его дополнить.

Казалось бы, применяя вместе интервал и cron можно закрыть все потребности. Однако ваш инициализатор со временем начнет походить примерно на такое:

Генераторы для одной задачи. Только интервальные)
Генераторы для одной задачи. Только интервальные)

В принципе, иных вариантов генерации FlowFile нет. И из этого надо сделать удобную систему инициализации. Далее речь пойдет об организации системы запуска и инициализации потока.

Сочетание Cron и интервалов

Для этого кейса удобно объединить генераторы в группы, и определить атрибут, который будет отражать признак расписания. Например schedule. Каждый генератор пишет свое значение в атрибут:

Пример расписаний в БД конфигурации
Пример расписаний в БД конфигурации

Конфигурации потоков удобно сохранить в БД, или в ином внешнем сервисе. Например, так выглядит список расписаний в базе. Отдельно у меня работает GenerateFlowFile с интервалом в 1 минуту, где в атрибут пишется время в формате HH:mm. Это позволяет запускать потоки в любое удобное время, причем в разное, не ограничиваясь одним.

Из базы потоки выбираются запросом:

SELECT * from ${paramtable} 
WHERE
active = 1 AND 
'${schedule}' in (
    SELECT value 
    FROM string_split(schedule, ';')
)
ORDER BY priority

Запрос выберет все активные конфигурации, для которых совпадет пришедшее расписание. Далее потребуется перевести конфигурацию в атрибуты, и применить их в потоке обработки.

Внешнее управляющее событие

Под этим термином я понимаю формирование управляющего триггера извне, а внутри Nifi потребуется создать механизм получения этого события. Рассмотрим самые простые примеры.

Управление по HTTP/HTTPS

Слушаем HHTP/HTTPS на определённом порту. Разбираем контент, передаем в PipeLine.
HTTPS слушает два пути - config и schedule
HTTPS слушает два пути - config и schedule

При получении разбираем два варианта - либо пришло расписание, либо готовая конфигурация.

Перевод контента в атрибуты.
Перевод контента в атрибуты.

Такой кейс я применяю для связки Airflow + Nifi. Есть источник, для которого инкремент не определяется простым условием, требуется сложный запрос для вычисления новой порции данных. Разработан DAG, в нем определяется необходимость получения новых данных, вычисляются граничные значения из ранее полученных данных, формируется специфичный запрос и готовая конфигурация для потока загрузки. Конфигурация в виде JSON передается на Nifi.

Консьюминг данных

Получение данных от Kafka
Получение данных от Kafka
Получение данных от Kafka

Слушаем топики Kafka. На основании топика, и также данных, которые пришли, определяется что делать с сообщением. Если это управление (содержит или расписание, или готовую конфигурацию) - перевод в группу инициализации. Если это готовое сообщение - то перевод в PipiLine обработки.

Получать примеры можно от разных систем — управляющий Email, событие в RabbitMq... В Nifi достаточно большой список систем, откуда можно получить данные.

Пример консьюмеров в NIFI

Включение процессора через API

Apache NiFi имеет API. Само описание доступно в документации. Существует отличная реализация API на Python — NiPyApi. Подробности в документации. Вам понадобится идентификатор процессора, и метод Shedule_Processor.

Заключение

Конечно, это не все возможные варианты инициализации PipeLine. Можно применять создание файла в файловой системе, наличие записи в таблице базы данных, или записи к кеше. Не рассмотрел я и Wait/Notify процессоры, которые позволяют ждать наступление события внутри самого Nifi. Не рассмотрел прием данных от другого Nifi/MiNifi. Все зависит от вашего окружения и ваших потребностей.

Apache NiFi является самодостаточным инструментом, имеющим собственные средства оркестрации. При этом он легко интегрируется с другими системами, откуда можно получать управляющие события, и нужно рассматривать как часть совокупности сервисов интеграции данных.

Полезные ссылки:

Комментарии (1)


  1. velipre_xella
    00.00.0000 00:00
    +1

    Лайк поставить не могу, просто спасибо. Зарядил вчера запуск хранимки постгреса по крону из NiFi, всё ок отработало.