Добрый день. Меня зовут Иван Клименко, и я архитектор департамента поддержки продаж компании Arenadata. В основном занимаюсь продуктом Arenadata Streaming (ADS) — это масштабируемая отказоустойчивая система для потоковой обработки данных в режиме реального времени, адаптированная для корпоративного использования и построенная на базе Apache Kafka и Apache NiFi. В продукт входит замечательный сервис Arenadata Streaming NiFi, который является low-code средством построения интеграционных потоков с возможностью масштабирования.
Сегодня хочу показать на одном из практических случаев, что не всегда в NiFi удаётся следовать шаблонному подходу record-oriented в разработке потоков обработки и трансформации данных.
Как известно, в NiFi применяется так называемый record-oriented-подход. Что это значит? Каждый контент FlowFile представляет собой не один элемент, а множество одинаковых по структуре записей. И работать с ним можно как с массивом записей, проводя необходимые трансформации и манипуляции, например выборки, обогащения либо изменения содержимого полей. В этом случае если запись изменяется, то контент FlowFile заново записывается на диск, а в атрибутах меняется системная ссылка на файл контента. Для чтения и записи элементов применяются специальные сервисы: RecordReader и RecordSetWriter, — которые, основываясь на схемах данных, позволяют процессорам обработки оперировать записями, не вдаваясь в детали их хранения. Такой подход позволяет сократить накладные расходы, так как хранить и обращаться к единому множеству записей в одном файле с одним набором метаданных выгоднее, чем хранить каждый файл с метаданными отдельно. Стандарты обработки данных тоже сместили фокус с индивидуальной обработки на работу с записями. И одной из моих рекомендаций всегда является применение обработки записей с помощью предназначенных для этого процессоров: UpdateRecord, JoltTransformRecord, QueryRecord и так далее. Это быстрее за счёт обращения к одному файлу, а оперативная память требуется только на один набор атрибутов, а не на множество.
Но что делать, если в вашем потоке обработки всё построено на записях, но от одного источника приходят данные, которые выпадают из общей парадигмы? Я расскажу об одном случае из моей практики, когда при решении задачи наполнения хранилища пришлось провести нестандартную обработку.
Для понимания задачи опишу информационную среду, в которой я разрабатывал интеграционные процессы. Есть компания, которая производит и продаёт товары, допустим мебель. Она использует различные бизнес-сервисы, позволяющие автоматизировать задачи по обороту товаров, логистике, складскому учёту, кадровым задачам, продажам и производству. Данные для построения отчётности компания собирает в аналитическом хранилище. А сама аналитика информации нужна для комплексной оценки эффективности бизнеса, текущей работы компании и прогнозирования во всех сферах деятельности. Источниками данных для хранилища являются различные ERP-, CRM-, PIM-, MDM-системы, веб-приложения и приложения обработки данных компании. В целях оптимизации информационного обмена прямой доступ между системами заменён на обмен сообщениями через единую шину данных. Чтобы сократить объём данных, в компании введено правило: в топике Kafkaнаходятся сообщения, сформированные по одной схеме, и сами сообщения преобразованы в Avro по стандарту Confluent. Схемы сохранены в ADS. Schema Registry. Обобщённая структура информационной среды представлена на рисунке.
Также в компании принят стандарт передачи сообщения в виде блока метаданных и данных. Обязательным элементом метаданных является время генерации сообщения, остальные поля оставлены на усмотрение генерирующий системы. Таким образом, данные представляют собой структуру, разделённую на два уровня: информационный блок и непосредственно структуру от источника, например, в таком виде:
Пример передаваемых данных
{
"metadata" : {
"ts" : "2020-01-01 00:00:00.0000"
},
"data" : {
"someFields1" : "some value",
"someFieeld2" : 0,
"someFieeld2 : true
}
}
ETL-процесс наполнения хранилища выглядит максимально просто: извлечение данных из требуемых топиков, необходимые трансформации по приведению[АД1] к структуре, пригодной для внесения в таблицу базы данных, сопоставление исходного топика целевой таблице в хранилище и внесение данных в хранилище.
Следующим этапом нужно подготовить данные для внесения в таблицу. Для этого требуется преобразовать полученную иерархическую структуру в плоскую таблицу, сдвинув поля из вложенности на уровень вверх, убрать ненужные теги metadata и data. Дополнительно понадобилось добавить в структуру время формирования записи из метаданных. Стейджинг формируется таком образом, что одному топику, а следовательно, одной структуре, соответствует одна целевая таблица, при этом имя таблицы соответствует имени топика. Дальнейшая обработка данных вынесена в процедуры расчёта хранилища и не завязана на сам ETL-процесс.
При получении данных из Kafka с помощью процессора ConsumeKafkaRecord флаг "Separate By Key" выставлен в значение "false", что позволяет формировать один FlowFile для нескольких сообщений. Так как нужно просто «сдвинуть» поля на один уровень, в качестве инструмента сразу напрашивается Jolt. Для упрощения структуры данных можно применить простую Jolt-трансформацию, которая перебирает все поля второго уровня и сдвигает их на первый уровень:
Трансформация, преобразующая двухуровневую иерархию в плоскую струтуру
[
{
"operation": "shift",
"spec": {
"*": { "*": "&" }
}
}
]
Трансформация позволяет работать с одной записью, и её можно применить в процессоре JoltTransformRecord. Таким образом, получается универсальный процесс, извлекающий данные из Kafka, трансформирующий их в плоскую структуру и передающий в целевую базу данных.
Конечно, представленный поток не готов к выводу в продуктовую среду, это только его ядро, которое необходимо дополнить обработкой ошибок, сформировать повторы, добавить оповещение. Тем не менее в таком виде уже получаем универсальный поток обработки, позволяющий загружать данные из любого топика в соответствующую таблицу целевой базы. Если внести список топиков в параметры контекста и управлять ими через API, то сможем автоматизировано управлять источниками.
Бизнес-системы не являются статичными, и список источников данных для аналитических хранилищ постоянно изменяется. В один момент появилась PIM/MDM-система, генерирующая данные не в простом формате, а с гораздо более сложной иерархией.
Сложная структура данных, генерируемая PIM/MDM-системой (очень большой JSON)
{
"metadata" : {
"ts" : "2024-05-14 22:15:68.5896"
},
"data" : {
"object" : {
"guid" : "04551935-d520-5785-b257-c8ae8240711c",
"template" : "Item",
"language" : null,
"fields" : {
"itemGroup" : {
"guid" : "497b6e51-4c1d-5ce0-be02-b8f37ba35c26",
"template" : "ItemGroup",
"language" : null,
"fields" : {
"name" : "GroupNanme",
"parentObject" : {
"guid" : "32d38329-1868-5f51-bb30-f824258539fb",
"template" : "ItemGroup",
"language" : null,
"fields" : {
"name" : "parent name"
}
},
"specification" : {
"guid" : null,
"template" : "SpecificationBrickContainer",
"language" : null,
"fields" : {
"internalGroupSpecification" : {
"guid" : null,
"template" : "InternalGroupSpecificationBrick",
"language" : null,
"fields" : {
"volumeRatio" : "1.3",
"groupAttribute" : {
"guid" : "84281670-f476-5f3f-8b31-382f02e13e83",
"template" : "ItemGroup",
"language" : null,
"fields" : {
"name" : "group name",
"parentObject" : {
"guid" : "1ba54e86-bc86-59ac-93e9-34fa4a68abe3",
"template" : "Folder",
"language" : null,
"fields" : {
"name" : "another name"
}
}
}
},
"codeGroup" : "5561010305"
}
}
}
}
}
},
"additionalInformation" : {
"guid" : null,
"template" : "AdditionalInformationBrickContainer",
"language" : null,
"fields" : {
"additionalInformation" : [ "AdditionalInformationBrick" ]
}
},
"manufactureCountry" : {
"guid" : "f64ab0a6-cff1-5dcd-9f1c-cc43f1c58d03",
"template" : "Property",
"language" : null,
"fields" : {
"name" : "US"
}
},
"code" : "5896dkwskeo_kedoed896",
"parentObject" : {
"guid" : "ce5a885b-8170-5bbe-80c1-c0d2d41d0cad",
"template" : "Item",
"language" : null,
"fields" : {
"name" : "parent name",
"parentObject" : {
"guid" : "83c7b0c5-3d1c-50ff-a9f0-0b0fc6e47cbd",
"template" : "ItemTemplate",
"language" : null,
"fields" : {
"name" : "object source name"
}
},
"statusCode" : "draft"
}
},
"ean13" : "462711234164",
"measureUnit" : {
"unitName" : "item",
"unitCode" : "796"
},
"main" : {
"guid" : null,
"template" : "MainBrickContainer",
"language" : null,
"fields" : {
"localizedDescription" : [ "LocalizedDescriptionBrick" ]
}
},
"media" : {
"guid" : null,
"template" : "MediaBrickContainer",
"language" : null,
"fields" : {
"media" : [ "MediaBrick" ]
}
},
"commerce" : {
"guid" : null,
"template" : "CommerceBrickContainer",
"language" : null,
"fields" : {
"product" : {
"guid" : null,
"template" : "ProductBrick",
"language" : null,
"fields" : {
"ean13" : "4627112341690"
}
}
}
},
"pack" : [ {
"packageName" : "coverage",
"package" : null,
"length" : {
"value" : 140.0,
"measureUnit" : {
"unitName" : "sm",
"unitCode" : "004"
}
},
"width" : {
"value" : 200.0,
"measureUnit" : {
"unitName" : "sm",
"unitCode" : "004"
}
},
"height" : {
"value" : 10.0,
"measureUnit" : {
"unitName" : "sm",
"unitCode" : "004"
}
},
"volume" : {
"value" : 0.28,
"measureUnit" : {
"unitName" : "m3",
"unitCode" : "113"
}
},
"weightNetto" : {
"value" : 35.9,
"measureUnit" : {
"unitName" : "kg",
"unitCode" : "166"
}
},
"weightGross" : {
"value" : 35.9,
"measureUnit" : {
"unitName" : "kg",
"unitCode" : "166"
}
},
"seatsCount" : null,
"packagedUnitsCount" : null,
"setSeatsCount" : null,
"priority" : null,
"measureUnit" : null,
"mainLogisticsPackaging" : null,
"barcode" : null
} ],
"isMatrix" : 1,
"series" : {
"guid" : "f68d1bc3-2600-5b40-b9b1-957d039cdd6b",
"template" : "tmp",
"language" : null,
"fields" : {
"name" : "seria name",
"priceSegment" : {
"guid" : "364044ec-0f96-5a5f-9028-9dc25f752e5e",
"template" : "Property",
"language" : null,
"fields" : {
"name" : "middle"
}
},
"parentObject" : {
"guid" : "a497557f-4eda-5f16-807a-4b71eb5c1b74",
"template" : "Seria",
"language" : null,
"fields" : {
"name" : "GENERAL Seria name"
}
}
}
},
"name" : "Common Name",
"options" : {
"guid" : null,
"template" : "OptionsCollection",
"language" : null,
"fields" : {
"length" : {
"guid" : null,
"template" : "OptionsCollectionItem",
"language" : null,
"fields" : {
"propertyType" : "length",
"relation" : [ {
"guid" : "28df4e5e-5201-519a-a3a5-57d7e1838361",
"template" : "Property",
"language" : null,
"fields" : {
"name" : 200,
"options" : {
"guid" : null,
"template" : "OptionsBrickContainer",
"language" : null,
"fields" : {
"propertyMDMData" : {
"guid" : null,
"template" : "PropertyMDMDataBrick",
"language" : null,
"fields" : {
"code" : 10004
}
},
"propertySize" : {
"guid" : null,
"template" : "PropertySizeBrick",
"language" : null,
"fields" : {
"physicalQuantity" : "length",
"numericValue" : {
"value" : 200.0,
"measureUnit" : {
"unitName" : "sm",
"unitCode" : "004"
}
}
}
}
}
}
}
} ]
}
},
"width" : {
"guid" : null,
"template" : "OptionsCollectionItem",
"language" : null,
"fields" : {
"propertyType" : "width",
"relation" : [ {
"guid" : "59c08ec1-d0ff-55c4-99a4-6616f23f1469",
"template" : "Property",
"language" : null,
"fields" : {
"name" : 140,
"options" : {
"guid" : null,
"template" : "OptionsBrickContainer",
"language" : null,
"fields" : {
"propertyMDMData" : {
"guid" : null,
"template" : "PropertyMDMDataBrick",
"language" : null,
"fields" : {
"code" : 10005
}
},
"propertySize" : {
"guid" : null,
"template" : "PropertySizeBrick",
"language" : null,
"fields" : {
"physicalQuantity" : "width",
"numericValue" : {
"value" : 140.0,
"measureUnit" : {
"unitName" : "sm",
"unitCode" : "004"
}
}
}
}
}
}
}
} ]
}
},
"height" : {
"guid" : null,
"template" : "OptionsCollectionItem",
"language" : null,
"fields" : {
"propertyType" : "height",
"relation" : [ {
"guid" : "c28c2569-7a0d-5d1a-94d4-4907dc91f20a",
"template" : "Property",
"language" : null,
"fields" : {
"name" : 112,
"options" : {
"guid" : null,
"template" : "OptionsBrickContainer",
"language" : null,
"fields" : {
"propertyMDMData" : {
"guid" : null,
"template" : "PropertyMDMDataBrick",
"language" : null,
"fields" : {
"code" : 10006
}
},
"propertySize" : {
"guid" : null,
"template" : "PropertySizeBrick",
"language" : null,
"fields" : {
"physicalQuantity" : "height",
"numericValue" : {
"value" : 112.0,
"measureUnit" : {
"unitName" : "sm",
"unitCode" : "004"
}
}
}
}
}
}
}
} ]
}
}
}
},
"sku" : 1563340922,
"statusCode" : "draft"
}
},
"stateName" : "Updated"
}
В данных от этой системы передаётся полная информация о товаре, его составляющих, размерах, свойствах упаковки. Такой формат позволяет потребителям самостоятельно определять способы хранения соответствующих сведений для каждого свойства товара, а системе — генерировать одно сообщение на единицу товара, не выгружая в отдельные потоки связанные данные. Мне же требовалось сформировать из этих сообщений плоскую таблицу по заданному SourceToTarget:
Поле |
Имя поля |
Тип данных |
data.object.guid |
guid |
uniqueidentifier |
data.object.fields.manufactureCountry.guid |
manufactureCountryGuid |
uniqueidentifier |
data.object.fields.manufactureCountry.fields.name |
manufactureCountryName |
text |
data.object.fields.itemGroup.guid |
itemGroupGuid |
uniqueidentifier |
data.object.fields.parentObject.guid |
parentGuid |
uniqueidentifier |
data.object.fields.sourceProduction.[0] |
sourceProductionValue |
bool |
data.object.fields.measureUnit.unitName |
unitName |
text |
data.object.fields.measureUnit.unitCode |
unitCode |
int |
data.object.fields.commerce.fields.product.fields.tax |
tax |
int |
data.object.fields.series.guid |
seriesGuid |
uniqueidentifier |
data.object.fields.series.fields.name |
seriesName |
text |
data.object.fields.series.fields.priceSegment.guid |
priceSegmentGuid |
uniqueidentifier |
data.object.fields.series.fields.priceSegment.fields.name |
priceSegmentName |
text |
data.object.fields.name |
name |
text |
data.object.fields.dimensionCharacteristics |
dimensionCharacteristics |
int |
data.object.fields.sku |
sku |
text |
data.object.fields.isOutOfCollection |
isOutOfCollection |
bool |
data.object.fields.isMatrix |
isMatrix |
bool |
data.object.fields.kit |
kit |
int |
data.object.fields.numberOfParts |
numberOfParts |
int |
data.object.fields.ean13 |
ean13 |
text |
data.object.fields.code |
code |
text |
data.object.fields.statusCode |
statusCode |
text |
data.stateName |
stateName |
text |
metadata.ts |
ts |
datetime2 |
Выход простой: написать спецификацию Jolt для текущего случая, сделать ответвление на основании имени топика, а после трансформации вернуть поток в стандартный маршрут. Разработав и отладив спецификацию, применил её в JoltTransformRecord.
Спецификация, решающая поставленную задачу
[
{
"operation": "shift",
"spec": {
"metadata": {
"*": "&"
},
"data": {
"object": {
"guid": "&",
"fields": {
"manufactureCountry": {
"guid": "manufactureCountryGuid",
"fields": {
"name": "manufactureCountryName"
}
},
"itemGroup": {
"guid": "itemGroupGuid"
},
"parentObject": {
"guid": "parentGuid"
},
"sourceProduction": {
"0": "SourceProductionValue"
},
"measureUnit": {
"*": "&"
},
"commerce": {
"fields": {
"product": {
"fields": {
"tax": "&"
}
}
}
},
"series": {
"guid": "seriesGuid",
"fields": {
"name": "seriesName",
"priceSegment": {
"guid": "priceSegmentGuid",
"fields": {
"name": "priceSegmentName"
}
}
}
},
"name": "&",
"dimensionCharacteristics": "&",
"sku": "&",
"isOutOfCollection": "&",
"isMatrix": "&",
"kit": "&",
"numberOfParts": "&",
"ean13": "&",
"code": "&",
"statusCode": "&"
}
},
"stateName": "StateName"
}
}
},
{
"operation": "modify-overwrite-beta",
"spec": {
"SourceProductionValue": "=toInteger",
"unitCode": "=toInteger",
"tax": "=toInteger",
"dimensionCharacteristics": "=toInteger",
"isOutOfCollection": "=toInteger",
"isMatrix": "=toInteger",
"kit": "=toInteger",
"numberOfParts": "=toInteger",
"sku": "=toString"
}
},
{
"operation": "shift",
"spec": {
"isOutOfCollection": {
"0": {
"#false": "isOutOfCollection"
},
"1": {
"#true": "isOutOfCollection"
}
},
"isMatrix": {
"0": {
"#false": "isMatrix"
},
"1": {
"#true": "isMatrix"
}
},
"kit": {
"0": {
"#false": "kit"
},
"1": {
"#true": "kit"
}
},
"*": "&"
}
},
{
"operation": "modify-overwrite-beta",
"spec": {
"isOutOfCollection": "=toBoolean",
"isMatrix": "=toBoolean",
"kit": "=toBoolean"
}
}
]
Пояснение по блокам спецификации JOLT
Первый блок "shift" сдвигает данные на один уровень. В нем все просто - взять указанное значение и поместить его в указанное поле, но на другом уровне. Например, поле "data.object.guid" перемещается в "guid":
{
"operation": "shift",
"spec": {
"metadata": {
"*": "&"
},
"data": {
"object": {
"guid": "&"
Следующий блок "modify-overwrite-beta" преобразует данные к требуемым типам, так как в исходных данных тип может не соответствовать целевому. Пример такого преобразования для поля "isMatrix". Несмотря на то, что в исходном виде это число 0 или 1, в результате трасформации поле приводилось к строке.
"isMatrix": "=toInteger"
Цель преобразования некоторых полей к численным типам раскрывается в следующем блоке трансформации "shift". Не многие знают, что с помощью это операции можно не только перенести значения из одного поля в другое, но также делать это с условиями:
"isMatrix": {
"0": {
"#false": "isMatrix"
},
"1": {
"#true": "isMatrix"
}
Исходя из значения поля "isMatrix" значение в нем заменяется на строку "true" или "false". И самым последним блоком строковое представление преобразуется к логическому типу:
{
"operation": "modify-overwrite-beta",
"spec": {
"isOutOfCollection": "=toBoolean",
"isMatrix": "=toBoolean",
"kit": "=toBoolean"
}
}
В результате получено полное соответствие S2T.
Однако в процессе применения трансформации формировалась ошибка:
JoltTransformRecord[id=018f1086-7c30-1015-2569-a67ef2af5b1d] Unable to transform FlowFile[filename=0c119857-beac-4f4f-96c7-406b71c2c9fc] due to org.apache.nifi.serialization.record.util.IllegalTypeConversionException: Cannot convert value of class [Ljava.lang.Object; because the type is not supported: org.apache.nifi.serialization.record.util.IllegalTypeConversionException: Cannot convert value of class [Ljava.lang.Object; because the type is not supported
При получении ошибки я провёл ряд манипуляций, которые делаю всегда: разбить FlowFile на единичные записи, упростить трансформацию, преобразовать форматы. Результат был отрицательным, так как процессор JoltTransformRecord постоянно выдавал ошибку. Упрощая структуру сообщения, я выяснил, что ошибка возникает потому, что JoltTransformRecord не может корректно обработать массив, в котором присутствуют вариативные типы данных, определяемые типом map. Также не помог вариант с предварительным формированием целевой схемы данных и указанием её соответствующему RecordSetWriter. Перевод типа данных из Avro в JSON тоже не дал результата, так как тип map и массивы записей никуда не ушли и стали генерироваться на основании самих данных.
Avro-схема данных
{
"type": "record",
"name": "Mdm_Record",
"namespace": "any.org",
"fields": [
{
"name": "metadata",
"type": {
"type": "record",
"name": "MessageInfo",
"namespace": "any.org",
"fields": [
{
"name": "ts",
"type": {
"type": "long",
"logicalType": "timestamp-millis"
}
}
]
}
},
{
"name": "data",
"type": {
"type": "record",
"name": "MDM_Item",
"fields": [
{
"name": "object",
"type": {
"type": "record",
"name": "Node",
"fields": [
{
"name": "guid",
"type": [
"null",
"string"
],
"default": null
},
{
"name": "template",
"type": [
"null",
"string"
],
"default": null
},
{
"name": "language",
"type": [
"null",
"string"
],
"default": null
},
{
"name": "fields",
"type": {
"type": "map",
"values": [
"Node",
{
"type": "record",
"name": "Barcode",
"fields": [
{
"name": "provider",
"type": {
"type": "record",
"name": "Provider",
"fields": [
{
"name": "name",
"type": "string"
},
{
"name": "city",
"type": [
"null",
"string"
],
"default": null
},
{
"name": "taxIdNumber",
"type": [
"null",
"string"
],
"default": null
},
{
"name": "taxRegistrationReasonCode",
"type": [
"null",
"string"
],
"default": null
}
]
}
},
{
"name": "barcode",
"type": [
"null",
"string"
],
"default": null
},
{
"name": "supplierArticle",
"type": [
"null",
"string"
],
"default": null
}
]
},
{
"type": "record",
"name": "MediaResource",
"fields": [
{
"name": "link",
"type": "string",
},
{
"name": "filename",
"type": [
"null",
"string"
],
"default": null
},
{
"name": "mimetype",
"type": [
"null",
"string"
],
"default": null
},
{
"name": "fileSize",
"type": [
"null",
"int"
],
"default": null
},
{
"name": "format",
"type": [
"null",
"string"
],
"default": null
}
]
},
{
"type": "record",
"name": "SofaSpecification",
"fields": [
{
"name": "seats",
"type": [
"null",
"int"
],
"default": null
},
{
"name": "bedLength",
"type": {
"type": "record",
"name": "QuantityValue",
"fields": [
{
"name": "value",
"type": "float",
},
{
"name": "measureUnit",
"type": [
"null",
{
"type": "record",
"name": "MeasureUnit",
"fields": [
{
"name": "unitName",
"type": "string",
},
{
"name": "unitCode",
"type": "string",
}
]
}
],
"default": null
}
]
}
},
{
"name": "bedWidth",
"type": "QuantityValue",
},
{
"name": "backHeightFolded",
"type": "QuantityValue",
},
{
"name": "backHeightFoldedOut",
"type": "QuantityValue",
},
{
"name": "fullLength",
"type": "QuantityValue",
}
]
},
{
"type": "record",
"name": "MattressSpecification",
"fields": [
{
"name": "bedLength",
"type": "QuantityValue",
},
{
"name": "bedWidth",
"type": "QuantityValue",
},
{
"name": "height",
"type": "QuantityValue",
},
{
"name": "maximumBedLoad",
"type": "QuantityValue",
}
]
},
"QuantityValue",
"MeasureUnit",
{
"type": "array",
"items": [
"Node",
"Barcode",
{
"type": "record",
"name": "ItemPack",
"fields": [
{
"name": "packageName",
"type": "string",
},
{
"name": "package",
"type": [
"null",
{
"type": "record",
"name": "Pack",
"fields": [
{
"name": "packageGuid",
"type": "string",
},
{
"name": "packageType",
"type": [
"null",
"string"
],
"default": null
},
{
"name": "packaging",
"type": {
"type": "map",
"values": [
{
"type": "record",
"name": "Box",
"fields": [
{
"name": "length",
"type": "QuantityValue",
},
{
"name": "width",
"type": "QuantityValue",
},
{
"name": "height",
"type": "QuantityValue",
}
]
},
{
"type": "record",
"name": "SoftPackaging",
"fields": [
{
"name": "twist",
"type": "boolean",
}
]
}
]
},
}
]
}
],
"default": null
},
{
"name": "length",
"type": "QuantityValue",
},
{
"name": "width",
"type": "QuantityValue",
},
{
"name": "height",
"type": "QuantityValue",
},
{
"name": "volume",
"type": "QuantityValue",
},
{
"name": "weightNetto",
"type": "QuantityValue",
},
{
"name": "weightGross",
"type": "QuantityValue",
},
{
"name": "seatsCount",
"type": [
"null",
"int"
],
"default": null
},
{
"name": "packagedUnitsCount",
"type": [
"null",
"int"
],
"default": null
},
{
"name": "setSeatsCount",
"type": [
"null",
"int"
],
"default": null
},
{
"name": "priority",
"type": [
"null",
"int"
],
"default": null
},
{
"name": "measureUnit",
"type": [
"null",
"MeasureUnit"
],
"default": null
},
{
"name": "mainLogisticsPackaging",
"type": [
"null",
"boolean"
],
"default": null
},
{
"name": "barcode",
"type": [
"null",
"string"
],
"default": null
}
]
},
"MediaResource",
"string",
"int",
"boolean"
]
},
{
"type": "map",
"values": [
"string",
"int",
"boolean"
]
},
"string",
"int",
"boolean"
]
}
}
]
}
},
{
"name": "stateName",
"type": "string"
}
]
}
}
]
}
Так как выбранный ранее вариант с трансформацией Jolt не работал, я решил применить проверенное средство для крайних случаев — скрипт. Есть хороший процессор — ScriptedTransformRecord, позволяющий выполнять обработку по одной записи, получая объект типа Record.
Скрипт получился довольно большим по причине наличия типов map, а также возможности значения «null» в требуемых полях. Так как существует вероятность модификации формата данных либо корректировки S2T, то в будущем потребуется изменять скрипт, что при его большом объёме влечёт увеличение сложности для разработчика. Так что я решил отказаться от скрипта и вернуться к разработанной и отлаженной спецификации, ведь для JSON она работает корректно, ошибка возникает только при работе с записью, когда применяется схема данных.
То есть в этом случае я решил отказаться от обработки записи и перейти к обработке контента целиком, что приводит к увеличению количества FlowFile и, соответственно, увеличению задействования оперативной памяти.
В NiFi можно применить спецификацию Jolt к записям c помощью JoltTransformRecord или воспользоваться процессором JoltTransformJSON, который ожидает на входе JSON и преобразовывает его не как запись, а как единый JSON-файл. Так как происходит обработка всего контента целиком, то для сокращения накладных расходов лучше подавать на вход единичный объект JSON, а не массив. Поэтому предварительно потребовалась разбивка пришедшего FlowFile на фрагменты, где каждый содержал бы один JSON. Это позволило в дальнейшем выполнить трансформацию для единичного объекта быстро, но повлекло генерацию большого количества FlowFile. Для этого применил SplitRecord, где Reader читал Avro-формат, а RecordSetWriter был настроен для записи одного JSON-объекта.
Для сокращения дальнейших накладных расходов данные объединяются с помощью MergeRecord. Перед слиянием выполнена конвертация в Avro с заданной целевой схемой, соответствующей S2T.
Далее данные вернулись в основной поток и были успешно внесены в базу данных. Таким образом, из-за сложной структуры данных пришлось изменить основной паттерн работы с данными в NiFi — работать с записями. В итоге, допустив проигрыш по ресурсу на небольшом участке потока обработки данных, в целом я выиграл, оставаясь в ограниченном круге инструментов (только Jolt в трансформациях) и вернув данные в общий поток.
По скорости обработки ScriptedTransformRecord был быстрее из-за обработки одного файла с множеством записей. Однако сложность скрипта в процессе модернизации или поддержки снижает привлекательность такого решения. Анализ же спецификации займёт меньше времени, тем самым позволив быстрее вынести корректировки в случае изменения структуры данных.
В ходе дальнейшей работы стало ясно, что ответвление оправдало себя многократно. Так как исходный объект является записью о товаре и включает совокупность большого количества сущностей и связей с другими сущностями, то из одного сообщения понадобилось формировать более десятка стейджинговых таблиц. И решение было простое: раз уже есть разбивка на отдельные JSON, можно разработать отдельные спецификации Jolt для каждого случая, сделать итоговые схемы данных и все преобразования реализовать одинаковым шаблоном.
Что я хотел сказать, описывая этот случай. В NiFi есть много способов достичь требуемого результата. Иногда стоит попробовать несколько вариантов, но выбирать тот, который более прост в понимании разработчиком и требует меньше усилий при поддержке или модификации.
В заключение сформирую ряд рекомендаций для разработчиков, которые, на мой взгляд, будут полезны:
Вся обработка должна выполняться над записями. Это позволит применять бинарные форматы и минимизировать затраты на преобразования.
Переходить к обработке контента целиком только в случае, когда ресурсоёмкость работы с записями (тут и затраты на поддержку, и развитие потока, и сама обработка) превышает выбранную вами границу.
Если потребовалось перейти к обработке контента целиком, то необходимо вернуться к записям как можно быстрее, при этом обеспечить удаление всех ссылок на единичные FlowFile для очистки репозитория контента и оперативной памяти.
Формировать «универсальные» потоки, где параметры процессоров задаются на основе атрибутов. Это позволит применять один и тот же набор процессоров для разных источников, минимизировав количество запускаемых экземпляров процессоров и снизив нагрузку на планировщик.
Полезные ссылки: