Всем доброго дня. Я Иван Клименко, и я разработчик потоков данных в компании Аскона. Данная статья посвящена параметризации NIFI-потока и информированию СУБД об окончании загрузки.
В первой части я описал начальный поток, на котором отладили основную модель заполнения stage слоя. Основные принципы: инкрементальная загрузка, формирование служебного поля - хеша для сравнения изменений, загрузка в БД батчем (применительно к MSSQL через BULK INSERT).
Недостатки потока, выяснение в процессе эксплуатации:
При росте количества источников сложность поддерживать актуальную версию возрастает.
Начальным процессором является QueryDatabasetable, который не поддерживает входящие соединения. Таким образом, этот процессор мог работать только по одному заданному расписанию.
Необходимость удаления CSV файла на сервере, откуда MSSQL может забирать данные с помощью BULK INSERT. В мем случае это локальная папка на сервере с MSSQL.
Экранирование в CSV. В некоторых источниках были поля с очень объемными блоками текста, и экранирование фактически искажало информацию. Стали поступать жалобы от аналитиков о расхождениях при сверках. Само содержание по смыслу не менялось, но автоматические тесты выявляли расхождения с источником.
Для устранения был разработан один общий поток, принимающий на вход FlowFile, в атрибутах которого содержались сведения об исходной таблице и таблице назначения:
src.schema |
Исходная схема |
src.table |
Исходная таблица/представление |
src.table.incrementkey |
Имя поля с инкрементальным ключом |
tgt.schema |
Целевая схема |
tgt.table |
Целевая таблица |
Получение имен полей
Так как теперь неизвестно, какая именно таблица будет обрабатываться, потребовалось реализовать динамическое извлечение списка имен полей в таблице, для последующего составления правила конкатенации перед расчетом хеша.
Процесс получения имен полей
Первым этапом формируется запрос к Oracle:
SELECT OWNER as TABLE_SCHEMA,TABLE_NAME as TABLE_NAME,COLUMN_NAME as COLUMN_NAME,
Concat(DATA_TYPE,DATA_LENGTH) AS DATA_TYPE FROM ALL_TAB_COLS
WHERE TABLE_NAME IN ('${src.table}') ORDER BY COLUMN_ID
В результате получаем набор записей, содержащих имена колонок. После проверки, что запрос вернул записи, переходим к извлечению данных с помощью RouteOnAttribute.
${executesql.row.count:equals(0)}
Данные преобразуются к Json извлекаются в два Jolt этапа.
Первый "Shift" преобразует имена полей в массив "data":
{"*": {"@COLUMN_NAME": "data"}}
Второй "Modify-override" формирует нужную строку:
{"datanew": "=join(',',@(1,data))"}
Далее с помощью EvaluateJsonPath колонки помещаются в атрибут ${src.table.columnnames}
Формирование и выполнение запроса
Для того, чтобы сгенерировать запрос, учитывающий наличие списка колонок и инкрементальный ключ удобно применять GenerateTableFetch. Для выполнения запроса применяется ExecuteSQLRecord. Проверка аналогично указаной ранее.
Настройки процессоров
Для процессора GenerateTableFetch требуется задать параметры подключения и указать, по какой таблице формировать запрос.
Если атрибут ${src.table.incrementkey} не задан, то при генерации запроса он игнорируется. Если же задан, то при генерации его последнее значение будет извлечено из хранилища состояний процессора и добавлено в запрос в виде блока " WHERE"
Сформированный запрос попадает на ExecSQLRecord.
Я указываю параметр Max Rows Per Flow File равным 5000. Это означает, что каждые 5000 записей, получаемых от источника NIFI будет формировать новый файл. Это позволит выгрузить большой общем данных и не упасть по памяти, так как данные будут уходит в контент. Весь общем записей по одному запросу составит батч.
Формирование служебных полей для stage слоя
На следующем этапе происходит заполнение служебных полей.
Этот этап по сути такой же, как и в первой части статьи.
Однако, не зря же извлекали список колонок...
При формировании поля, по которому будет произведён расчет хеша я применяю полученный список имен полей. Но, в исходном состоянии это список, с разделенными запятой именами. А требуется привести к RecordPath. Для этого написал выражение:
concat(${src.table.columnnames:substring(0,${src.table.columnnames:lastIndexOf(",")}):substring(${src.table.columnnames:indexOf(",")}):replace(" ",""):replace(",",",/"):substring(1)})
Код меняет сивмолы "," на "/", и загоняет полученную строку в функцию конкатенции. Т.о. в результирующем поле будет строка, с объединением всех полей.
Внесение в целевую БД
После формирования служебных полей требуется внести данные. Для этого в NIFI есть прекрасный процессор - PutDatabaseRecord. Он берет записи, и применяя JDBC-соединение формирует запрос на вставку данных.
Настройка процессоров
Для PutDatabaserecord требуется указать целевую схему и таблицу.
Следующим этапов надо выделить последний файл в батче. Это нужно для того, чтобы проинформировать целевую систему, что таблица загружена и ее можно брать в обработку. Если возникнет ситуация, в которой NIFI еще грузит данные, чтобы расчет витринного слоя не запускался, так как результирующие данные могут быть не консистентными.
Последний файл в батче легко определяется выражением:
${fragment.count:equals(${fragment.index:plus(1)})}
Выражение сравнивает индекс фрагмента с количеством фрагментов. Данные атрибуты формируются при разбиении выгрузки на файлы процессором ExecuteSQLRecord.
При обнаружении последнего файла в батче, считаем, что все данные были внесены и выполняем процедуру:
DECLARE @RC int
DECLARE @tablenameNiFi sysname
set @tablenameNiFi = '[${tgt.shema}].[${tgt.table}]'
EXECUTE @RC = #{tgt.sql.cognos.switchprocedure}
@tablenameNiFi
Процедура выполняет простую функцию - таблица из текущей схемы полностью переключается в другую партицию и другую схему. Таким образом, NIFI может опять начать заполнять текущую таблицу, а уже заполненные данные пойдут в расчеты.
Стоит отметить, что PutDatabaseRecord пробует сформировать батчевую вставку средствами драйвера, т.е. данные будут идти пачкой, а не одной записью.
Замечено, что для корректной работы батчевой вставки требуется, чтобы порядок полей в записи совпадал с порядком полей в таблице. Также для MSSQL, если заменить тип "datetime" в таблице на "datetime2", то профайлере отображается, что батчевая вставка меняется на " BULK" вставку, то есть идет с той же скоростью, что и BULK INSERT, но по сети, без промежуточного файла.
Следующие этапы являются служебными - формирование логов, информирование и т.д.
Заключение
Итак, в результате у меня получился поток, способный принимать на вход имена таблиц в источнике, самостоятельно формировать правило для расчета хеша, и вносить в целевую таблицу со скоростью, сравнимую с BULK INSERT, и информировать целевую систему о завершении загрузки батча.
Достоинства:
Поддержка потока стала гораздо проще.
Внедрение новой таблицы - создание во внешней группе процессора GenerateFlowFile с заданными атрибутами и распсианием.
Скорость внесения сопоставима с BULK INSERT.
Не обошлось и без неприятностей, которые выявились в процессе:
Появился новый источник - MySQL, а поток заточен под Oracle.
При многопоточном запуске обработки последний файл мог внестись раньше, чем все остальные. Это связано с тем, что в последнем файле обычно содержится меньше записей, чем порядок разбиения батча, и при расчете служебных полей он успевал проскочить. Партиции переключались, когда не все данные были залиты.
В некоторых случаях от источника приходили поля типа FLOAT, и они неверно оторажались в Avro, то есть либо округлялись, либо сдвигались.
О том, как я двигался дальше и к чему пришел на сегодня, я расскажу в следующей статье.