Улучшаем читаемость схем и повышаем производительность

Всем нам (ну хорошо, большинству!) приходится иметь дело с Apache NiFi в качестве ETL-инструмента. У нас он используется для управления потоками данных в платформе ZIIoT (кому интересно, что это за зверь, можно почитать здесь). Apache NiFi — инструмент хороший, местами удобный, позволяет быстро решать задачи, в том числе и достаточно высокой сложности, не формируя для этого отдельную группу разработчиков. Однако, как правило, по ходу проекта условия этих задач меняются несколько раз, и нередко их решением на разных этапах занимаются разные люди в режиме аврала. И вот в результате мы видим НЕЧТО, изображенное на Рис.1.

Рисунок 1. Нечто
Рисунок 1. Нечто

Что ЭТО? Зачем оно? Что оно делает? Что на входе? Что на выходе? Откуда вообще сюда смотреть?! И вот чтобы такого не было, имеет смысл понимать, что делать можно и нужно, а что можно, но лучше не стоит, а что и вовсе нельзя. Об этом речь и пойдет в моих ближайших трех статьях. В этой расскажу, как повысить читаемость схем и производительность. А далее — про мониторинг и переносимость процессоров, а также немного о паттернах. Сей мой труд будет полезен в первую очередь тем, кому посчастливилось уже иметь дело с ZIIoT-платформой, но и прочие пользователи NiFi тоже могут найти что-то интересненькое. ПОЕХАЛИ!

Улучшаем читаемость схем

Читаемость и сопровождаемость схем всегда были ключевыми требованиями. Но в наше непростое время, когда часто приходится работать в режиме аврала и вникать в проект в формате «с корабля на бал», эти требования становятся еще более жесткими. Всем известная аксиома «Код читается чаще, чем пишется» в отношении (low|no)-code систем актуальна вдвойне. Следовательно, все, что помогает повысить читаемость схем, — хорошо. 

Что же мы можем сделать для того, чтобы повысить эту самую читаемость? Оооо, многое и многое. У меня набралось 9 пунктов:

1. Стандартизируйте наименования схем

Театр начинается с вешалки, а схема в NiFi – с её названия. В хорошем случае стандартизация именования схем позволит «верхнеуровнево» понимать что это, не углубляясь в исследования. В свое время мы выбрали такой вид наименования схем:
«ИМЯ ПРИЛОЖЕНИЯ|НАЗВАНИЕ ПРОЕКТА: имя группы процессоров: (ПОРТ|NONE).
На том, что это гениально, не настаиваю. Но в целом, даже не самая идеальная, но ЕДИНООБРАЗНАЯ схема именования лучше, чем полное ее отсутствие.

2. Заполните Process Group Comments

Имеет смысл также заполнить поле Process Group Comments в комментариях к группе процессоров. Помимо описания стоит указать контакты основных разработчиков и\или ссылки на постановки\техническое решение. См. Рис. 2. 

Рисунок 2. Как выглядит поле Process Group Comments
Рисунок 2. Как выглядит поле Process Group Comments

3. Сделайте понятное расположение и названия процессоров

Внутри самой группы БП прежде всего нужно вынести наверх точки входа и точки выхода. Важным процессорам требуется дать понятное название. См. Рис. 3. 

Рисунок 3. Пример понятного расположения и названия процессоров.
Рисунок 3. Пример понятного расположения и названия процессоров.

4. Применяйте цветовую индикацию

Дополнительно имеет смысл использовать цветовую индикацию. Я выделяю зеленым (#c2f0c2) цветом точки входа, синим (#4c5dbd) – выхода, желтым (#f0eec0) – отладку. См. Рис. 4. 

Рисунок 4. Пример использования цветовых индикаторов.
Рисунок 4. Пример использования цветовых индикаторов.

Можно и больше цветовых индикаторов, но превращать схему в хохлому, пожалуй, не стоит. Хорошо бы договориться об единообразном использовании цветовых схем вообще для всего проекта, но и единообразие на уровне инстанса тоже не плохо.

5. Выравнивайте расположение процессоров на схеме по вертикали и горизонтали

Следующий важный момент – выравнивание (Align). Вертикальное и горизонтальное выравнивание процессоров друг относительно друга общую читаемость повышает весьма значительно. Для примера – результат реорганизации первой схемы на Рис. 5. Не могу сказать, что стало хорошо, но, на мой взгляд, определенно – лучше. 

Рисунок 5. Как выравнивание помогает сделать схему лучше.
Рисунок 5. Как выравнивание помогает сделать схему лучше.

6. Придерживайтесь единого формата расположения элементов data flow

В идеале схема должна умещаться на один экран, чтобы читаться целиком. Объединение изолированных частей схемы в отдельные группы процессоров позволяет к этой цели приблизиться, но не добиться. Реальная жизнь, как правило, несколько сложнее «идеальных случаев», и даже хорошо сегментированная на логически независимые части задача вполне может превысить размер одного листа.

Поскольку в ширину экран как правило больше, нежели в высоту, – элементы data flow имеет смысл располагать слева направо и сверху вниз аналогично письму, но персонально мне концепция «потока» данных лучше визуализируется сверху вниз. В любом случае, вне зависимости от выбранного варианта имеет смысл придерживаться его в рамках всего инстанса. 

7. Ограничьте глубину вложенности элементов

Искусственно ограничивая размер схемы «одним экраном», легко доиграться до 5+ уровней вложенности, что сильно ухудшает управляемость и читаемость. Найти ошибку в таком случае — что иголку в стоге сена: заходим в группу процессоров и видим, что она где-то во вложенной группе. И еще во вложенной. И еще. Как глубока эта кроличья нора?! Два уровня вложенности – хорошо. Три – терпимо. Больше… Гм. Не надо так.

8. Используйте Funnel

Располагая элементы на канве, старайтесь избегать пересечения соединительных линий. Это убирает ощущение «неряшливости» схемы и повышает читаемость. В случае, когда соединений очень много, не стесняйтесь использовать элемент Funnel, в конце концов он предназначен именно для этого. См. Рис. 6.

Рисунок 6. Как использовать элемент Funnel.
Рисунок 6. Как использовать элемент Funnel.

9. Оставляйте комментарии правильно

«Идеальная» схема – сущность «самодокументированная», позволяющая разобраться в постановке\реализации без привлечения внешних сущностей. Помимо уже упомянутого поля Comments (да-да, его стоит использовать и для вложенных групп процессоров), у нас есть механизм Label, позволяющий выделять зоны для письма. На мой взгляд, это отличное место для описания деталей реализации: указать пример входных и выходных данных, описать, что именно делается и пр. А вот использование Label в качестве канвы для размещения процессоров\групп процессоров для комментирования их деятельности я считаю скорее неудобным.

Рисунок 7. Использование Label для объяснения общих частей алгоритма группы процессоров.
Рисунок 7. Использование Label для объяснения общих частей алгоритма группы процессоров.

Дело в том, что «лейбл» и «группа процессоров» никак не связаны и способны перемещаться по общей канве независимо друг от друга, а сдвинуть элемент в NiFi ну ооооочень просто: одно неловкое движение и… в лучшем случае непонятно, к чему относится комментарий, в худшем – он начинает относиться не к тому, к чему нужно. Таким образом, Label стоит использовать для объяснения общих частей алгоритма группы процессоров (см. Рис. 7), а сами группы комментировать с помощью встроенного поля «Комментарии» (см. Рис. 8). Это менее наглядно, но более надежно.

Рисунок 8. Поле Comments для заметок о группах процессоров.
Рисунок 8. Поле Comments для заметок о группах процессоров.

Разумеется, все вышеперечисленные мероприятия не сделают схему лучше, максимум – более читаемой, но и это уже неплохо. Более того, иногда, приводя в удобочитаемый вид развесистые схемы передачи данных, можно увидеть интересные закономерности и попробовать оптимизировать реализацию и\или даже сам алгоритм. Наглядность NiFi — одно из основных его преимуществ.

Повышаем производительность системы

Итак, с оформительской частью мы закончили, и даже договорились, что объективно хорошего результата еще не достигли. Что дальше? Считая, что схема data flow УЖЕ достигает поставленных целей — удобна и читаема, – попробуем перейти к «производительности». 

Обратите внимание, что «производительность» я закавычил. Надо понять, признать и простить, что NiFi — не про это, ну вот совсем. От NiFi можно добиться достаточно высоких скоростей передачи данных, но накладные расходы у него просто чудовищные. Наглядность, удобство работы, прослеживаемость потока данных, сбор статистики – да, голая производительность — нет. Совсем нет. Но кое-что сделать-таки можно. И тут тоже будет 9 советов.

1. Задавайте несколько атрибутов в одном UpdateAttribute-процессоре, где это возможно

Оборотная сторона прослеживаемости — на КАЖДЫЙ проходящий в системе flow-файл КАЖДЫЙ процессор NiFi пишет данные в так называемом Provenance Repository. Периодически данные в этом репозитории сжимаются, удаляются, создаются новые, каждый процессор ведет статистику своего использования – ну вы поняли. Каждый новый «кубик» на канве – лишние записи в Provenance Repository, лишние атрибуты в каждом проходящем flow-файле — лишние записи в статистике, лишнее примерно все. 

Если есть возможность задать НЕСКОЛЬКО атрибутов в одном UpdateAttribute-процессоре – пользуйтесь. Да, в общем случае это может негативно повлиять на читаемость схемы (хорошо, когда один элемент делает одно действие, прокомментирован и понятен, правда?), а может и не повлиять. Помните про «умещается на экран»? Серебряной пули здесь нет, но выполнять однотипные действия в одном процессоре — идея скорее хорошая. Иногда для этого имеет смысл даже искусственно изменить схему – заполнять атрибуты с помощью UpdateAttribute|EvaluateJSONPath не в тот момент, когда у нас появляются сами данные, а позже, когда у нас появляется максимально полный их набор. 

2. Старайтесь минимизировать число элементов в split-merge

Стандартный (для нас) паттерн split-work-merge ДОРОГОЙ. Разделив большой JSON на «много маленьких», мы получаем много flow-файлов, каждый из которых пишет свою статистику и-тэ-дэ. 

Точно так же, как с «циклами»
в традиционных языках программирования, имеет смысл стараться минимизировать
число элементов внутри split-merge. Например, не надо с помощью
FetchDistributedMapCache получать токен для каждой записи, которую мы планируем
сделать. В большинстве случаев правильней получить токен ОДИН раз до сплита,
тогда каждый из разделенных файлов получит атрибуты, идентичные оригиналу.
Разумеется, могут быть случаи, когда идея не сработает. Например, большой объем
обработки и малое время жизни токена могут потребовать получения токена
непосредственно перед выполнением операции. Смиритесь.

3. Осторожно используйте скрипты (а лучше вообще не используйте)

Из пунктов выше вытекает, что комплексная обработка набора данных выгодней, нежели разделение и обработка поэлементно. И вот тут возникает «Соблазн скрипта». Мол, а нельзя ли рррраз и…. Хочется сказать, что НЕЛЬЗЯ!!!111, но не буду. Можно. Но осторожно. Почему:

a. Прежде всего, NiFi – LOW-code решение. Не надо лишать его приставки LOW. Скорее всего, те, кто придет чинить\править проект после вас, не будут знать замечательный groovy, давно (в две тысячи… двенадцатом?) забыли про Python 2.7.2 и знать не знают диалект JS, используемый в nashorn’е. Вот и не надо их этому насильно учить.

b. Комплексные скрипты ОЧЕНЬ сильно снижают читаемость схемы. НЕЧТО на ЧЕМ-ТО делает ЧТО-ТО и, возможно, ЧТО-ТО ЕЩЕ. Документирование «нечта», конечно, помогает, но много ли вы этого «документирования» видели в реальной жизни?

c. Скрипты в NiFi ТРУДНО отлаживать. Если не использовать ExecuteStream, то отладка возможна только в среде NiFi, что с учетом веб-интерфейса вряд ли вам понравится.

d. У скриптов, как правило, нет обработки ошибок – все падения летят напрямую в NiFi без обработки. Нет, конечно, ее можно реализовать через try - except  + REL_FAILURE.  Но вы уверены?

e. Скрипты в NiFi не слишком быстрые. Интерпретатор поверх JAVA-машины каждый раз с обработкой входных данных без возможности запустить оптимизированные библиотеки на C… работать будет, но не быстро. В противовес встроенные средства на Pure Java вполне себе производительны. 

f. Без обработки ошибок и ограничений по поллингу скрипты иногда-таки текут по памяти. В теории проблема как-то решается, но на практике никто анализом дампов заниматься скорее всего не будет.

В общем, если есть возможность НЕ использовать скрипты – не используйте. Как правило, штатных средств достаточно для решения большинства задач. Но если надо, то помним, что «не человек для субботы, а суббота для человека», поэтому пишем скрипты не «на коленке» а как нормальную программу. Т.е. в тех случаях, где это возможно, – используйте средства, ориентированные на работу с наборами записей (Record-oriented), а не flow-файлами - это помогает.

4. Отдавайте приоритет сложным средствам 

Встроенные в NiFi сложные средства, как правило, хороши, и им стоит отдавать приоритет над простыми. Например: 

  • вместо executesql -> convertavrotojson используйте executesqlrecord с нужным RecordSetWriter;

  • вместо split -> route(onattribute|text) применяйте queryrecord; 

  • вместо split -> replace -> merge jolttransform

Это снижает количество элементов в схеме и положительно влияет на производительность. 

5. Избегайте соблазна делать «восход солнца» вручную

Да, можно создать необходимый SQL-запрос с помощью ReplaceText или сформировать JSON им же, но лучше воспользоваться автоматическими средствами для работы: attributestojson, convertjsontosql, putdatabaserecord и пр. На первый взгляд, трудозатраты в подходах примерно одинаковые, но, например, в случае изменения типа данных в БД convertjsontosql автоматически отработает это изменение, а с replacetext\updateattributes придется долго и мучительно вспоминать, что и откуда. То же и с формированием json’а – в случае с «автоматикой» и маской атрибута все, что нужно – просто добавить новый атрибут, и он попадет куда надо. А с replacetext так не получится.

6. Используйте не максимально универсальные, а наиболее подходящие инструменты

Не один executesql для чтения-записи-вызова-черта-в-ступе, а:

  • querydatabasetablerecord, если нужно просто извлечь данные из таблицы (представления);

  • putdatabaserecord для записи. 

То же самое и со скриптами: не executescript, а scriptedtransformrecord. Просто за счет меньшей универсальности придется писать меньше boilerplate-кода. В этом случае мы работаем не с flow-файлом со всеми его java-классами и пр., а просто с ЗАПИСЬЮ без необходимости реализовывать все методы класса NiFi. Универсальность хороша, но не бесплатна.

7. Используйте штатные механизмы многопоточности NiFi – concurrent tasks

«Разумная достаточность» позволяет очень значительно повысить скорость обработки, ну а не умеючи сломать можно все, что угодно. Скажем, ставить 100 одновременных соединений к API – не очень хорошая идея, а вот 10 может дать прирост скорости. 

8. Увеличьте время работы процессора

Это еще один не самый очевидный способ повысить производительность обработки очереди. По умолчанию обработка производится так: 

  • по поллингу процессор получает управление, 

  • читает нужное количество flow-файлов из очереди (для большинства, разумеется, один, для ориентированных на batch-обработку – количество, равное размеру batch, ну или сколько есть в очереди), 

  • выполняет операцию – за сколько справится, и выключается. 

Затем цикл повторяется. Разумеется, Java достаточно умна, чтобы кэшировать наиболее часто вызываемые процессоры, но… К счастью, у нас есть способ скорректировать это поведение.

Рисунок 9. Определение параметра Run Duration.
Рисунок 9. Определение параметра Run Duration.

Если определить параметр Run Duration процессора в отличное от нуля состояние (Рис. 8), то процессор будет делать свою работу в течение указанного времени, НЕ ВЫГРУЖАЯСЬ ИЗ ПАМЯТИ. Поставили 2 секунды – он 2 секунды будет в цикле читать файлы из очереди и делать свою работу, и только после этого передаст управление. Экономия на переключениях контекста очень и очень значительная. Применять метод можно, но нужно учитывать пару нюансов:

  • Трюк хорошо работает с процессорами, не зависящими от I\O (InvokeHTTP). Если такая зависимость есть, то большую часть времени процессор будет просто ждать ответа, лучше всего применять данный подход для CPU-hungry скриптов.

  • В течение всего времени своей работы процессор не возвращает результаты: молотит N-миллисекунд в памяти и молотит, и только когда таймер оттикает, записывает результат в виде flow-файлов в нужную очередь. Что произойдет, если NiFi в этот момент перезапустится (и\или нехорошим образом упадет сам процессор), думаю, понятно – мы потеряем кусочек данных. Размер этой потери зависит от того, что за процессор и сколько данных он успел «скушать». В зависимости от процессора и объема обработанных данных «скушать» он может изрядно, и до момента возврата все это будет сложено в JAVA heap. 

Так что, используем с осторожностью – но используем.

9. Удаляйте ненужные атрибуты 

В процессе обработки все атрибуты flow-файла хранятся в памяти. Соответственно, если мы загрузили из кэша в атрибут десяток-другой мегабайт, это все «размажется» по всему data flow. Не забываем, что у UpdateAttribute есть возможность не только добавлять атрибуты, но и удалять их. Иногда этой возможностью стоит воспользоваться. 

Вместо заключения 

Помните, что преждевременная оптимизация – зло. Понятно, что писать изначально сложное и медленно работающее нечто не стоит, но проводить агрессивные оптимизации с параллельным исполнением, асинхронной обработкой потоков информации, увеличением времени работы процессоров надо тогда, когда в этом реально есть потребность. Большинство оптимизаций по определению небесплатны и несут те или иные негативные моменты. Так что использовать их нужно осознанно.

To be continued…

Комментарии (14)


  1. velipre_xella
    00.00.0000 00:00
    +1

    Было бы неплохо интро сделать про иерархию объектов в пайплайне. Типа, процессор, процессорная группа ...


    1. nezhi
      00.00.0000 00:00

      Интро было бы слишком большое, лучше ссылку на введение в NiFi


  1. Wayfarer15
    00.00.0000 00:00

    Было бы не плохо допинать разрабов NiFi (Cloudera, которая предлагает NiFi как бесплатную замануху для своих cloud-based фигни) на создание групп процессов, которые можно вызывать из других процессов. Тогда одно и то же можно будет вызывать из разных мест и не тянуть все эти линии. Сейчас я это делаю с InvokeHTTP, это всё равно как гланды через *опу автогеном.


    1. Shadilan
      00.00.0000 00:00
      +2

      Посмотрите в сторону NIFI Statless и Execute Statless ну и NIFI про Конвейер, а не про Процедуры, это немного ломает.


    1. atshaman Автор
      00.00.0000 00:00
      +1

      Ну, вообще - я использую kafka'у в качестве в том числе и внутреннего транспорта. Работает. Был опыт с использованием RemoteProcessGroup "к самому себе" - и это тоже работает, но в кластерных инсталляциях мне показалось проблемно.


  1. Geckelberryfinn
    00.00.0000 00:00
    +3

    Пожалуйста, не используйте NiFi в качестве ETL и перестаньте позиционировать в качестве такого инструмента. В этой ипостаси он неэффективен. Да, он может своими процессорами извлекать данные и ижектировать и даже траснформировать немного. Но создан он для роутинга данных. Для реально нагруженных ETL пайпланов он вреден и бесполезен.


    1. KlimenkoIv
      00.00.0000 00:00

      Обоснуйте.

      Вполне удобный ETL инструмент, с возможностью собственного расширения. Довольно широкий спектр задач, которые можно решить без единой строчки кода. Есть своя ниша.


    1. Shadilan
      00.00.0000 00:00
      +2

      Хотелось бы больше деталей.

      Как по мне, замечательный инструмент для Extract Transform Load. Есть вопросы к механизму стейта (IO дисков узкое место для NIFI) Но это решаемая тема.

      А для роутинга данных кажется слегка перебор.


      1. Geckelberryfinn
        00.00.0000 00:00
        +1

        Попытаюсь ответить вам двоим: @KlimenkoIv и @Shadilan:

        В ETL задачах очень часто необходимо использовать джойны. В Nifi это делается сложно и неудобно, потому что изначально не предназначался для этого. При этом, поскольку NiFi умеет в параллельную обработку, то нужно иметь в виду, что параллельные джойны также должны быть конфигурируемы: изменять алгоритм partitionning (по ключу или хэшу, броадкаст на все ноды, распределение по модулю, и т.д.), например, чтобы избежать бесполезных перемещений (шаффлинга) части данных с ноды на ноду.

        На мой взгляд Nifi удобный инструмент, если нужно приземлять в реальном времени данные из очередей, перед этим трансформировать их, скажем из json в parquet, оповестить некий сервис о пришедших данных. Возможно, даже будет удобен для микробатчей. Максимум - для заполнения Staging Area. Дальше, когда нужно запускать тяжелые батчи, вроде создания DWH или DataMart, использовать NiFi будет уже тяжеловато.

        По поводу no-code ETL: такие действительно существуют, при этом вполне умеют в полноценную параллельную обработку. Например, монструозный динозавр IBM DataStage. Из бесплатных - Talend (не знаю, научились ли они в параллельную обработку, раньше не могли, но наверняка код Spark на нем можно сгенерировать).i.

        Ну и обработка в flow-файлах, как вы правильно отметили, не прибавляет скорости обработки. Кроме того, насколько я помню, они даже без сжатия хранятся.


        1. KlimenkoIv
          00.00.0000 00:00
          +2

          Попытаюсь вам оппонировать.

          В ETL задачах очень часто необходимо использовать джойны.

          В своей практике не встречал, где нужен JOIN. Если требуется на лету соединять несколько источников/потоков данных, это действительно не про NIFI. Например, Flink с этим прекрасно справится. В этом случае я всегда задаю один вопрос - вы точно уверены в выборе инструмента? Все остальные задачи решаемы за счет обогащения.

          На мой взгляд Nifi удобный инструмент, если нужно приземляьть в реальном времени данные из очередей, перед этим трансформировать их, скажем из json в parquet, оповестить некий сервис о пришедших данных. Возможно, даже будет удобен для микробатчей. Максимум - для заполнения Staging Area

          Вы сами говорите об области применения, где достаточно много трансформаций, параллельных процессов.

          Дальше, когда нужно запускать тяжелые батчи, вроде создания DWH или DataMart, использовать NiFi будет уже тяжеловато.

          Для этих целей существует другой ряд инструментария. Когда данные в Stage попали, область ответсвенности NIFI закончилась. Данные доставили. Вся последующая магия через внутренние преобразования в БД, которые окрестрируются, например, Airflow.

          Ну и обработка в flow-файлах, как вы правильно отметили, не прибавляет скорости обработки. Кроме того, насколько я помню, они даже без сжатия хранятся.

          Укажите кодек Avro в настройке Writer. Данные будут сжаты. Из коробки поддерживается deflate, bzip2, snappy, xz, zstandard. Т.о. контент будет занимать меньше места. Однако это влияет на обработку, т.к. для изменения записи идет их распаковка и упаковка.


    1. atshaman Автор
      00.00.0000 00:00
      +2

      Тут скорее надо говорить, что у nifi как и у любого другого инструмента есть границы применимости и классы решаемых задач в которых он эффективен. Сотни мб\сек в datalake лить - можно, но дороговато как по железу так и с т.з. "когнитивной нагрузки" на эксплуатантов.

      А вот например забрать данные по персоналу с SAP HCM по шине, сконвертить из тамошнего XML, положить в три микросервиса внарезку и дообогатить данными из AD через keycloak - можно _руками джуна_. Собственно, смысл статьи в том, чтобы смотреть на результат "народного творчества" инженеров внедрения - не "разработчиков" ни в одном месте - можно было без внутренней дрожи.

      Решение аналогичного рода задач "в рассыпуху" на том же airflow требует существенно большей квалификации.


  1. sentoz
    00.00.0000 00:00
    +1

    В целом, можно получить полезную выжимку, как оформить свои процессорные группы, что бы потом другие не ломали себе мозг))


  1. nezhi
    00.00.0000 00:00
    +1

    Интересно узнать, как поступаете с выходами с ошибками у процессоров. Когда много процессоров, то и много выходов failure, и схема с ними хуже читается.


    1. atshaman Автор
      00.00.0000 00:00
      +1

      В настоящий момент - формируем сообщение по схеме и публикуем в отдельный топик кафки через controlrate, чтобы флудинга не было. Более-менее универсальная обработка потока ошибок (Запись в лог, создание платформенных event'ов, нотификация пользователей при необходимости - вот это все) в отдельной группе процессоров. Понятно, что с ошибками, для которых нужна обработка (Откат цепочки операций, "восстановимые" ошибки и пр) этот фокус не прокатит - но в целом схемы упрощаются. Плюс - обработчик ошибок выходит переиспользуемый между проектами.