
У меня дома стоит Bluetooth-колонка в ванной. Руки там вечно мокрые и мыльные, поэтому включить звук дождя или музыку проще по событию, например, по голосовой команде. Конечно, можно поднять Home Assistant или написать небольшой Python-скрипт, который слушал бы датчики и управлял колонкой. Но в тот момент я работал с NiFi и решил проверить, справится ли он с бытовой задачей.
Оказалось, что это не только «enterprise ETL» инструмент, а еще и гибкая платформа, которая умеет работать с MQTT-брокером. Поэтому я настроил через него простую цепочку, и колонкой начал управлять не самодельный скрипт, а NiFi.
Этот простой пример хорошо показывает идею. Если инструмент способен подружиться с бытовыми устройствами, то в промышленной архитектуре его потенциал раскрывается в полном объеме.
В Big Data подключение нового источника часто превращается в мини-проект. Требуется неделя разработки, набор уникальных скриптов, собственные форматы, исключения и обходные решения. Но когда пять источников превращаются в пятьдесят, инфраструктура начинает рассыпаться: форматы скачут, API капризничают, схемы дрейфуют, а поддержка становится бесконечным бегом с препятствиями.
Мы проходили через это несколько раз и поняли, что нам нужен фреймворк, который позволит предсказуемо, быстро и без зоопарка самописных ETL-процессов подключать новые источники.
Привет, Хабр! Я ведущий инженер-разработчик направления BigData & BI К2Тех Кирилл Гофтенюк. В этой статье расскажу, как устроен наш фреймворк на базе ADS.NiFi и Arenadata Prosperity. Покажу, как он работает, зачем нужен такой подход и что нам дал переход от хаотичных скриптов к управляемой архитектуре.
Что было
Классический ETL подход со скриптами начинает сильно буксовать, когда над потоками работает несколько команд. В проекте одного нашего заказчика всё было именно так. Когда нужно было подключить внешний источник, в задачу включались сразу несколько направлений. Одни отвечали за поток данных, другие за безопасность, а третьи за юридическое направление. При этом часть процессов жила в Airflow, а метаданные — в Postgres. Разные команды вместе делали одну и ту же интеграцию, но с разных сторон. В итоге получался лапшеобразный код, который сложно централизованно менеджерить и проверять на качество.
У источника возникало сразу несколько версий обработки. В каждой свои проверки, обходные решения и ожидания от формата данных. А стоило источнику поменять схему, как старые скрипты падали. Логирование отличалось, SLA тоже. Код лежал в разных местах, и работа пайплайна зависела от того, кто сегодня дежурит и насколько быстро он найдет нужный кусок кода. Так получался стек, который очень трудно централизованно поддерживать.
Состояние проекта на старте:
Стек Postgres+Airflow.
Разрозненность существующих интеграций.
Невозможность полноценного сбора и анализа логов.
Дублирование кода.
Отсутствие централизованного хранения информации о структуре ответов источника.
Сложности в масштабировании интеграций.
Почему начали делать свой фреймворк
На примере таких проектов мы поняли, что проще один раз сделать базу для интеграций, чем продолжать собирать обломки скриптов. Так у нас получился слой, который управляет всем потоком целиком. То, что можно переиспользовать, мы вынесли в общие модули, как и повторяющиеся куски логики. А там, где раньше приходилось писать код, оставили только конфигурацию. Решили, что новая интеграция должна запускаться не через разработку, а через настройку, но писать для этого ещё один самописный фреймворк на Python не стали. Иначе бы появилась новая зона ручной поддержки, от которой хотелось уйти. Мы начали искать инструмент для сборки пайплайна и остановились на Low-code платформе NiFi.

Почему NiFi ложится на такую задачу
Даже пользователи без глубокого технического погружения могут собрать на нём свои пайплайны. Это похоже на работу с конструктором Lego. Есть много заранее подготовленных элементов, которые можно соединять между собой. Но в итоге получится не «Тысячелетний сокол» из вселенной «Звёздных войн», а рабочий процесс. Он переносит данные из одной системы в другую и преобразует их по пути. Точно как в ETL: extract, transform, load. Это действительно напоминает конструктор: поставил нужные процессоры, связал их и получил рабочий процесс. Такой подход снижает зависимость от грейда разработчиков и количества специалистов, потому что большая часть логики уже есть в коробке.
Но всё работает до тех пор, пока пайплайнов мало. Когда интеграций становится много, всё начинает расползаться. Например, когда у нашего сервиса скопилось 500 гигабайт данных, API больше не мог их «протаскивать». Запрос не проходил из-за слишком большого объема. Поэтому между сервисом и базой мы поставили прослойку, которая собирала данные у себя, забирала только дельту, и отправляла её одним пакетом. Тогда мы и увидели, что в крупных интеграциях без продуманной архитектуры быстро наступает хаос, и нужны правила.
Почему мы добавили Prosperity
Была нужна база для хранения схемы источников, расписания, типов данных и параметров. В качестве платформы разработки программных приложений мы выбрали Arenadata Prosperity, чтобы в одном месте собирать всю метаинформацию нашего NiFi. Так пользователи могли быстро посмотреть логи и добавить интеграцию при помощи одной кнопки. В итоге у нас получилась следующая архитектура:

Мы реализовали фреймворк на Arenadata Prosperity (хранение метаданных, функций) и ADS.NiFi (оркестрация, сбор логов, выполнение интеграций). Весь процесс от получения информации у внешних источников до отправки пользователю менеджерится сверху при помощи дополнительных кусочков NiFi (ProcessGroup), которые мы написали.
Это логические контейнеры для процессоров, портов и служебных компонентов, которые позволяют собирать поток в иерархию с любой глубиной вложений. Мы используем их как Lego-блоки: внутри фиксируем общую логику (маршрутизацию, валидацию, retry, логи), а переменные (URL, креды, форматы) выносим в ParameterContext для переиспользования. Для каждой новой интеграции меняем только конфиг, например, имя таблицы и путь к сетевому диску.
Что получилось
Когда основная механика заработала, стало окончательно ясно, что это уже не набор шаблонов, а полноценный фреймворк, который закрывает весь процесс интеграции от источника до STG.
Функции и особенности фреймворка
Фреймворк решает несколько практических задач.
Загрузка данных из API в слой STG. Данные тянутся из источников с минимальной обработкой. Фреймворк не пытается сделать за инженера всю трансформацию. Он просто гарантирует, что сырые данные окажутся в STG в предсказуемом виде.
Единая точка управления расписанием. Даже если интеграций десятки, всё работает через одну точку планирования. Никаких отдельных Cron, Airflow и сервисов на стороне команд.
Логирование и базовый мониторинг. Все пайплайны пишут логи в одно место. Время загрузки, объем, ошибки, поведение API. В результате инженеру не нужно искать следы в нескольких системах.
Единое хранилище метаданных. Методы, объекты, расписания, схемы и параметры лежат в Prosperity. NiFi берет оттуда всё, что нужно, поэтому подключение новой интеграции сводится к конфигурации.
Единый способ загрузки файлов. Файлы JSON и XML обрабатываются одинаково на уровне фреймворка. Функции SCD1 и SCD2 уже встроены. Инженеру не нужно писать обработку изменений вручную.
Функции фреймворка закрывают не только крупные проблемы интеграций, но и мелкие случаи, которые вылезают в самый неподходящий момент. Например, источник внезапно поменял поле inn на tax_id, XML приехал с лишними пробелами, дата пришла строкой с лишними нулями. В самописных пайплайнах все это ведёт к ручной правке кода, а во фреймворке обрабатывается одинаково. А чтобы он единообразно работал для любых источников, мы не только собрали общие функции, но и привели к понятной структуре сами интеграции.
Иерархия объектов
Мы разложили объекты по уровням.
Система-источник.
Наверху находится внешняя система — источник, откуда приходят данные. Это может быть ГАР (ФИАС), СПАРК Интерфакс или Банк России, — любой поставщик, который нужен в вашем проекте.
Интеграция.
Ниже идет интеграция. Это уже внутренняя область компании, которая работает с этим источником. Например, бухгалтерия, HR или любой другой департамент.
Метод.
Это способ, которым мы вытаскиваем данные из нашей системы-источника. Где-то это API, где-то файловая выгрузка, которая появляется в сетевой папке.
Объект.
И самый нижний уровень — конкретный Excel-файл или таблица, которую мы обрабатываем через выбранный метод для нужной интеграции и системы источников.

Мы описали объекты, с которыми работает фреймворк, но на схеме есть еще ордера. Это задания на загрузку данных. Они фиксируют, что именно нужно выполнить, и могут создаваться на любом уровне иерархии. По сути это формализованный запрос на конкретный ETL шаг. Например, загрузить из Kafka объект журнала безопасности предприятия за дату 2025.11.18 для объекта №59486 компании «Н». Такой ордер поднимает весь нужный контур и не требует от инженера вручную разбираться в пайплайне.
Эта схема помогает держать интеграции под контролем и делает их предсказуемыми.
Пример структуры пайплайна NiFi
Чтобы было понятно, как выглядит обычная интеграция, опишу её текстом. Процессоры могут отличаться, но структура почти всегда одинаковая.
Типовой пайплайн:
GenerateFlowFile. Периодически создаёт пустой FlowFile по расписанию и запускает цепочку обработки ордеров.
ExecuteSQL. Выполняет SQL‑запрос, который формирует/готовит список ордеров в базе и записывает результат в FlowFile.
ExecuteSQLRecord. Достаёт следующую порцию ордеров (record‑ориентированный запрос), превращая результат в набор записей для дальнейшей маршрутизации.
RouteOnAttribute. По атрибутам FlowFile (например, количеству записей) решает, есть ли ещё ордера для обработки, и отправляет их по отношению not_empty дальше на интеграцию.
ExecuteScript 1.26.0. Скриптовая логика интеграции: преобразование, вызов внешнего API или запись в целевую систему, уже для тех ордеров, которые прошли форматно-логический контроль.
Процесс можно расширить, но основа всегда остается такой. Это и есть причина, почему фреймворк работает стабильно даже с большим количеством интеграций.
Как это выглядит для инженера
При этом для инженеров, которые поддерживают нашу систему, все интеграции выглядят не как набор отдельных пайплайнов, а как пачка конфигурационного кода. Это обычные записи в таблицах базы данных. Если нужна новая интеграция, которая похожа на существующую, достаточно добавить новую строку в конфигурационную таблицу. Ни переписывать пайплайн, ни влезать в код не нужно: NiFi подтягивает параметры из Prosperity и собирает нужный процесс автоматически.

Пример конфигурации. Это не реальный формат, а псевдокод для понимания, но его достаточно, чтобы NiFi собрал процесс:
method_id: 12
integration_id: 34
method_name: GetCRMEvents
object: unzip CRM_EVENTS_2
is_active: true
avro_schema:{
"name": "nifiRecord",
"type": "record",
"namespace": "org.apache.nifi",
"fields": [
{"name": "ID", "type": ["int", "null"]},
{"name": "ISACTIVE", "type": ["boolean", "null"]},
{"name": "DESC", "type": ["string", "null"]},
{"name": "NAME", "type": ["string", "null"]}
]
}
schedule: 5 4 * * sun
Если интеграцию нужно собрать с нуля, процесс тоже простой. На канвасе NiFi есть папка с уже реализованными интеграциями. Внутри лежат готовые блоки, которые используются как шаблоны. На их основе можно добавить свой блок, соединить его с остальными и получить рабочий процесс. Дальше пайплайн автоматически включается в общий контур. Он работает с нашим планировщиком, пишет логи в общий сборочный контур и проходит через стандартные обработчики. Там же происходят приведение типов, работа с текстовыми полями, проверка структуры файла и базовая валидация.

Преимущества фреймворка
Фреймворк дает несколько практичных вещей. Во-первых, он собран из универсальных блоков. Это избавляет от дублирования и позволяет собирать новые интеграции из тех же элементов, которые уже проверены в других местах. Во-вторых, интеграции проще масштабировать. Структура ответа источника может меняться, но пайплайны не нужно переписывать. Достаточно обновить конфигурацию. Метаданные лежат в одном месте. Там же хранится расписание, описание структуры данных и правила преобразований. ADS.NiFi используется как инструмент потоковой обработки, который берет эти параметры из Prosperity и выполняет интеграцию в нужном режиме.
У нас есть несколько типовых случаев, когда фреймворк значительно сократил время и усилия. За девять месяцев мы адаптировали около двухсот существующих интеграций командой из трёх разработчиков. Ориентировочный объем загрузки по всем источникам держится на уровне примерно 100 гигабайт в сутки, и это проходит через единый контур без конфликтов.
Эти примеры показывают, что фреймворк уже закрывает большую часть рутины и позволяет не зависеть от того, кто и как писал пайплайн. Но на этом развитие не заканчивается.
Что ещё будет
Есть несколько направлений, которые мы хотим усилить. Добавить возможность легкого подключения пользовательских процессоров и кастомных скриптов (Python, Groovy). Это позволит обрабатывать нестандартные источники и форматы без переписывания всего пайплайна и появления новых скриптов.
Также планируем внедрять CI/CD пайплайнов для деплоя Process Groups из NiFi Registry. Это упростит доставку изменений. Команды смогут безопасно выкатывать новые версии через обычный релизный процесс без ручного копирования и импорта групп.
Эти улучшения сделают фреймворк более гибким и удобным для поддержки и усилят базу, на которой он уже работает.
Заключение
У нас получился фреймворк, который снимает большую часть проблем при интеграции разнородных источников. Пайплайны собираются по одному шаблону, метаданные лежат в одном месте, а логика не размазывается по инфраструктуре. Инженер получает предсказуемый процесс, который можно быстро понять и так же быстро поправить.
NiFi отвечает за исполнение, Prosperity — за конфигурации, а общая архитектура — за то, чтобы все это работало как единая система. Мы больше не держим интеграции на скриптах и не тратим время на поиск нужного куска кода. Это экономит время, снижает нагрузку на поддержку и делает систему устойчивой к изменениям, которых в больших данных всегда больше, чем хочется.
mosinnik
Где на этот фреймворк посмотреть? Где глянуть как заинтегрированы nifi и prosperity и текстовые примеры пайплайнов, которые такую интеграцию используют?
KGoftenyuk Автор
Фреймворк из статьи - закрытый, поэтому «посмотреть» его как готовый продукт или исходники не получится. Максимум, что доступно снаружи - концепция, архитектура и примеры типовых пайплайнов, описанные в самой статье
Типовой способ связать NiFi и любую PostgreSQL‑совместимую СУБД (а Prosperity именно такая) - JDBC, именно его и используем.