Добрый день. Меня зовут Иван Клименко, и я архитектор департамента поддержки продаж компании Arenadata. В основном занимаюсь продуктом Arenadata Streaming (ADS) — это масштабируемая отказоустойчивая система для потоковой обработки данных в режиме реального времени, адаптированная для корпоративного использования и построенная на базе Apache Kafka и Apache NiFi. Далее я буду говорить о NiFi, подразумевая под этим Apache NiFi, или cборку Arenadata Streaming NiFi.

Продолжаем цикл статей практического применения, так называемого "How To…". Первая статья была посвящена взаимодействию с базами данных. В связи с уходом с рынка западных вендоров повышается интерес к открытому программному обеспечению. В сообществе NiFi часто приходят с вопросами: «Где почитать о NiFi», «Дайте хорошие курсы» и т. д. Поэтому эта статья больше теоретическая. Я хотел бы поговорить о концепции потокового программирования, о FlowFile и вообще о подходе к разработке потоков обработки данных в NiFi. Эта статья будет интересна в первую очередь новичкам в NiFi, ну а от опытных разработчиков всегда жду комментариев с дополнениями или с конструктивной критикой.
Когда первый раз открываешь интерфейс сервиса NiFi и видишь перед собой канву либо уже готовый визуальный pipeline обработки данных, возникает вопрос: как именно это работает?

Чтобы процессоры начали обрабатывать данные, они должны появиться в очереди. Очередь, или соединение, — место хранения данных в NiFi, участвующих в обработке. А сами данные называются FlowFile. Эти объекты движутся сквозь систему и представляют собой набор пар строк «ключ — значение», а также связанный с ними контент из нуля или набора байт. FlowFile — основной субъект в парадигме программирования, основанного на потоках. В общем смысле Flow Based Programming (FSB) — это парадигма программирования, которая определяет приложения как сеть чёрных ящиков (процессов). Эти чёрные ящики обмениваются данными (IP — Information Packets) по предопределённым соединениям посредством передачи сообщений, где соединения (Bounded Buffer) указываются извне для процессов. Эти процессы чёрного ящика можно бесконечно связывать повторно для формирования различных приложений без необходимости внутреннего изменения. Как раз информационным пакетом для NiFi и является FlowFile, который содержит всю необходимую информацию для своей же обработки.
В общем смысле FSB — это ориентированный граф, в вершине которого выполняется вычисление данных либо их обработка, а по рёбрам графа эти данные перемещаются. В этой парадигме применяются такие термины, как Ports, Bounded Buffer, и в NiFi им сопоставляются входные и выходные порты, а также связи или очереди.

В парадигме FSB вся информация для обработки заключается в информационном пакете. Процессы не знают ничего ни о предыдущем, ни о следующем шаге. Есть только данные во входной очереди и алгоритм их обработки. Это в полной мере справедливо для и NiFi. Обычно сравнивают потоки данных в NiFi с конвейерной лентой, по которой едут ящики либо детали.
Очень хорошо это описано в статье Бронислава Житникова. Самое важное, что надо запомнить при проектировании потоков обработки: вы не сможете из одного процессора запросить данные у другого процессора напрямую либо изменить какую-то переменную, которую можно применять в других функциях. То, что можно сделать на пространстве ваших функций и классов в классическом программировании, не будет работать в NiFi. Информация, достаточная и необходимая для обработки данных процессором, должна содержаться в атрибутах либо контенте FlowFile.
Атрибуты
Рассмотрим более подробно атрибуты и то, как с ними можно работать. Всё описано в документации, поэтому подробно по каждому пункту проходить не буду, только самое основное.
Атрибуты FlowFile представляют собой коллекцию пар «ключ — значение», который отражают метаинформацию или метаданные про этот FlowFile. Мы можем узнать идентификатор FlowFile, получить ссылку на его контент, узнать количество записей, содержащихся в контенте, и так далее. Большинство операций по маршрутизации завязано именно на атрибуты. Также многие процессоры читают атрибуты в ходе своей работы либо пишут в атрибуты результаты обработки. Важно понимать, что атрибуты — это не сами данные, это именно метаинформация, позволяющая обрабатывать данные. Отмечу, что атрибуты хранятся в оперативной памяти, и чем больше у вас атрибутов, тем больше памяти понадобится для обработки ваших данных.
У каждого FlowFile есть обязательные атрибуты (Core Attributes), которые генерируются системой при создании FlowFile, прохождении его через различные отношения (очереди).
filename
— имя файла, которое может быть применено для хранения данных локально либо удалённо.path
— директория, которая может быть применена для хранения данных.uuid
— уникальный идентификатор, однозначно указывающий на этот FlowFile.entryDate
— дата и время создания FlowFile в миллисекундах с 1 января 1970, в формате UTC.lineageStartDate
— время появления в системе самого старого предка текущего файла, в миллисекундах, в формате UTC.fileSize
— количество байт, составляющих контент. This attribute represents the number of bytes taken up by the FlowFile’s Content.
Такие атрибуты, как uuid
, entryDate
, lineageStartDate
и fileSize
, генерируются системой и не могут быть изменены в ходе обработки пользовательской функцией.
В NiFi есть ряд процессоров, предназначенных для работы с атрибутами, перечисленные в разделе Attribute Extraction:
EvaluateJsonPath.
С помощью выражений JSONPath можно выполнить обмен данными между атрибутами и контентом.EvaluateXPath.
Аналогично предыдущему, только применяется XPath.EvaluateXQuery
. Аналогично предыдущему, только применяется запрос XQuery.ExtractText.
Позволяет извлечь в атрибут текст по регулярному выражению.HashAttribute.
Позволяет применять функции хеширования над списком атрибутов и помещать результат в определённый атрибут.HashConten.
Применяет функцию хеширования над контентом и помещает результат в атрибут.IdentifyMimeType.
Определяет тип контента и помещает значение в атрибут.UpdateAttribute.
Процессор позволяет изменять либо создавать атрибуты, при этом возможно применять специальный язык Expression Language
Про Expression Language (EL) в NiFi нужно поговорить чуть подробнее. Это очень мощный инструмент работы с метаданными. Каждое выражение в EL начинается со знака $
и обрамляется фигурными скобками — ${выражение}
. В самом выражении прописываем атрибут и функцию над ним. Например, проверим, что в атрибуте, содержащем имя файла, такое же значение, как и в идентификаторе FlowFile. Для этого прочитаем атрибут filename
и сравним его с атрибутом uuid — ${filename:equals(${uuid})}
. Функция отделена от аргумента знаком «:», и таких функций может быть много. Например, выражение ${filename:startWith(“some_value”):not()}
возвращает true
, если имя файла не начинается со строки «some_value».
В выражениях можно применять атрибуты, переменные, системные переменные. Во время выполнения происходит поиск этих данных в следующем порядке:
Атрибуты FlowFile.
Переменные в процессной группе.
Переменные из файла, определённые в свойстве
nifi.variable.registry.properties
.NiFi JVM свойства.
Системные переменные.
EL поддерживает работу с несколькими типами данных: String, Number, Decimal, Date, Boolean; после выполнения выражения значение приводится к строке. Поддерживаются логические операции, проверки на существование, работа с JSON, преобразование типов, математические операции и так далее.
Например, при работе потоков обработки возникает ситуация, что запись не может быть помещена в базу данных (БД) из-за превышения размера одного из полей. При этом такая запись может быть одна в наборе, и чтобы её отыскать, потребуется разбить весь набор записей. Чтобы минимизировать затраты на поиск, можно разделять набор записей пополам за один заход. Если запись одна, то она будет в одном из двух получившихся фрагментов, а корректный фрагмент будет внесён в БД. Таким образом, повторяя операцию деления, можно получить проблемную запись, а все корректные будут внесены в БД. Остаётся определить, как отделить отдельную запись и как составить условие для деления набора записей. Для этого можно воспользоваться атрибутом record.count
:
${record.count:equals(1)}
— условие для определения, что запись в наборе одна, и если она попала в ошибочную ветку, то её нужно проанализировать в ручном режиме.${record.count:divide(2)}
— определяем количество записей, на которое будем делить набор.
Пайплайн, выполняющий такую работу можно представить как:

И конфигурации процессоров, соответственно:

Таким образом, с помощью метаданных автоматизировано выявления проблемной записи в наборе, при этом оптимизирован процесс внесения в БД.
Контент
При решении задачи определения некорректной записи весь набор записей был поделён пополам. Этот набор содержался в контенте — последовательности байт, в котором находятся сами данные FlowFile. Они хранятся на диске и подгружаются только при необходимости операций над ними. За счёт того, что контент никуда не перемещается, а хранится в виде ссылки на фрагмент репозитория в атрибутах, достигается высокая производительность NiFi. Также в NiFi реализован механизм однократной записи: при модификации содержимого контент полностью переписывается на новое место в репозитории. Останавливаться подробно на механизме хранения я не буду, он хорошо описан в статье Бронислава и в документации.
В NiFi большое количество процессоров предназначено для обработки данных. Это группы процессоров трансформации данных, анализа контента и маршрутизации, получения данных из внешних систем и отправки данных во внешнюю систему. Подробно с полным списком можно ознакомиться в документации.
Различают два подхода работы с контентом: обработка всего контента целиком и абстракция контента в виде набора записей. Запись — это экземпляр интерфейса Record, и вся работа с таким контентом выполняется через специальные абстракции — классы чтения записей и классы записи записей.
Например, вы можете загрузить текстовый файл с диска и выполнить в нём операцию поиска с помощью регулярного выражения. В этом случае контент будет обрабатываться как единый элемент. Но если в тестовом виде вы загрузите CSV, то с помощью CVSReader вы можете читать каждую строку как запись.
В ходе модификации контента может возникнуть ситуация, когда от обработки записей нужно перейти к обработке контента целиком. В этом случае следует учесть, что при такой обработке весь объём обрабатываемых данных будет загружен в оперативную память. Если же контент будет разбит на мелкие фрагменты, например по одной записи, то это может породить большое количество FlowFile в очереди. А так как каждый файл будет содержать набор атрибутов оригинального файла, то эта операция также вызовет увеличение задействования оперативной памяти. Таким образом, рекомендуется переходить в процессах обработки на работу с записями.
Различают следующие основные группы процессоров, обрабатывающие контент целиком:
Dara Ingestion — загрузка данных в NiFi из внешних систем. Тут можно указать такие процессоры, как GetFile, GetFPT, GetHTTP, GetHDFS, GetKafka, GetMongo и так далее. То есть это группа процессоров, позволяющая данные из источника загрузить в NiFi в том виде, как они представлены в источнике.
Data Egress — выгрузка данных во внешнюю систему. К этой категории можно отнести процессоры PutFile, PutFTP, PutSQL, PutKafka, PutMongo. Процессор выгружает контент с минимальными модификациями. Например, PutEmail помещает контент либо во вложение, либо представляет его как тело письма.
Data Transformation — процессоры, преобразующие контент по заданным алгоритмам. Например, CompresssContent, TransformXML, JoltTransformJson.
Database Access — процессоры, предназначенные для доступа к базам данных, например ConvertJSONToSQL, PutSQL, SelectHiveSQL, PutHiveQL.
System Interaction — взаимодействие с системой, например ExecuteProcess и ExecuteStreamCommand. Процессоры позволяют передать контент во внешние процессы, запускаемые в операционной системе.
Spliting and Aggregation — разделение и соединение. Такие процессоры, как SplitText, SlitJson, SplitXml, SplitContent, выполняют операцию разделения контента, а MergeContent выполняет операцию соединения контента из нескольких источников.
Все эти процессоры работают с контентом как с одним целым, и при этом контент полностью помещается в оперативную память.
Чтобы работать с контентом эффективно, введён подход представления контента в виде записей. Структура записи/Record в NiFi описывается с помощью спецификации Apache Avro. Для чтения применяется специальная абстракция Record Reader. Такие абстракции используются для анализа и преобразования необработанных данных в структурированный формат, который может обрабатываться процессорами NiFi. Они ключевой компонент возможностей NiFi для работы с записями, что позволяет эффективно обрабатывать структурированные форматы данных. Для записи применяется RecordSetWriters — которые преобразовывают структурированные записи (обработанных NiFi) обратно в формат необработанных данных для хранения или передачи в другие системы.
Для погружения в Record-OrientedNiFi рекомендую статью Mark Payne
В чём основное преимущество работы с записями? Когда NiFi выполняет чтение данных, он не читает весь контент сразу, а "бежит" по нему и полученные данные преобразует в записи, с которыми уже работает процессор. И процессор не держит весь контент в памяти, а работает со структурированным представлением. Аналогично происходит процесс записи контента, кода Record пишутся одна за другой, требуя оперативную память только для буфера данных.
Как работают Record Readers в NiFi:
1. Record Readers принимают необработанные данные (например, файлы, потоки или сообщения) в качестве входных данных. Эти данные могут быть в различных форматах, таких как CSV, JSON, Avro, XML и т. д.
2. Record Readers требуется схема (формат Avro) для интерпретации данных. Схема определяет структуру данных, включая имена полей, типы данных и отношения. Схемы могут быть явно определены, например, заданы как свойство, заданы в атрибуте, определены в SchemaRegistry. Или схемы могут быть автоматически сгенерированы на основе данных. В этом случае применяется первая запись для анализа, и на её основе создается схема.
3. Record Reader анализирует необработанные данные в соответствии с указанным форматом и схемой. Например, CSV Reader разделит данные на поля на основе разделителей и сопоставит их со схемой, JSON Reader проанализирует JSON-объекты и извлечёт поля в соответствии со схемой, а Avro Reader десериализует двоичные данные Avro с использованием предоставленной схемы Avro.
4. Проанализированные данные преобразуются в объекты Record (записи). Каждая запись представляет собой одну строку или запись в данных. Записи представляют собой пары «ключ — значение», где ключи — это имена полей, определённые в схеме, а значения — соответствующие данные.
5. Record Reader передаёт записи процессору NiFi для обработки, например QueryRecord, UpdateRecord или PutDatabaseRecord.
NiFi предоставляет несколько встроенных Record Readers для различных форматов данных:
CSVReader. Анализирует CSV-файлы.
JSONReader. Анализирует JSON-данные.
AvroReader. Читает файлы Avro.
ParquetReader. Читает файлы Parquet.
XMLReader. Анализирует XML-данные.
SyslogReader. Анализирует сообщения syslog.
Как RecordSetWriters работают в NiFi:
RecordSetWriters принимают структурированные объекты Record в качестве входных данных — результата работы процессоров NiFi, например QueryRecord, UpdateRecord и т. д.
Как и Record Reader, RecordSetWriters применяют схему для определения, как сериализовать записи в желаемый формат вывода. Схемы определяются таким же образом, как и для Record Reader.
RecordSetWriter сериализует записи в указанный формат вывода (например, CSV, JSON, Avro, Parquet и т. д.). Так, CSV Writer преобразует записи в разделённые запятыми значения, при этом каждое поле сопоставляется со столбцом, JSON Writer сериализует записи в объекты или массивы JSON, а Avro Writer сериализует записи в двоичный формат Avro, используя предоставленную схему Avro.
Сериализованные данные записываются в выходное место назначения, например файл, сообщение или поток. Затем эти выходные данные могут использоваться нижестоящими системами или сохраняться для дальнейшей обработки.
NiFi предоставляет несколько встроенных RecordSetWriters для различных форматов данных:
CSVRecordSetWriter. Преобразует записи в формат CSV.
JSONRecordSetWriter. Преобразует записи в формат JSON.
AvroRecordSetWriter. Преобразует записи в формат Avro.
ParquetRecordSetWriter. Преобразует записи в формат Parquet.
XMLRecordSetWriter. Преобразует записи в формат XML в Apache.
Преимущества, которые даёт подход работы с записями:
Эффективность. Record Readers и RecordSetWriters позволяют NiFi обрабатывать большие наборы данных структурированно и эффективно.
Гибкость. Поддерживают множество форматов данных и схем, что упрощает работу с различными источниками данных.
Повторное использование. После настройки Record Readers и RecordSetWiters можно повторно использовать в различных потоках данных.
Узнать, работает ли процессор с записями, легко — в его названии будет слово «Record», например SplitRecord, UpdateRecord, PublishKafkaRecord.
В заключение скажу: хотя NiFi кажется очень простым за счёт понятного интерфейса, разработчики не всегда понимают саму концепцию обработки данных в пайплайнах NiFi. Разобравшись в FSB и поняв, как работает пайплайн, вы сможете легко создавать масштабируемые и гибкие потоки обработки либо интеграции.
Всё это и многое другое вы можете узнать на учебных курсах Arenadata.
Полезные ссылки: