Эта статья для тех, кто только открывает для себя мир NiFi или планирует применять этот чудесный инструмент. Инструмент необычный, и, чтобы его эффективно использовать, важно понимать, как NiFi работает, какие у него сильные и слабые стороны.

Определение

NiFi — ETL-инструмент для интеграции данных из различных источников: базы данных, API, файлы и многие другие, а также для доставки данных в различные системы-получатели.

В NiFi выделенные ресурсы будут влиять на то, сколько данных можно будет обработать. NIFI не приходится рассчитывать на ресурсы СУБД или других вычислительных систем. Поэтому, прежде чем работать с этим инструментом, подумайте, какие данные будут с помощью него загружаться, а точнее какой объем этих данных.

NiFi — потоковый инструмент, и хороший вариант его использования — загружать небольшие порции данных через небольшие промежутки времени. Желательно избегать загрузки всей информации за длительный период, если вам все-таки нужно реализовать подобный сценарий, читайте об этом в прошлой статье:

Предлагаю аналогию, которая поможет объяснить суть потокового инструмента. Представьте большой завод со множеством конвейерных лент. По лентам перемещаются ящики из одного цеха в другой и проходят через какие-то аппараты. С этим образом будем работать дальше.

FlowFile

FlowFile — объект обработки в NiFi, наименьшая единица информации, с которой можно работать, создавая новые потоки данных.

В нашей аналогии ящики — это и есть FlowFile. Содержимое ящиков будем считать данными, или — в терминах NiFi — content. Кроме содержимого на ящиках есть бирки со множеством строк — дополнительная информация об обработке FlowFile, или атрибуты. 

Processors, Relations, Upstream

Для обработки FlowFile нужен механизм. Он реализуется через процессоры, которые располагаются на конвейерной ленте и делят ее на несколько лент. 

В процессор с конвейерной ленты поступает ящик, в него процессор может дописать новую бирку с информацией об обработке, изменить старую бирку или отправить ящик в один из выходов. Такие выходы называются relations. У выходов есть названия, часто выходы называются:

  • success — успешная обработка

  • failure — брак/ошибка обработки

Иногда выходов из процессора больше и названия различаются, но лучше не зацикливаться на названии, а рассматривать выходы как ветвления в результатах работы аппаратов — процессоров.

А еще процессор умеет создавать и удалять ящики. Новые появляются копированием или изменением содержимого в старых ящиках. А удаляются ящики, когда попадают на выход, который выключен.

Процессоры могут требовать или не требовать входящие FlowFile. Входящий поток для процессора называется Upstream. Процессор может быть с одним из трех вариантов запуска:

  • требуется upstream;

  • upstream не обязателен;

  • не принимает upstream.

Процессоры, которые работают без upstream, иногда называют генераторами. Такие процессоры упаковывают внешнюю информацию в FlowFile и отправляют на конвейерную ленту обработки.

Queue

Конвейерная лента, которая соединяет процессоры, называется очередь, или queue. С одной стороны, очередь является выходом из процессора, или relation, с другой — входом в процессор, или upstream. В один процессор могут входить несколько очередей — лент для процессора, такие очереди никак не будут отличаться, потому что вход у процессора всегда один. Очередь ограничена емкостью:

  • по количеству FlowFile на ленте;

  • по суммарному объему FlowFile на ленте.

Когда очередь превышает емкость по одному из показателей, процессоры, отгружающие FlowFile в эту очередь, останавливаются. Такой механизм называется обратным давлением — backpressure. Обратное давление обеспечивает ограничение FlowFile, находящихся в одновременной обработке в NiFi.

Если в одном месте возник затор и коробки не успевают проходить по ленте, то постепенно обратное давление остановит все аппараты до генератора. Процессор не заработает, если хотя бы на одной из лент, на которые направлены выходы, будет превышена емкость.

А еще у очереди есть параметр «истечение срока хранения». Представим, что есть специалист для работы с очередью — он следит за тем, сколько времени каждая из коробок находится на конвейерной ленте. Как только время пребывания коробки превышает срок хранения, специалист снимает ее с ленты и отправляет на уничтожение. По умолчанию считается, что срока хранения у очередей нет — это эквивалентно установке настройки Expiration Time в значение 0 в любой единице измерения времени.

Еще одна особенность очереди, которая отличает ее от конвейерной ленты: на конвейерной ленте объекты строго упорядочены, для очередей в NiFi можно изменить порядок следования файлов:

  • по времени попаданию в очередь — это вариант по умолчанию;

  • по возрасту FlowFile — времени, когда файл был создан.

Атрибут priority помогает предварительно выставить приоритет файла, и FlowFile в очереди будут выстраиваться по нему.

ProcessGroup

В NiFi придумали сущность ProcessGroup. Ее функция — упорядочить хаос из процессоров и очередей и упростить понимание потока данных. С помощью этой сущности наш завод делится на отдельные помещения — цеха. 

Можно создавать ProcessGroup внутри другой ProcessGroup, разделяя большой цех на несколько маленьких. Взаимодействие между цехами обеспечивают порты — окна, через которые данные попадают в цех, — ProcessGroup и уходят из него. 

Порты бывают входящие и исходящие. Они соединяются друг с другом и процессорами с помощью очередей. А еще порты связывают не только ProcessGroup одного NiFi, но и дают возможность отправлять FlowFile в другие NiFi и получать FlowFile из других NiFi. Таким образом порт становится уже шлюзом. В шлюз/порт подвозят данные с других заводов. Также шлюз может быть использован для интеграции NiFi с другими инструментами, хотя лучше эту интеграцию использовать по назначению — для взаимодействия между NiFi.

Controller Service

Некоторые процессоры имеют общие правила работы или взаимодействия с внешними системами. Например, процессор, выполняющий запросы к базе данных, взаимодействует с БД в соответствии с определенными настройками, и несколько процессоров могут использовать одни и те же настройки. Для этого в NiFi предусмотрена сущность Controller Service.

Controller Service задается один раз и переиспользуется в нескольких процессорах. Он может определяться для БД, сервисов сериализации и десериализации данных, управления учетными записями и многого другого.

Controller Service может быть определен для всего NiFi или для отдельной ProcessGroup, и тогда для других ProcessGroup такой сервис недоступен. У Controller Service есть еще одна особенность — полноценный отдельный «объект» для выполнения задач. Все процессоры, обращающиеся к нему, разделяют его ресурсы. 

Например: Controller Service взаимодействия с базой данных работает по принципу пула соединений. Значит, у сервиса определено количество соединений, которые он может использовать для базы данных. Будем считать, что у нас нагруженная база данных и из NiFi одновременно не должно быть более одного соединения. 

У двух процессоров, которые использует этот сервис, возникла необходимость обратиться к базе данных. Процессоры не будут это делать сами, а используют сервис взаимодействия с базой данных. Первый процессор успешно вызовет сервис, второй будет ждать, когда соединение освободиться, и только после этого вызов будет выполнен сервисом.

Использование Control Service с пулом коннектов
Использование Control Service с пулом коннектов

ReportingService

Кажется, у нас есть все, чтобы выпускать продукцию на нашем заводе — обрабатывать данные. Но, чтобы все работало хорошо, нужен тот, кто будет следить за тем, чтобы завод работал и на нем не возникали нештатные ситуации.

Для таких целей в NiFi есть ReportingTask. Сами по себе ReportingTask ничего не контролируют, но позволяют нам понимать, что происходит в NiFi, и реагировать на это.

 ReportingTask существуют в рамках всего инстанса NiFi, а не отдельных групп. Они сообщают, сколько данных в обработке, какие проблемы возникли, насколько утилизированы ресурсы и еще много полезной информации. 

Более подробно про использование ReportingTask и мониторинг в NiFi я опишу в отдельной статье. Механизм ReportinTask гораздо удобнее, чем создание отдельных элементов в потоке, которые существуют только для отслеживания выполнения работы потока.

Наш завод содержит все для обработки данных, которые передвигаются по конвейеру. Мы можем изменять содержимое FlowFile, сопроводительную информацию к ним в атрибутах, но в NiFi нет котлов, которые могут смешивать разные потоки с двух лент. Мы не можем соединить два ящика, которые движутся по разным лентам (join). Конечно, можно сложить несколько ящиков в один, но только в том случае, если они пришли к нам по одной ленте в один момент.

Теперь у нас есть замечательный завод по обработке данных.

Подытожим

Мы обсудили, что NiFi — это потоковый ETL-инструмент, посмотрели, как процессоры обрабатывают данные и очереди соединяют процессоры. Выяснили, что ProcessGroup объединяют процессоры и очереди, Controller Service хранит настройки и обеспечивает пулы соединений, а ReportingTask смотрит за объемами данных и утилизацией ресурсов.

А в следующей статье я постараюсь рассказать о том, как NiFi работает с ресурсами системы, как функционируют процессоры и как распараллелить нагрузку. Посмотрим, как сделать наш завод по обработке данных эффективным. 

Если есть вопросы — добро пожаловать в комментарии! Или в канал комьюнити NiFi:

Комментарии (4)


  1. dyadyaSerezha
    26.06.2023 22:45

    Такие процессоры упаковывают внешнюю информацию в FlowFile и отправляют на конвейерную ленту обработки

    А разве upstream - не внешняя информация?

    P.S. Кажется понял. Upstream это чисто внутреннее понятие. Но еще такой вопрос:

    С одной стороны, очередь является выходом из процессора, или relation

    А зачем в relation очередь если данные уничтожаются? Очередь на уничтожение?


    1. Shadilan Автор
      26.06.2023 22:45

      Да в терминах NIFI у процессора есть Upstream Connection и Relation. Upstream Connection это вход для Процессора. И получается для Процессора это внешняя информация, но в рамках NIFI это входящая очередь данных внутри NIFI.

      В процессоре строго определены возможные/используемые relation. Если какой то из Relation не нужен (то есть нам не интересно что в него направляется, и мы не планируем обрабатывать информацию попавшую в этот выход) то мы можем пометить данную очередь как "Terminated", в этом случае мы не обязаны куда то ее направлять, и данные попадающие в нее будут уничтожаться. Но тем не менее процессор все равно имеет эту выход, просто он никуда не ведет.


      1. dyadyaSerezha
        26.06.2023 22:45
        +1

        Минуточку, у меня все ходы записаны

        А удаляются ящики, когда попадают на выход, который выключен. Такие выходы называются relations. 

        То есть, вы прямо определяете закрытые выходы, где удаляются данные, как relations. Надо поправить.


        1. Shadilan Автор
          26.06.2023 22:45

          Спасибо огромное, да логическая не стыковка в тексте вышла и я ее проглядел. Сейчас поправил должно стать понятнее. Любые выходы называются relations. И можно пометить Relation как Terminated