Всем привет!

Сегодня мы поговорим про Apache NiFi. Этот ETL-инструмент все чаще используют при загрузке данных в хранилище, правда, не всегда по назначению. Об одном из таких сценариев я рассказывал на конференции SmartData. Видео можно посмотреть на Ютубе, но я все равно рекомендую вам прочитать этот текст: здесь я собрал новые мысли и идеи. Речь пойдет об инициирующей загрузке, или перегрузке данных из источника. 

Разбираемся в терминах

Сначала договоримся о терминах. Если вы уверены в своих знаниях, можете перейти к следующей части, в которой мы освежим знания о принципах работы NiFi. 

Файл. В терминах NiFi есть сущность FlowFile. Это базовая единица информации в NiFi, объект, который движется по потоку данных. Файл состоит из двух частей:

  1. Контент(content). Это содержимое файла, данные, которые мы обрабатываем в ETL-инструменте. Контент всегда хранится на диске, но есть особенность: он хранится не один к одному с файлами. Каждый FlowFile содержит указание на физический файл на диске и смещение в этом файле. Это позволяет хранить в одном физическом файле данные нескольких FlowFile и не открывать запредельное количество курсоров в операционной системе. Это неплохая концепция, но она создает определенные проблемы.

  2. Атрибуты (attributes). Это набор метаданных, описывающих файл. Набор атрибутов представлен во внутреннем устройстве как Map (словарь). В Map ключ, и значение — это строка. Атрибуты содержатся в оперативной памяти, чтобы их можно было быстро найти при обработке. Кроме того, они записываются на диск. Это сохраняет стабильность состояния обработки. Если углубляться, то на диск они сохраняются с использованием WriteAheadLog. Но это не принципиально в моем рассказе.

Процессор. В терминах NiFi это элемент, выполняющий обработку данных. Процессор может создавать и изменять контент и менять атрибуты файла. Процессоры можно разделить на два типа.

Первый — работающие без upstream (входящего потока данных). Их можно назвать генераторами данных. Такие процессоры могут не только создавать контент, но и получать его извне. Для них критично определение периода запуска: если поставить значение по умолчанию 0 секунд, они будут выполнять постоянные опросы источника. Это может привести к негативным последствиям.

Второй — не работающие без upstream. Эти процессоры получают данные из поступающего на вход файла и модифицируют контент или атрибуты. Они запускаются только при наличии файлов во входящей очереди. Расписание регулирует для них только максимальную частоту запусков при наличии файлов, и зачастую для них нормально выставлять 0 секунд. Для более тонкой настройки потока можно выравнивать движение по потоку за счет периода запуска.

Группа процессоров. Для удобства процессоры можно объединять в группы. Например, процессоры, которые выполняют общую задачу. Частный случай группы процессоров — поток (Flow).

Поток (процесс, Flow). Процесс загрузки, связанный общей бизнес-логикой.

Очередь. Элемент, связывающий процессоры между собой и хранящий файлы. Файл в очереди имеет несколько состояний:

  1. Ожидает обработки. Этот файл ожидает, когда процессор возьмет его в работу.

  2. Обрабатывается. Файл находится в очереди, но помечен как обрабатываемый. Если что-то пойдет не так, например внезапно отключится NiFi из-за питания или критической ошибки системы либо любой другой исключительной ситуации, файл будет взят в работу после восстановления.

  3. Пенализирован. Этот или другой процессор уже обработал файл, но с ошибкой. Теперь он снова в очереди. Это позволяет не повторять обработку слишком часто, когда возникают ошибки. Не все процессоры умеют проставлять п «Ппенализирован».

Что такое NiFi

Давайте вернемся к основам NiFi. Это нужно, чтобы лучше понять проблематику следующих разделов. 

Итак, NiFi использует концепцию потока файлов. Управление работой находится на стороне файлов, а не обработчиков. Как я отметил выше, обработчики запускаются, только если есть что обрабатывать (без учета генераторов). Сигналом к запуску служит наличие файлов и расписание. NiFi позволяет размазать большой объем данных в равномерно работающий поток. 

Разрабатывая ETL на NiFi, мы всегда создаем поток, состоящий из трех условных типов элементов.

Типичный ETL-поток
Типичный ETL-поток

Элемент получения — один или несколько процессоров, описывающие логику обращения к источнику и получения данных. Такие процессоры или их группы обычно начинаются с генератора.

Элемент преобразования — это, как правило, несколько процессоров, которые получают данные из источника в любом виде и придают им красивую упорядоченную форму. Эти процессоры всегда работают с входящей очередью.

Элемент сохранения — это обычно один элемент для каждого получателя. Он обеспечивает доставку данных в систему хранения или целевую систему.

В NiFi все это очень удобно раскладывается в визуальном интерфейсе в узлы со связями. Это снижает порог входа при работе с инструментом. NiFi — инструмент open source, и, если узлов не хватает, можно создать свои. Про это можно послушать в докладе Ильи Ковалева.

Как работает инициирующая загрузка

Кажется, я рассказал про NiFi все, что хотел, и могу перейти к основной теме. NiFi отлично справляется, когда нужно грузить данные потоком, часто и в условно небольших количествах. В обычной ситуации при регламентной загрузке это работает. У нас идет поток данных, которые мы забираем из источника. Но источник, как правило, появляется до создания ETL-потока, и к моменту начала загрузки там скапливается много данных. И тут возникает понятие инициирующей загрузки. 

Инициирующая загрузка приводит данные нашего хранилища в состояние, эквивалентное системе-источнику. Такая загрузка оперирует большим объемом данных, что удобно для систем обработки с помощью Batch. А NiFi — это система потоковой обработки, или обработки небольшими порциями (micro batch). 

Может показаться, что для этой задачи подойдет другой инструмент. Но это значит, что одну и ту же логику мы реализуем дважды, а тратить время на разработку двух потоков данных мы не хотим. Можно попытаться реализовать загрузку так, чтобы данные проходили по одному и тому же потоку и для инициирующей загрузки, и для регламентной. С учетом того, что профиль данных разный. Правда, в таком случае мы столкнемся с проблемами. Разберем каждую из них. 

Проблема первая: хранение контента

Как я уже говорил, NiFi хранит в одном физическом файле множество данных о контенте разных FlowFile. В настройках NiFi можно увидеть минимальный размер файла на диске.

Организация хранения в Content Repository
Организация хранения в Content Repository

Сделано это ради экономии открытых курсоров. На практике это работает так: когда появляется файл с контентом, он записывается на диск. Если размер файла не достиг границы, при появлении следующего файла контент добавится в тот же физический файл. При этом NiFi может решать, когда можно удалить физический файл, содержащий контент, который относится к множеству разных файлов в NiFi. Для этого в NiFi есть специальный механизм. Когда для файла на диске нет ни одной ссылки в FlowFile, это значит, что все они покинули поток и файл можно удалить. Это замечательно работает, если файлы быстро уходят из потока. 

В случае с инициирующей загрузкой у нас обычно есть много данных, а это потенциально много файлов. И все они будут занимать место на диске, пока их не обработают. Но даже если файл обработали, из-за этого механизма место на диске будет оставаться занятым. Поэтому оно может закончиться, даже если в потоке будет существенно меньше данных, чем объем диска для хранения контента. 

Проектируя поток для NiFi впервые, легко допустить ошибку и загрузить в NiFi сразу большой объем данных. Лучше ориентироваться на размер контент-репозитория, но не забывать о том, как эти данные обрабатываются.

На практике проще минимизировать объем данных, которые можно обработать в единицу времени, и стараться держать в потоке только те данные, которые мы можем быстро обработать. Освобождать от них поток лучше максимально быстро. Мы в своих потоках инициирующей загрузки устанавливаем размер всех очередей до самого нагруженного процессора в значении 1. Так в потоке одновременно находится небольшое количество файлов.

В потоке мы храним только нужный контент, а обрабатывать его стараемся как можно быстрее.

Есть еще один способ, но применять его нужно аккуратно. Если выставить настройку nifi.content.claim.max.flow.files в значении 1, в один физический файл будет записываться строго один content. Тут важно помнить о количестве открытых курсоров в операционной системе. Постарайтесь, чтобы в потоке было небольшое количество файлов. Иначе возникнет еще одна проблема, о которой я расскажу ниже.

Проблема вторая: изменение контента

Как я уже говорил, NiFi оставляет файл в очереди до конца обработки. Контент, который есть у файла, тоже остается в неизменном виде до конца обработки. Все потому, что каждый раз, когда нам нужно изменить контент, мы записываем новый контент и привязываем его к файлу. 

У этого процесса есть положительный побочный эффект. Если нам надо разделить обработку на несколько очередей, например для записи в несколько разных систем хранения, контент не дублируется. Просто два файла оказываются связаны с одним и тем же контентом.

Пример записи в Content Repository
Пример записи в Content Repository

У этого подхода есть и отрицательная сторона. Когда нам надо что-то изменить в контенте, пусть даже один байт, мы должны перезаписать его полностью. Это влияет на проектирование потоков и при регламентном исполнении. 

Потоки нужно проектировать так, чтобы они были понятны и сопровождаемы, так как жить с ними вам предстоит долго. Кроме того, объем данных, одновременно проходящих через NiFi, как правило, не такой большой. Скорее всего, он равномерный и предсказуемый, поэтому возможности кластера закладываются под обработку регламентного потока. Профиль нагрузки при инициирующей загрузке ближе к взрывному, а закладывать ресурсы на обработку большого объема данных может быть затратно. 

Даже при разработке регламентного потока старайтесь минимизировать количество изменений контента, но помните о читаемости. Для инициирующего потока требования к читаемости можно проигнорировать.

Проблема третья: параллельное исполнение

Теперь разберемся, где исполнять инициирующие потоки. Как следует из предыдущей части, профиль нагрузки для инициирующих потоков отличается от профиля исполнения регламентных потоков. А возможности среды по пропускной емкости ограниченны.

К сожалению, NiFi не позволяет управлять ресурсами в разрезе потоков или процессоров. Мы не можем выделить количество ядер на поток или количество оперативной памяти. Кроме того, нельзя выделить лимит использования репозиториев для потока, а это основной ресурс. 

Если на среде, где происходит исполнение регламентных потоков, запустить инициирующий поток, NiFi сбалансирует использование ресурсов поровну между всеми имеющимися процессорами и объемами данных, чтобы общая нагрузка умещалась в возможности среды. Большую часть ресурсов придется отдать инициирующему потоку, так как он обрабатывает больший объем данных. 

Такое поведение NiFi может привести к тому, что регламентные потоки начнут задерживаться с доставкой данных. А это вызовет негатив со стороны бизнеса, который ждет свои данные и не привык к задержкам. 

Но это не самое критичное. Некоторые потоки могут работать с источниками с ограниченным сроком жизни данных, например с topic в Apache Kafka с настроенной retention policy. Если не успеть забрать данные вовремя, мы начнем терять их. Это вызовет еще больше недовольства со стороны заказчиков. 

Очень важно разделить среды исполнения регламентных потоков и инициирующих потоков. И следить, чтобы разработчики не выводили на продуктовую среду исполнения регламентных потоков поток с инициацией источника. Это может нарушить функционирование регламентных потоков.

Подход с разделением сред может помочь и в решении первой проблемы. Для разных сред мы можем указывать разные параметры, опираясь на характер нагрузки, так как указывать параметр nifi.content.claim.max.flow.files в значении 1 для регламентной среды будет ошибкой.

Проблема четвертая: резервирование ресурсов

Как стало понятно из предыдущего раздела, для исполнения инициирующей загрузки нужна отдельная среда. Давайте представим, что мы запросили отдельный кластер NiFi для регламентных потоков и отдельный кластер для инициирующей загрузки. Нам предоставили эти ресурсы, и мы построили график их использования по потокам на инициирующей среде.

Использование ресурсов в инициирующей среде
Использование ресурсов в инициирующей среде

В январе мы вывели первый поток, и он начал работать. Допустим, для загрузки нескольких лет данных ему понадобилось три месяца. В феврале наши заказчики нашли еще один источник данных и попросили прогрузить данные за пару лет. Их мы также пустили грузиться на инициирующей среде, и они параллельно с первым потоком прогрузились к марту. Новые потоки не требовали инициирующей загрузки, и следующая инициирующая загрузка понадобилась только в июле. С апреля до июня у нас были зарезервированы ресурсы, которые просто простаивали. Кроме того, мы могли бы прогрузить данные первых двух потоков быстрее, если бы выделили отдельный инициирующий кластер для каждого. 

Для инициирующих потоков хорошо подойдут виртуальные среды, выделяемые из некоего общего пула и возвращаемые туда. Развернуть NiFi не так тяжело, и на этом можно сэкономить. К тому же инициирующий поток, как правило, идет по новым данным, и бизнес готов ждать, когда появятся ресурсы на загрузку. Поэтому, даже если сейчас ресурсов нет, можно подождать, когда они появятся. Исключение — ситуация, когда у вас очень большой поток инициирующих загрузок. В этом случае фиксированная среда эффективна.

Если есть возможность, для выполнения инициирующей загрузки лучше применять арендуемые/виртуальные среды.

В дополнение хочу сказать, что инициирующая загрузка возникает не только при подключении источника. Иногда бизнес хочет повторно забрать данные из источника, дополнив обработку новым функционалом. А это снова инициирующая загрузка.

Проблема пятая: нестандартные потоки

Когда мы начинаем делать потоки в NiFi, возможности среды и вариативность подходов позволяют расслабиться, и мы начинаем проектировать потоки по принципу «я художник, я так вижу». Каждый поток — уникальное отражение идей создателя. 

Это плохо не только для инициирующей загрузки, но и для регламента. Кое-что об этом можно узнать из моего видео, но есть еще один важный момент. Если мы проектируем поток, для которого надо выполнить инициирующую загрузку по стандарту, переделать регламентный поток в инициирующий проще. 

Из идей при проектировании потоков всегда выделяем блоки «Забор данных источника» (включая работу с инкрементом), «Трансформация» и «Сохранение данных». Если эти блоки выделены, инициирующий поток отличается от регламентного только первым блоком, и создать из него инициирующий поток — дело пяти минут. Ну, максимум получаса. 

Стандартизация потока — ключ к простому созданию инициирующих потоков. Разделив поток на три части и придерживаясь этого разделения во всех создаваемых потоках, вы упростите себе жизнь. 

Как создать инициирующий поток для уже загруженных данных

Иногда в данных, которые мы загружаем, возникают ошибки. Причины могут быть разными. Например, разработчик ошибся при создании потока. Иногда ошибка возникает из-за того, что изменились требования. Например, понадобились данные по полям источника, которых изначально не было. 

В этом случае можно сделать инициирующий поток и прогрузить повторно данные с изменениями. Это хороший вариант, но он возможен, только если в источнике все еще есть данные. Это случается не всегда. Кто-то хранит данные за год, кто-то — за месяц, а есть источники, где данные хранятся всего несколько дней. Это, например, топики в Kafka с настроенной политикой retention. 

Мы для себя выработали следующий подход: все данные, поступающие к нам через Apache Nifi, сохраняются в минимально измененном виде в HDFS. 

Пример организации хранения архива
Пример организации хранения архива

Если нужно повторно загрузить данные, мы создаем инициирующий поток. Но при этом он использует в качестве источника не настоящий источник, а данные в HDFS. Так мы достигаем двух целей:

— можем загрузить данные, которых в источнике уже нет;

— не создаем нагрузку на источник по получению данных.

Если вы будете использовать этот подход, нужно учесть несколько моментов. Заказчики перегрузки должны понимать, что процесс требует много времени. Кроме того, резервируемые в архиве данные занимают много пространства на диске. Заложите бюджет на закупку этого пространства. 

И обязательно определите глубину хранения для этих данных, чтобы не хранить их вечно. И постарайтесь сообщить заказчикам, сколько стоит каждый дополнительный месяц глубины хранения для каждого набора данных. 

Заключение 

А теперь давайте еще раз перечислим, о чем нужно помнить, создавая инициирующий поток в NiFi:

  1. Стараемся держать в потоке только нужный контент.

  2. Настраиваем поток, чтобы он обрабатывал контент как можно быстрее.

  3. Минимизируем изменения контента разными процессорами. Это поможет обработать данные быстрее.

  4. Разделяем среды исполнения для регламентных и инициирующих потоков, чтобы не создать конкуренцию регламентным потокам со стороны инициирующих

  5. Если это возможно, используем арендованные среды для инициирующей загрузки.

  6. Стандартизируем потоки, чтобы из регламентного потока легко было сделать инициирующий. 

  7. Чтобы обеспечить сохранность данных и возможность повторной обработки, задумываемся о резервировании данных. Хотя бы для тех систем, которые не сохраняют свои данные долго.

Надеюсь, теперь создание инициирующей нагрузки стало для вас чем-то простым и понятным. Если у вас остались вопросы по использованию NiFi, можете задать их в сообществе русскоязычных пользователей Apache NiFi в Telegram.

Комментарии (4)


  1. KlimenkoIv
    08.04.2022 22:13

    Допустим, для загрузки нескольких лет данных ему понадобилось три месяца.

    А можно примерный объем данных, которые грузятся так долго? Как целевая система релизует обработку данных - ждет полной загрузки и ворочает весь объем, или по мере поступления пересчитывается хранилище?

    все данные, поступающие к нам через Apache Nifi, сохраняются в минимально измененном виде в HDFS

    А можно поподробнее? Кто отвечает за хранение этой информации? Как долго, в среднем, хранятся данные? Доливаются ли сюда данные при последующей инкрементальной загрузке?


    1. Shadilan Автор
      08.04.2022 23:07
      +1

      На на самом деле на практике у нас были загрузки не больше пары недель. И для таких загрузок хранилище пересчитывалось каждый день батчевыми процессами. Регламентый поток работал в параллели. Но здесь все разумеется зависит от всего цикла обработки информации. Что касается объёмов это если не ошибаюсь десятки терабайт в сжатом виде.

      Но бывали разумеется на практике и потоки которые требовали строгой последовательности погрузки данных в этом случае Регламентый поток запускался только после завершения инициализации данных. И в некоторых ситуациях даже после исполнения пересчёта хранилища на инициированных данных, но это скорее редкость,в идеале последующий пайп не должен рассчитывать на весь объем инициализированных данных.

      Про архив. Данные хранятся до 5 лет, в среднем год. Регламент доливает данные в архив по мере загрузки. За хранение отвечает у нас команда поддержки NiFi совместно с командой поддержки Систем хранения.

      Надеюсь ответил на вопросы.


      1. KlimenkoIv
        08.04.2022 23:52

        У нас гораздо меньше объемы. Сначала проводится историческа загрузка, делается пересчет, аналитик смотрит по срезу, все ли верно, все сделано так, как надо и т.д. После этого запускается инкрементальная загрузка с максимальным значением инкрементных ключей из ранее загруженных данных. Так как объемы не велики, то архив не храним, в случае необходимости перегрузка ставится в задание. Если данных до 100 млн записей, то можно грузить в любое время, разруливается приоритетами на потоки. А вот если больше, то под контролем команды NIFI (то есть меня :) ). Самое большое, что грузил - 500 млн записей, заняло 4-5 часов. Но грузил в выходные, когда DWH не нагружено запросами от аналитики.


        1. Shadilan Автор
          09.04.2022 07:17
          +1

          На самом деле чистый объем не так принципиально в вопросе инициирующей загрузки, гораздо интересней соотношение регламента к инициализации. Как правило мощность среды затачивается под погрузку регламента,и если данных в 100-1000 раз больше чем дневной икремент то могут быть негативные последствия о которых я писал, даже если речь о каких-то десятках миллионов, при учете что среда рассчитана на тысячи записей. Но да, до определённых объёмов вполне реально не выделять инициализации как отдельный процесс. Если пропускная возможность среды сильно выше текущей ёмкости потока данных.