Добрый день. Меня зовут Иван Клименко, и я архитектор департамента поддержки продаж компании Arenadata. В основном занимаюсь продуктом Arenadata Streaming (ADS). В продукт входит cервис Arenadata Streaming NiFi, который является LowCode-средством построения интеграционных потоков с возможностью масштабирования.

Сегодня я бы хотел открыть цикл статей практического применения, так называемого «HowTo…», и первой темой выбрано взаимодействие с базами данных в NiFi. Рассмотрим простые заготовки потоков обработки данных, которые извлекают данные из таблиц и помещают данные в другую, либо эту же базу. Разберем, как создавать подключение к базам, какими процессами пользуемся для выборки, а также как оптимизировать эти процессы. Эта статья будет интересна в первую очередь новичкам в NiFi, ну а от опытных разработчиков всегда жду комментариев с дополнениями или с конструктивной критикой.

О продукте Arenadata Streaming

Arenadata Streaming (ADS) — масштабируемая отказоустойчивая система для потоковой обработки данных в режиме реального времени, адаптированная для корпоративного использования и построенная на базе Apache Kafka и Apache Nifi.

Общая архитектура продукта представлена на рисунке:

Общая архитектура и состав продукта Arenadata Streaming.
Общая архитектура и состав продукта Arenadata Streaming.

В состав ADS входит Kafka и его экосистема - Kafka Rest, Kafka Streams, Kafka Connect, Kafka SQL, Schema Registry, а также NiFi и его компоненты — NiFi Registry и MiNiFi. Более подробно о продукте можно узнать по ссылке: Arenadata Sreaming.

Типовые сценарии применения ADS включают шину данных, единую точку обмена для микросервисов, систему ETL и CDC. Более подробно можно прочитать в разделе.

Стоит сказать несколько слов и оNiFi — это LowCode масштабируемое средство для построения интеграционных процессов в режиме времени, близком к реальному. NiFi подходит для ETLпроцессов, разработки интеграций, может выступать в качестве шины данных. Подробнее про NiFi можно прочитать в этом разделе и в этом разделе.

В процессе я буду применять термин FlowFile — это основной компонент обмена в NIFi, процессор — компонент, который обрабатывает FlowFile, пайплайн — совокупность процессоров и связей между ними, позволяющий передавать FlowFile от одного процессора к другому. Простыми словами все описано в это разделе.

Работаем с базами данных в NiFi

Процесс работы с базами данных в ADS.NiFi состоит из двух основных частей — создание и настройка подключения к СУБД, и выбор нужного процессора, либо процессоров, их конфигурация и связь между собой.

Процесс обмена с СУБД начинается с подключения, и для этого применяется сервис DBCPConnectionPool. Предварительно требуется скачать драйвер JDBC для необходимой СУБД, разместить драйвер в месте, доступном для сервиса NiFi. Например, поместить скачанный файл по пути «/opt/drivers», и выдать права на чтение пользователю, под которым работает NiFi. Драйвера не поставляются вместе с NiFi и требуют самостоятельной установки.

При настройке подключения необходимо заполнить следующие свойства:

  • Database Connection URL — строка соединения с базой данных. Эта строка специфична для каждого драйвера. Например, для PostgreSQL строка подключения выглядит как jdbc:postgresql://host:port/database. В этой строке можно задавать специфичные настройки подключения, которые можно найти в документации к драйверу.

  • Database Driver Class Name  имя класса драйвера, который будет загружен при инициализации подключения. Увидеть описание можно в документации к драйверу, например, для PostgreSQL имя класса org.postgresql.Driver.

  • Database Driver Location(s) — указать путь к драйверу. Тут можно указать каталог или URL, либо полный путь к файлу. Все значения можно перечислить через запятую

  • Database User — имя пользователя для подключения к базе даных.

  • Password — пароль пользователя для подключения к базе данных.

Остальные параметры требуются для обеспечения аутентификации с помощью Kerberos, а также более тонкой настройки. Единственно, хочу обратить внимание на параметр Max Total Connections. Он указывает, сколько соединений с базой данных будет максимально доступно через этот сервис на одной ноде кластера. Так как мы можем применять один сервис в нескольких процессорах доступа к данным, нужно быть уверенным, что указанного количества хватит с учетом возможности запуска параллельных потоков. Параметры User/Password можно задать через параметры контекста, в этом случае ссылка на параметр будет выглядеть как #{user} или #{password}. О параметрах и переменных я планирую рассказать в отдельной статье.

Итак, у нас создано подключение в базе. Операции с данными выполняются процессорами, которые можно разделить по типу операции, как приведено в таблице:

Операция

Процессор

Select

QueryDatabaseTable, QueryDatabaseTableRecord, GenerateTableFetch, ExecuteSQL, SQLExecuteRecord.

Update

PutSQL, PutDatabaseRecord

Insert

PutSQL, PutDatabaseRecord, PutRecord (через Sink к базе данных)

Delete

PutDatabaseRecord

Сreate

UpdateDatabaseTable

Для начала рассмотри процесс выборки данных из базы. Для этого применяются QueryDatabaseTable, QueryDatabaseTableRecord, GenerateTableFetch.

Конфигурационные параметры процессора QueryDatabaseTable

Настройки первых двух практически одинаковы (привожу самые важные):

  • Database Connection Pooling Service — выбираем настроенный ранее сервис соединения с БД.

  • Database Type — выбор типа СУБД, от которого зависят специфичные SQL конструкции.

  • Table Name — имя таблицы, из которой выполняется выгрузка данных.

  • Initial Load Strategy — исходя из выбора, будет запущена загрузка либо сначала, либо опираясь на начально заданное значение из динамических свойств.

  • Maximum‑value Columns — указав в этой колонке поле (число либо дата) можно получать инкрементальные выгрузки. При первом запуске будет вычислено максимальное значение поля, и оно будет сохранено в стейт. При последующем запуске в запрос будет добавляться условие, чтобы это поле было больше предыдущего максимального значения.

  • Max Rows Per Flow File — количество записей в результирующем файле. Если вы оставите значение 0, то вся выгрузка будет помещена в один выходной файл, что может вызвать затруднения как с формированием самого FlowFile, так и с его дальнейшей обработкой.

  • Output Batch Size — количество файлов, которые будут направлены на выход по достижению этого числа. Очень важный параметр, так как если его значение 0, то выходные файлы не будут переданы в выходную очередь, пока весь объем выборки из запроса не будет выгружен. Таким образом, в выходных FlowFile будут присутствовать атрибуты fragment.count и maxvalue.*, содержащие общее количество фрагментов и максимум в текущем фрагменте, соответственно. Но при большом объеме выборки это может привести к OutOfMemory. Если же это свойство будет отличным от 0, например 5, то по достижению количества записей, равных (Max Rows Per Flow File * 5) эти пять файлов будут выданы в выходную очередь. При этом, общее количество фрагментов будет неизвестно.

  • Normalize Table/Column Names — так как в основе записей в NiFi находится Apache Avro, то процессоры могут привести имя поля к необходимому формату, заменив неподдерживаемые символы на “_”.

Конфигурационные параметры процессора QueryDatabaseTable
Конфигурационные параметры процессора QueryDatabaseTableRecord
Конфигурационные параметры процессора QueryDatabaseTableRecord

Между ними одно отличие: QuryDatabaseTableRecord требует указания на RecordSetWriter, в то время как QueryDatabaseTable формирует результат в виде OCF Avro, где схема хранится внутри сообщения.

Настройки GenerateTableFetch также требует указания соединения, имени таблицы, списка колонок и дополнительных условий.

Параметры GenerateTableFetch
Параметры GenerateTableFetch

Отличия между этими процессорами весьма cущественное. QueryDatabasetable QueryDatabaseTableRecord в процессе работы генерируют результат запроса, в то время как GenerateTableFetch генерирует только SQL-скрипты, которые надо выполнять на других процессорах. Также первые два процессора не имеют входящего подключения, тем самым ограничивая себя в применении только на одной таблице, когда GenerateTableFetch может извлечь требуемые данные из атрибутов входного файла.

Общим между ними является то, что они умеют запоминать значение Maximum-values Columns, и хранить его в стейте. Тем самым реализуется инкрементальная выборка, при последующем подключении будет извлечены данные либо сгенерирован запрос только на новую порцию, где значения Maximum-values Columns будут превышать сохраненные.

Также есть два процессора, умеющие извлекать данные по сформированному запросу: ExecuteSQL и ExecuteSQLRecord. Как понятно из имени процессора, второй требует указания RecordSetWriter, то есть формата хранения результирующих записей. Результат запроса ExecuteSQL хранится в формате Avro, где схема передается в теле сообщения (OCF).

Также, как и у QueryDatabaseTable, у этих процессоров есть свойства Max Rows Per Flow File и Output Batch Size, определяющие количество записей в результирующем файле и количество файлов, отправляемых в результирующую очередь при работе процессора.

Самым важным является возможность этих процессоров читать SQL-запрос из атрибутов, либо из контента. Если не задать свойство SQL select query, то тест запроса будет взят из контента. Вместе с GenerateTableFetch эти процессоры представляют более гибкий вариант QueryDatabaseTable и QueryDatabaseRecord, потому что могут выполнять запросы к разным таблицам, а не к одной.

Также процессоры ExecuteSQL и ExecuteSQLRecord могут не иметь входящего подключения. В этом случае необходимо задать запрос, который будет периодически выполняться.

Теперь рассмотрим процессы изменения содержимого таблицы базы данных.

Есть процессор PutSql, предназначенный выполнять команды UPDATE или INSERT на целевой базе данных. Особенностью этого процессора является то, что он ожидает в контенте сформированный запрос, а все типы и значения передаются в атрибутах вида sql.args.N.type и sql.args.N.value. Сразу становится ясно, что таким образом можно вставить небольшое количество записей, например, какое-то сообщение бизнес-логики. Если оперировать с его помощью с большими объемами данных, то все значения будут хранится в атрибутах, что потребует большого количества оперативной памяти. Также он требует предварительной подготовки запроса. Обычно для этого применяется процессор ConvertJsonToSQL. Пайплан может выглядеть следующим образом:

Поток, выгружающий данные из базы, конвертирующий результаты запроса в атрибуты и выполняющий операцию с помощью PutSQL
Поток, выгружающий данные из базы, конвертирующий результаты запроса в атрибуты и выполняющий операцию с помощью PutSQL

Я не рекомендую применять этот подход в процессах обработки большого количества данных, однако он может послужить для выполнения какого-то кастомного запроса с индивидуальной записью.

Для оперирования с большим количеством записей выгоднее применять процессор PuttDatabaseRecord. Как ясно из названия, процессор работает с записями и требует указания контроллера чтения формата записей. Процессор подключается к указанной базе данных и извлекает схему данных для целевой таблицы. Далее он может сопоставить поля таблицы с полями пришедшей записи. Если в записи есть схема, то сопоставление идет также по тэгу alias.

Параметры процессора PutDatabaseRecord
Параметры процессора PutDatabaseRecord

Основными свойствами являются:

  • StatementType — тут доступно несколько вариантов, таких как INSERT, UPDFATE, UPSERT, INSERT_IGNORE, DELETE. Также можно применять ссылку на атрибут, либо задать тип операции в одном из полей.

  • Unmatched Field Behavior — если поле не найдено в целевой таблице, то можно проигнорировать, либо прерваться.

  • Unmatched Column Behavior — действие, если в записи нет ни одного поля, соответствующего полю в таблице.

  • Update Keys — при выбранных UPDATE, UPSERT и DELETE требуется перечислить имена полей, которые являются ключами.

  • Maximum Batch Size — такое количество записей будет формироваться в батч для вставки. Если применять 0, то батч будет сформирован из всей записи.

Процессор поддерживает синтаксис для Oracle, MS SQL Server, SQL, PostgreSQL, Phoenix. Для остальных баз есть тип Generic, применяющий базовый SQL без особенностей синтаксиса.

Приведем примеры пайплайнов с рассмотренными процессорами.

Извлечение даных из одной таблицы, вставка (обновление/удаление) данных в другую таблицу
Извлечение даных из одной таблицы, вставка (обновление/удаление) данных в другую таблицу

Как я говорил ранее, процессоры QueryDatabaseTable и QueryDatabaseTableRecord не имеют входящего соединения и могут быть применены только для выборки из конкретной таблицы. Тем не менее их удобно применять для проверки гипотез, быстрой выгрузки анализируемой таблицы.

Пайплайн с применением PutSQL. Требуется предварительное преобразование с помощью ConvertJSONToSQL
Пайплайн с применением PutSQL. Требуется предварительное преобразование с помощью ConvertJSONToSQL

Такой пайплайн тоже подойдет для проверки гипотез, либо для работы с бизнес-логикой, кастомными запросами. Не рекомендую применять его на больших потоках даных, так как после ConvertJSONToSQL все данные будут помещены в атрибуты, а следовательно, в оперативную память.

Пайплайн, где инициализация происходит через GenerateFlowFile, запросы выборки генерируются через GenerateTableFetch, выборка через ExecuteSQLRecord и вставка через PutDatabaseRecord.
Пайплайн, где инициализация происходит через GenerateFlowFile, запросы выборки генерируются через GenerateTableFetch, выборка через ExecuteSQLRecord и вставка через PutDatabaseRecord.

Этот пайплайн является самым предпочтительным при работе с базами. Так как GenerateTableFetch может читать атрибуты, то и генерировать запросы он может для разных таблиц. Соответственно, если в GenerateFlowFile задать атрибутами исходную и целевую таблицу, то можно получить процесс, который будет работать для всех таблиц одинаково.

Немного практики

Имеем таблицу sourcetable в базе:

CREATE TABLE sorcetable(
id int, value text);

В таблицу внесем даные:

id

value

0

data0

1

data1

Настроим для QueryDatabaseTable и QueryDatabaseTableRecord параметр Maximum Value Colums = id. На выходе получим выборку, содержащую все строки в таблице.

[{“id” :0, “value”:”data0”}, 
{{“id” :1, “value”:”data1”}]

Применим такие-же настройки в GenerateTableFetch. На выходе получим текст запроса:

select * from sourcetable where id <= 1

После выполнения запроса в ExecuteSQL получим: 

[{“id” :0, “value”:”data0”}, 
{{“id” :1, “value”:”data1”}]

Как видно, на первом этапе результаты выборки одинаковы для разных процессоров. Создадим целевую таблицу:

CREATE TABLE desttable(
id int, value text)

Выберем Statement Type = INSERT и подадим полученную выгрузку на вход процессора PutDatabaseRecord. В целевую таблицу будут внесены записи:

id

value

0

data0

1

data1

Теперь добавим запись в таблицу:

insert into sourcetable values (2, data2)

Процессоры QueryDataBaseTable и QueryDataBaseTableRecord вернут одну запись:

[{“id” :2, “value”:”data2”}]

После запуска GenerateTableFetch получим текст запроса:

select * from sourcetable where id >  1 and id <= 2

Подадим полученную выгрузку в PutDatabaseRecord, и получим в целевой таблице следующий набор данных:

id

value

0

data0

1

data1

2

data2

Теперь изменим исходную таблицу:

delete from sourcetable;
insert into sorcetable values (0,’newdata0’, 3,’newdata3’);

Теперь, если применим опять процессоры QueryDatabaseTable, QueryDatabaseTableRecord или GenerateTableFetch в результате выгрузки получим только одну запись, где id=3:

[{"id":3, "value":"newdata3"}]

То есть, мы получили инкрементальную выгрузку данных, где

Выгрузим все записи через процессор ExecuteSQLRecord, задав запрос:

select * from souorcetable

В процессоре PutDatabaseTable выставим Statement Type = Update, и зададим ключ записи – id. В целевой таблице получим набор записей:

id

value

0

newdata0

1

data1

2

data2

Теперь поговорим о том, как еще больше сделать наш процесс гибким. Для этого можно применять сервис DBCPConnectionPoolLookup. Предварительно, потребуется создать подключения к интересующим базам данных. Далее в настройках сервиса нужно перечислить необходимые подключения, присвоив им символьное имя.

Пример сопоставлений имени подключения и DBCPConnectionPool в DBCPConnectionPoolLookup.
Пример сопоставлений имени подключения и DBCPConnectionPool в DBCPConnectionPoolLookup.

Теперь в процессорах GenerateTableFetch, ExecuteSQL(Record), PutDabaseRecord вместо подключения можно применять ссылку на этот сервис. Входящий FlowFile должен содержать атрибут database.name, однозначно соответствующий одному из соединений. Таким образом, изменив атрибут, можно изменить применяемое подключение к базе данных, то есть строить универсальные потоки, умеющие работать с набором баз данных по заданным правилам.

Про Create table

Я говорил, что для функции CREATE есть отдельный процессор – UpdateDatabaseTable. Его основное назначение – проверить наличие таблицы в базе данных, и если она отсутствует, то создать ее. При этом таблица будет создана по схеме записи, которая приходит в процессор.

Конфигурация процессора UpdateDatabaseTable
Конфигурация процессора UpdateDatabaseTable

Я считаю, что выносить процессы создания или обновления таблиц в операционный слой плохая идея, и не рекомендую применять этот процессор в продуктовых потоках, исключительно в девелоперском окружении для быстрого формирования целевых таблиц.

Эту статью и приведенные пайплайны не стоит рассматривать как готовое руководство к действию. Я хотел показать, что в NiFi результат может быть достигнут разными способами с разными затратами по ресурсам. Более глубокие знания вы получите, пройдя курсы в учебном центре, посетив наше сообщество, и, конечно, при самостоятельной практической работе.

Выводы и рекомендации

Подведем итоги:

  • Рассмотрены процессоры, позволяющие извлекать данные из баз данных, а также помещать данные в базы, выполняя операции INSERT, UPDATE, DELETE.

  • Рассмотрены примеры пайплайнов и определена область их применения.

  • Описано, как можно достигнуть универсальности пайплайна, определяя параметры подключения в атрибутах.

Рекомендации при построении пайплайнов:

  • Строить поток таким образом, чтобы он управлялся приходящим FlowFile.

  • Применять DBCPConnectionPoolLookup для создания конфигуриремых потоков.

  • Не выгружать весь набор записей одним пакетом, разбивать на порции. В своей практике я определил, что оптимальным значением является 50 тысяч строк в одном FlowFile.

  • Применять Avro при выгрузках и загрузках. Это сократит объем контента.

  • Загружать записи через PutDatabaseRecord.

Полезные ссылки

  1. https://arenadata.tech

  2. https://arenadata.tech/products/arenadata-streaming/

  3. https://nifi.apache.org/

  4. https://t.me/nifiusers

  5. https://courses.arenadata.tech

  6. https://docs.arenadata.io/ru/ADStreaming/current/concept/overview/use-cases.html 

  7. https://docs.arenadata.io/ru/ADStreaming/current/concept/architecture/nifi/nifi_overview.html 

  8. https://docs.arenadata.io/ru/ADStreaming/current/concept/data_model/nifi/processor_types.html 

Комментарии (2)


  1. vomikan
    26.09.2024 17:59
    +3

    Привет! От меня небольшие дополнения.
    1) Пути к файлам удобно делать относительные. Например: ./drivers
    2) Create можно же и в ExecuteSQL сделать
    3) Можно еще добавить про LookupRecord и LookupAttribute т.к. там искользуется SimpleDatabaseLookupService, например, который в свою очередь использует DBCPConnectionPool


    1. KlimenkoIv Автор
      26.09.2024 17:59
      +2

      Привет.

      1. Согласен полностью. Но цель была сказать, что драйвер надо расположить в месте, куда есть доступ.

      2. Можно, а также можно самим генерировать запросы. Но все-таки ExecuteSql предназначен для SELECT

      3. Тема отдельного разговора про обогощение. Будет отдельная статья