
Добрый день. Меня зовут Иван Клименко, и я архитектор департамента поддержки продаж компании Arenadata. В основном занимаюсь продуктом Arenadata Streaming (ADS) — это масштабируемая отказоустойчивая система для потоковой обработки данных в режиме реального времени, адаптированная для корпоративного использования и построенная на базе Apache Kafka и Apache NiFi. В продукт входит замечательный сервис Arenadata Streaming NiFi, который является low-code средством построения интеграционных потоков с возможностью масштабирования.
Сегодня хочу показать на одном из практических случаев, что не всегда в NiFi удаётся следовать шаблонному подходу record-oriented в разработке потоков обработки и трансформации данных.
Как известно, в NiFi применяется так называемый record-oriented-подход. Что это значит? Каждый контент FlowFile представляет собой не один элемент, а множество одинаковых по структуре записей. И работать с ним можно как с массивом записей, проводя необходимые трансформации и манипуляции, например выборки, обогащения либо изменения содержимого полей. В этом случае если запись изменяется, то контент FlowFile заново записывается на диск, а в атрибутах меняется системная ссылка на файл контента. Для чтения и записи элементов применяются специальные сервисы: RecordReader и RecordSetWriter, — которые, основываясь на схемах данных, позволяют процессорам обработки оперировать записями, не вдаваясь в детали их хранения. Такой подход позволяет сократить накладные расходы, так как хранить и обращаться к единому множеству записей в одном файле с одним набором метаданных выгоднее, чем хранить каждый файл с метаданными отдельно. Стандарты обработки данных тоже сместили фокус с индивидуальной обработки на работу с записями. И одной из моих рекомендаций всегда является применение обработки записей с помощью предназначенных для этого процессоров: UpdateRecord, JoltTransformRecord, QueryRecord и так далее. Это быстрее за счёт обращения к одному файлу, а оперативная память требуется только на один набор атрибутов, а не на множество.
Но что делать, если в вашем потоке обработки всё построено на записях, но от одного источника приходят данные, которые выпадают из общей парадигмы? Я расскажу об одном случае из моей практики, когда при решении задачи наполнения хранилища пришлось провести нестандартную обработку.
Для понимания задачи опишу информационную среду, в которой я разрабатывал интеграционные процессы. Есть компания, которая производит и продаёт товары, допустим мебель. Она использует различные бизнес-сервисы, позволяющие автоматизировать задачи по обороту товаров, логистике, складскому учёту, кадровым задачам, продажам и производству. Данные для построения отчётности компания собирает в аналитическом хранилище. А сама аналитика информации нужна для комплексной оценки эффективности бизнеса, текущей работы компании и прогнозирования во всех сферах деятельности. Источниками данных для хранилища являются различные ERP-, CRM-, PIM-, MDM-системы, веб-приложения и приложения обработки данных компании. В целях оптимизации информационного обмена прямой доступ между системами заменён на обмен сообщениями через единую шину данных. Чтобы сократить объём данных, в компании введено правило: в топике Kafkaнаходятся сообщения, сформированные по одной схеме, и сами сообщения преобразованы в Avro по стандарту Confluent. Схемы сохранены в ADS. Schema Registry. Обобщённая структура информационной среды представлена на рисунке.

Также в компании принят стандарт передачи сообщения в виде блока метаданных и данных. Обязательным элементом метаданных является время генерации сообщения, остальные поля оставлены на усмотрение генерирующий системы. Таким образом, данные представляют собой структуру, разделённую на два уровня: информационный блок и непосредственно структуру от источника, например, в таком виде:
Пример передаваемых данных
{
  "metadata" : {
    "ts" : "2020-01-01 00:00:00.0000"
  },
  "data" : {
    "someFields1" : "some value", 
    "someFieeld2" : 0,
    "someFieeld2 : true
  }
}ETL-процесс наполнения хранилища выглядит максимально просто: извлечение данных из требуемых топиков, необходимые трансформации по приведению[АД1] к структуре, пригодной для внесения в таблицу базы данных, сопоставление исходного топика целевой таблице в хранилище и внесение данных в хранилище.
Следующим этапом нужно подготовить данные для внесения в таблицу. Для этого требуется преобразовать полученную иерархическую структуру в плоскую таблицу, сдвинув поля из вложенности на уровень вверх, убрать ненужные теги metadata и data. Дополнительно понадобилось добавить в структуру время формирования записи из метаданных. Стейджинг формируется таком образом, что одному топику, а следовательно, одной структуре, соответствует одна целевая таблица, при этом имя таблицы соответствует имени топика. Дальнейшая обработка данных вынесена в процедуры расчёта хранилища и не завязана на сам ETL-процесс.
При получении данных из Kafka с помощью процессора ConsumeKafkaRecord флаг "Separate By Key" выставлен в значение "false", что позволяет формировать один FlowFile для нескольких сообщений. Так как нужно просто «сдвинуть» поля на один уровень, в качестве инструмента сразу напрашивается Jolt. Для упрощения структуры данных можно применить простую Jolt-трансформацию, которая перебирает все поля второго уровня и сдвигает их на первый уровень:
Трансформация, преобразующая двухуровневую иерархию в плоскую струтуру
[
  {
    "operation": "shift",
    "spec": {
      "*": { "*": "&" }
    }
  }
]
Трансформация позволяет работать с одной записью, и её можно применить в процессоре JoltTransformRecord. Таким образом, получается универсальный процесс, извлекающий данные из Kafka, трансформирующий их в плоскую структуру и передающий в целевую базу данных.

Конечно, представленный поток не готов к выводу в продуктовую среду, это только его ядро, которое необходимо дополнить обработкой ошибок, сформировать повторы, добавить оповещение. Тем не менее в таком виде уже получаем универсальный поток обработки, позволяющий загружать данные из любого топика в соответствующую таблицу целевой базы. Если внести список топиков в параметры контекста и управлять ими через API, то сможем автоматизировано управлять источниками.
Бизнес-системы не являются статичными, и список источников данных для аналитических хранилищ постоянно изменяется. В один момент появилась PIM/MDM-система, генерирующая данные не в простом формате, а с гораздо более сложной иерархией.
Сложная структура данных, генерируемая PIM/MDM-системой (очень большой JSON)
{
  "metadata" : {
    "ts" : "2024-05-14 22:15:68.5896"
  },
  "data" : {
    "object" : {
      "guid" : "04551935-d520-5785-b257-c8ae8240711c",
      "template" : "Item",
      "language" : null,
      "fields" : {
        "itemGroup" : {
          "guid" : "497b6e51-4c1d-5ce0-be02-b8f37ba35c26",
          "template" : "ItemGroup",
          "language" : null,
          "fields" : {
            "name" : "GroupNanme",
            "parentObject" : {
              "guid" : "32d38329-1868-5f51-bb30-f824258539fb",
              "template" : "ItemGroup",
              "language" : null,
              "fields" : {
                "name" : "parent name"
              }
            },
            "specification" : {
              "guid" : null,
              "template" : "SpecificationBrickContainer",
              "language" : null,
              "fields" : {
                "internalGroupSpecification" : {
                  "guid" : null,
                  "template" : "InternalGroupSpecificationBrick",
                  "language" : null,
                  "fields" : {
                    "volumeRatio" : "1.3",
                    "groupAttribute" : {
                      "guid" : "84281670-f476-5f3f-8b31-382f02e13e83",
                      "template" : "ItemGroup",
                      "language" : null,
                      "fields" : {
                        "name" : "group name",
                        "parentObject" : {
                          "guid" : "1ba54e86-bc86-59ac-93e9-34fa4a68abe3",
                          "template" : "Folder",
                          "language" : null,
                          "fields" : {
                            "name" : "another name"
                          }
                        }
                      }
                    },
                    "codeGroup" : "5561010305"
                  }
                }
              }
            }
          }
        },
        "additionalInformation" : {
          "guid" : null,
          "template" : "AdditionalInformationBrickContainer",
          "language" : null,
          "fields" : {
            "additionalInformation" : [ "AdditionalInformationBrick" ]
          }
        },
        "manufactureCountry" : {
          "guid" : "f64ab0a6-cff1-5dcd-9f1c-cc43f1c58d03",
          "template" : "Property",
          "language" : null,
          "fields" : {
            "name" : "US"
          }
        },
        "code" : "5896dkwskeo_kedoed896",
        "parentObject" : {
          "guid" : "ce5a885b-8170-5bbe-80c1-c0d2d41d0cad",
          "template" : "Item",
          "language" : null,
          "fields" : {
            "name" : "parent name",
            "parentObject" : {
              "guid" : "83c7b0c5-3d1c-50ff-a9f0-0b0fc6e47cbd",
              "template" : "ItemTemplate",
              "language" : null,
              "fields" : {
                "name" : "object source name"
              }
            },
            "statusCode" : "draft"
          }
        },
        "ean13" : "462711234164",
        "measureUnit" : {
          "unitName" : "item",
          "unitCode" : "796"
        },
        "main" : {
          "guid" : null,
          "template" : "MainBrickContainer",
          "language" : null,
          "fields" : {
            "localizedDescription" : [ "LocalizedDescriptionBrick" ]
          }
        },
        "media" : {
          "guid" : null,
          "template" : "MediaBrickContainer",
          "language" : null,
          "fields" : {
            "media" : [ "MediaBrick" ]
          }
        },
        "commerce" : {
          "guid" : null,
          "template" : "CommerceBrickContainer",
          "language" : null,
          "fields" : {
            "product" : {
              "guid" : null,
              "template" : "ProductBrick",
              "language" : null,
              "fields" : {
                "ean13" : "4627112341690"
              }
            }
          }
        },
        "pack" : [ {
          "packageName" : "coverage",
          "package" : null,
          "length" : {
            "value" : 140.0,
            "measureUnit" : {
              "unitName" : "sm",
              "unitCode" : "004"
            }
          },
          "width" : {
            "value" : 200.0,
            "measureUnit" : {
              "unitName" : "sm",
              "unitCode" : "004"
            }
          },
          "height" : {
            "value" : 10.0,
            "measureUnit" : {
              "unitName" : "sm",
              "unitCode" : "004"
            }
          },
          "volume" : {
            "value" : 0.28,
            "measureUnit" : {
              "unitName" : "m3",
              "unitCode" : "113"
            }
          },
          "weightNetto" : {
            "value" : 35.9,
            "measureUnit" : {
              "unitName" : "kg",
              "unitCode" : "166"
            }
          },
          "weightGross" : {
            "value" : 35.9,
            "measureUnit" : {
              "unitName" : "kg",
              "unitCode" : "166"
            }
          },
          "seatsCount" : null,
          "packagedUnitsCount" : null,
          "setSeatsCount" : null,
          "priority" : null,
          "measureUnit" : null,
          "mainLogisticsPackaging" : null,
          "barcode" : null
        } ],
        "isMatrix" : 1,
        "series" : {
          "guid" : "f68d1bc3-2600-5b40-b9b1-957d039cdd6b",
          "template" : "tmp",
          "language" : null,
          "fields" : {
            "name" : "seria name",
            "priceSegment" : {
              "guid" : "364044ec-0f96-5a5f-9028-9dc25f752e5e",
              "template" : "Property",
              "language" : null,
              "fields" : {
                "name" : "middle"
              }
            },
            "parentObject" : {
              "guid" : "a497557f-4eda-5f16-807a-4b71eb5c1b74",
              "template" : "Seria",
              "language" : null,
              "fields" : {
                "name" : "GENERAL Seria name"
              }
            }
          }
        },
        "name" : "Common Name",
        "options" : {
          "guid" : null,
          "template" : "OptionsCollection",
          "language" : null,
          "fields" : {
            "length" : {
              "guid" : null,
              "template" : "OptionsCollectionItem",
              "language" : null,
              "fields" : {
                "propertyType" : "length",
                "relation" : [ {
                  "guid" : "28df4e5e-5201-519a-a3a5-57d7e1838361",
                  "template" : "Property",
                  "language" : null,
                  "fields" : {
                    "name" : 200,
                    "options" : {
                      "guid" : null,
                      "template" : "OptionsBrickContainer",
                      "language" : null,
                      "fields" : {
                        "propertyMDMData" : {
                          "guid" : null,
                          "template" : "PropertyMDMDataBrick",
                          "language" : null,
                          "fields" : {
                            "code" : 10004
                          }
                        },
                        "propertySize" : {
                          "guid" : null,
                          "template" : "PropertySizeBrick",
                          "language" : null,
                          "fields" : {
                            "physicalQuantity" : "length",
                            "numericValue" : {
                              "value" : 200.0,
                              "measureUnit" : {
                                "unitName" : "sm",
                                "unitCode" : "004"
                              }
                            }
                          }
                        }
                      }
                    }
                  }
                } ]
              }
            },
            "width" : {
              "guid" : null,
              "template" : "OptionsCollectionItem",
              "language" : null,
              "fields" : {
                "propertyType" : "width",
                "relation" : [ {
                  "guid" : "59c08ec1-d0ff-55c4-99a4-6616f23f1469",
                  "template" : "Property",
                  "language" : null,
                  "fields" : {
                    "name" : 140,
                    "options" : {
                      "guid" : null,
                      "template" : "OptionsBrickContainer",
                      "language" : null,
                      "fields" : {
                        "propertyMDMData" : {
                          "guid" : null,
                          "template" : "PropertyMDMDataBrick",
                          "language" : null,
                          "fields" : {
                            "code" : 10005
                          }
                        },
                        "propertySize" : {
                          "guid" : null,
                          "template" : "PropertySizeBrick",
                          "language" : null,
                          "fields" : {
                            "physicalQuantity" : "width",
                            "numericValue" : {
                              "value" : 140.0,
                              "measureUnit" : {
                                "unitName" : "sm",
                                "unitCode" : "004"
                              }
                            }
                          }
                        }
                      }
                    }
                  }
                } ]
              }
            },
            "height" : {
              "guid" : null,
              "template" : "OptionsCollectionItem",
              "language" : null,
              "fields" : {
                "propertyType" : "height",
                "relation" : [ {
                  "guid" : "c28c2569-7a0d-5d1a-94d4-4907dc91f20a",
                  "template" : "Property",
                  "language" : null,
                  "fields" : {
                    "name" : 112,
                    "options" : {
                      "guid" : null,
                      "template" : "OptionsBrickContainer",
                      "language" : null,
                      "fields" : {
                        "propertyMDMData" : {
                          "guid" : null,
                          "template" : "PropertyMDMDataBrick",
                          "language" : null,
                          "fields" : {
                            "code" : 10006
                          }
                        },
                        "propertySize" : {
                          "guid" : null,
                          "template" : "PropertySizeBrick",
                          "language" : null,
                          "fields" : {
                            "physicalQuantity" : "height",
                            "numericValue" : {
                              "value" : 112.0,
                              "measureUnit" : {
                                "unitName" : "sm",
                                "unitCode" : "004"
                              }
                            }
                          }
                        }
                      }
                    }
                  }
                } ]
              }
            }
          }
        },
        "sku" : 1563340922,
        "statusCode" : "draft"
      }
    },
    "stateName" : "Updated"
  }
В данных от этой системы передаётся полная информация о товаре, его составляющих, размерах, свойствах упаковки. Такой формат позволяет потребителям самостоятельно определять способы хранения соответствующих сведений для каждого свойства товара, а системе — генерировать одно сообщение на единицу товара, не выгружая в отдельные потоки связанные данные. Мне же требовалось сформировать из этих сообщений плоскую таблицу по заданному SourceToTarget:
| Поле | Имя поля | Тип данных | 
| data.object.guid | guid | uniqueidentifier | 
| data.object.fields.manufactureCountry.guid | manufactureCountryGuid | uniqueidentifier | 
| data.object.fields.manufactureCountry.fields.name | manufactureCountryName | text | 
| data.object.fields.itemGroup.guid | itemGroupGuid | uniqueidentifier | 
| data.object.fields.parentObject.guid | parentGuid | uniqueidentifier | 
| data.object.fields.sourceProduction.[0] | sourceProductionValue | bool | 
| data.object.fields.measureUnit.unitName | unitName | text | 
| data.object.fields.measureUnit.unitCode | unitCode | int | 
| data.object.fields.commerce.fields.product.fields.tax | tax | int | 
| data.object.fields.series.guid | seriesGuid | uniqueidentifier | 
| data.object.fields.series.fields.name | seriesName | text | 
| data.object.fields.series.fields.priceSegment.guid | priceSegmentGuid | uniqueidentifier | 
| data.object.fields.series.fields.priceSegment.fields.name | priceSegmentName | text | 
| data.object.fields.name | name | text | 
| data.object.fields.dimensionCharacteristics | dimensionCharacteristics | int | 
| data.object.fields.sku | sku | text | 
| data.object.fields.isOutOfCollection | isOutOfCollection | bool | 
| data.object.fields.isMatrix | isMatrix | bool | 
| data.object.fields.kit | kit | int | 
| data.object.fields.numberOfParts | numberOfParts | int | 
| data.object.fields.ean13 | ean13 | text | 
| data.object.fields.code | code | text | 
| data.object.fields.statusCode | statusCode | text | 
| data.stateName | stateName | text | 
| metadata.ts | ts | datetime2 | 
Выход простой: написать спецификацию Jolt для текущего случая, сделать ответвление на основании имени топика, а после трансформации вернуть поток в стандартный маршрут. Разработав и отладив спецификацию, применил её в JoltTransformRecord.
Спецификация, решающая поставленную задачу
[
  {
    "operation": "shift",
    "spec": {
      "metadata": {
        "*": "&"
      },
      "data": {
        "object": {
          "guid": "&",
          "fields": {
            "manufactureCountry": {
              "guid": "manufactureCountryGuid",
              "fields": {
                "name": "manufactureCountryName"
              }
            },
            "itemGroup": {
              "guid": "itemGroupGuid"
            },
            "parentObject": {
              "guid": "parentGuid"
            },
            "sourceProduction": {
              "0": "SourceProductionValue"
            },
            "measureUnit": {
              "*": "&"
            },
            "commerce": {
              "fields": {
                "product": {
                  "fields": {
                    "tax": "&"
                  }
                }
              }
            },
            "series": {
              "guid": "seriesGuid",
              "fields": {
                "name": "seriesName",
                "priceSegment": {
                  "guid": "priceSegmentGuid",
                  "fields": {
                    "name": "priceSegmentName"
                  }
                }
              }
            },
            "name": "&",
            "dimensionCharacteristics": "&",
            "sku": "&",
            "isOutOfCollection": "&",
            "isMatrix": "&",
            "kit": "&",
            "numberOfParts": "&",
            "ean13": "&",
            "code": "&",
            "statusCode": "&"
          }
        },
        "stateName": "StateName"
      }
    }
  },
  {
    "operation": "modify-overwrite-beta",
    "spec": {
      "SourceProductionValue": "=toInteger",
      "unitCode": "=toInteger",
      "tax": "=toInteger",
      "dimensionCharacteristics": "=toInteger",
      "isOutOfCollection": "=toInteger",
      "isMatrix": "=toInteger",
      "kit": "=toInteger",
      "numberOfParts": "=toInteger",
      "sku": "=toString"
    }
  },
  {
    "operation": "shift",
    "spec": {
      "isOutOfCollection": {
        "0": {
          "#false": "isOutOfCollection"
        },
        "1": {
          "#true": "isOutOfCollection"
        }
      },
      "isMatrix": {
        "0": {
          "#false": "isMatrix"
        },
        "1": {
          "#true": "isMatrix"
        }
      },
      "kit": {
        "0": {
          "#false": "kit"
        },
        "1": {
          "#true": "kit"
        }
      },
      "*": "&"
    }
  },
  {
    "operation": "modify-overwrite-beta",
    "spec": {
      "isOutOfCollection": "=toBoolean",
      "isMatrix": "=toBoolean",
      "kit": "=toBoolean"
    }
  }
]
Пояснение по блокам спецификации JOLT
Первый блок "shift" сдвигает данные на один уровень. В нем все просто - взять указанное значение и поместить его в указанное поле, но на другом уровне. Например, поле "data.object.guid" перемещается в "guid":
{
    "operation": "shift",
    "spec": {
      "metadata": {
        "*": "&"
      },
      "data": {
        "object": {
          "guid": "&"
Следующий блок "modify-overwrite-beta" преобразует данные к требуемым типам, так как в исходных данных тип может не соответствовать целевому. Пример такого преобразования для поля "isMatrix". Несмотря на то, что в исходном виде это число 0 или 1, в результате трасформации поле приводилось к строке.
"isMatrix": "=toInteger"Цель преобразования некоторых полей к численным типам раскрывается в следующем блоке трансформации "shift". Не многие знают, что с помощью это операции можно не только перенести значения из одного поля в другое, но также делать это с условиями:
"isMatrix": {
        "0": {
          "#false": "isMatrix"
        },
        "1": {
          "#true": "isMatrix"
        }
Исходя из значения поля "isMatrix" значение в нем заменяется на строку "true" или "false". И самым последним блоком строковое представление преобразуется к логическому типу:
{
    "operation": "modify-overwrite-beta",
    "spec": {
      "isOutOfCollection": "=toBoolean",
      "isMatrix": "=toBoolean",
      "kit": "=toBoolean"
    }
  }
В результате получено полное соответствие S2T.
Однако в процессе применения трансформации формировалась ошибка:
JoltTransformRecord[id=018f1086-7c30-1015-2569-a67ef2af5b1d] Unable to transform FlowFile[filename=0c119857-beac-4f4f-96c7-406b71c2c9fc] due to org.apache.nifi.serialization.record.util.IllegalTypeConversionException: Cannot convert value of class [Ljava.lang.Object; because the type is not supported: org.apache.nifi.serialization.record.util.IllegalTypeConversionException: Cannot convert value of class [Ljava.lang.Object; because the type is not supported
При получении ошибки я провёл ряд манипуляций, которые делаю всегда: разбить FlowFile на единичные записи, упростить трансформацию, преобразовать форматы. Результат был отрицательным, так как процессор JoltTransformRecord постоянно выдавал ошибку. Упрощая структуру сообщения, я выяснил, что ошибка возникает потому, что JoltTransformRecord не может корректно обработать массив, в котором присутствуют вариативные типы данных, определяемые типом map. Также не помог вариант с предварительным формированием целевой схемы данных и указанием её соответствующему RecordSetWriter. Перевод типа данных из Avro в JSON тоже не дал результата, так как тип map и массивы записей никуда не ушли и стали генерироваться на основании самих данных.
Avro-схема данных
{
  "type": "record",
  "name": "Mdm_Record",
  "namespace": "any.org",
  "fields": [
    {
      "name": "metadata",
      "type": {
        "type": "record",
        "name": "MessageInfo",
        "namespace": "any.org",
        "fields": [
          {
            "name": "ts",
            "type": {
              "type": "long",
              "logicalType": "timestamp-millis"
            }            
          }
        ]
      }
    },
    {
      "name": "data",
      "type": {
        "type": "record",
        "name": "MDM_Item",
        "fields": [
          {
            "name": "object",
            "type": {
              "type": "record",
              "name": "Node",
              "fields": [
                {
                  "name": "guid",
                  "type": [
                    "null",
                    "string"
                  ],               
                  "default": null
                },
                {
                  "name": "template",
                  "type": [
                    "null",
                    "string"
                  ],                 
                  "default": null
                },
                {
                  "name": "language",
                  "type": [
                    "null",
                    "string"
                  ],                 
                  "default": null
                },
                {
                  "name": "fields",
                  "type": {
                    "type": "map",
                    "values": [
                      "Node",
                      {
                        "type": "record",
                        "name": "Barcode",
                        "fields": [
                          {
                            "name": "provider",
                            "type": {
                              "type": "record",
                              "name": "Provider",
                              "fields": [
                                {
                                  "name": "name",
                                  "type": "string"                                 
                                },
                                {
                                  "name": "city",
                                  "type": [
                                    "null",
                                    "string"
                                  ],                                 
                                  "default": null
                                },
                                {
                                  "name": "taxIdNumber",
                                  "type": [
                                    "null",
                                    "string"
                                  ],
                                  "default": null
                                },
                                {
                                  "name": "taxRegistrationReasonCode",
                                  "type": [
                                    "null",
                                    "string"
                                  ],                                 
                                  "default": null
                                }
                              ]
                            }
                           
                          },
                          {
                            "name": "barcode",
                            "type": [
                              "null",
                              "string"
                            ],                           
                            "default": null
                          },
                          {
                            "name": "supplierArticle",
                            "type": [
                              "null",
                              "string"
                            ],
                           "default": null
                          }
                        ]
                      },
                      {
                        "type": "record",
                        "name": "MediaResource",
                        "fields": [
                          {
                            "name": "link",
                            "type": "string",                           
                          },
                          {
                            "name": "filename",
                            "type": [
                              "null",
                              "string"
                            ],                           
                            "default": null
                          },
                          {
                            "name": "mimetype",
                            "type": [
                              "null",
                              "string"
                            ],                           
                            "default": null
                          },
                          {
                            "name": "fileSize",
                            "type": [
                              "null",
                              "int"
                            ],
                           
                            "default": null
                          },
                          {
                            "name": "format",
                            "type": [
                              "null",
                              "string"
                            ],                            
                            "default": null
                          }
                        ]
                      },
                      {
                        "type": "record",
                        "name": "SofaSpecification",
                        "fields": [
                          {
                            "name": "seats",
                            "type": [
                              "null",
                              "int"
                            ],                           
                            "default": null
                          },
                          {
                            "name": "bedLength",
                            "type": {
                              "type": "record",
                              "name": "QuantityValue",
                              "fields": [
                                {
                                  "name": "value",
                                  "type": "float",
                                 
                                },
                                {
                                  "name": "measureUnit",
                                  "type": [
                                    "null",
                                    {
                                      "type": "record",
                                      "name": "MeasureUnit",
                                      "fields": [
                                        {
                                          "name": "unitName",
                                          "type": "string",
                                        },
                                        {
                                          "name": "unitCode",
                                          "type": "string",
                                        }
                                      ]
                                    }
                                  ],
                                  "default": null
                                }
                              ]
                            }
                          },
                          {
                            "name": "bedWidth",
                            "type": "QuantityValue",
                          },
                          {
                            "name": "backHeightFolded",
                            "type": "QuantityValue",
                          },
                          {
                            "name": "backHeightFoldedOut",
                            "type": "QuantityValue",
                          },
                          {
                            "name": "fullLength",
                            "type": "QuantityValue",
                          }
                        ]
                      },
                      {
                        "type": "record",
                        "name": "MattressSpecification",
                        "fields": [
                          {
                            "name": "bedLength",
                            "type": "QuantityValue",
                          },
                          {
                            "name": "bedWidth",
                            "type": "QuantityValue",
                          },
                          {
                            "name": "height",
                            "type": "QuantityValue",
                          },
                          {
                            "name": "maximumBedLoad",
                            "type": "QuantityValue",
                          }
                        ]
                      },
                      "QuantityValue",
                      "MeasureUnit",
                      {
                        "type": "array",
                        "items": [
                          "Node",
                          "Barcode",
                          {
                            "type": "record",
                            "name": "ItemPack",
                            "fields": [
                              {
                                "name": "packageName",
                                "type": "string",
                              },
                              {
                                "name": "package",
                                "type": [
                                  "null",
                                  {
                                    "type": "record",
                                    "name": "Pack",
                                    "fields": [
                                      {
                                        "name": "packageGuid",
                                        "type": "string",
                                      },
                                      {
                                        "name": "packageType",
                                        "type": [
                                          "null",
                                          "string"
                                        ],
                                        "default": null
                                      },
                                      {
                                        "name": "packaging",
                                        "type": {
                                          "type": "map",
                                          "values": [
                                            {
                                              "type": "record",
                                              "name": "Box",
                                              "fields": [
                                                {
                                                  "name": "length",
                                                  "type": "QuantityValue",
                                                },
                                                {
                                                  "name": "width",
                                                  "type": "QuantityValue",
                                                },
                                                {
                                                  "name": "height",
                                                  "type": "QuantityValue",
                                                }
                                              ]
                                            },
                                            {
                                              "type": "record",
                                              "name": "SoftPackaging",
                                              "fields": [
                                                {
                                                  "name": "twist",
                                                  "type": "boolean",
                                                }
                                              ]
                                            }
                                          ]
                                        },
                                      }
                                    ]
                                  }
                                ],
                                "default": null
                              },
                              {
                                "name": "length",
                                "type": "QuantityValue",
                              },
                              {
                                "name": "width",
                                "type": "QuantityValue",
                              },
                              {
                                "name": "height",
                                "type": "QuantityValue",
                              },
                              {
                                "name": "volume",
                                "type": "QuantityValue",
                              },
                              {
                                "name": "weightNetto",
                                "type": "QuantityValue",
                              },
                              {
                                "name": "weightGross",
                                "type": "QuantityValue",
                              },
                              {
                                "name": "seatsCount",
                                "type": [
                                  "null",
                                  "int"
                                ],
                                "default": null
                              },
                              {
                                "name": "packagedUnitsCount",
                                "type": [
                                  "null",
                                  "int"
                                ],
                                "default": null
                              },
                              {
                                "name": "setSeatsCount",
                                "type": [
                                  "null",
                                  "int"
                                ],
                                "default": null
                              },
                              {
                                "name": "priority",
                                "type": [
                                  "null",
                                  "int"
                                ],
                                "default": null
                              },
                              {
                                "name": "measureUnit",
                                "type": [
                                  "null",
                                  "MeasureUnit"
                                ],
                                "default": null
                              },
                              {
                                "name": "mainLogisticsPackaging",
                                "type": [
                                  "null",
                                  "boolean"
                                ],
                                "default": null
                              },
                              {
                                "name": "barcode",
                                "type": [
                                  "null",
                                  "string"
                                ],
                                "default": null
                              }
                            ]
                          },
                          "MediaResource",
                          "string",
                          "int",
                          "boolean"
                        ]
                      },
                      {
                        "type": "map",
                        "values": [
                          "string",
                          "int",
                          "boolean"
                        ]
                      },
                      "string",
                      "int",
                      "boolean"
                    ]
                  }
                }
              ]
            }
          },
          {
            "name": "stateName",
            "type": "string"
          }
        ]
      }
    }
  ]
}Так как выбранный ранее вариант с трансформацией Jolt не работал, я решил применить проверенное средство для крайних случаев — скрипт. Есть хороший процессор — ScriptedTransformRecord, позволяющий выполнять обработку по одной записи, получая объект типа Record.
Скрипт получился довольно большим по причине наличия типов map, а также возможности значения «null» в требуемых полях. Так как существует вероятность модификации формата данных либо корректировки S2T, то в будущем потребуется изменять скрипт, что при его большом объёме влечёт увеличение сложности для разработчика. Так что я решил отказаться от скрипта и вернуться к разработанной и отлаженной спецификации, ведь для JSON она работает корректно, ошибка возникает только при работе с записью, когда применяется схема данных.
То есть в этом случае я решил отказаться от обработки записи и перейти к обработке контента целиком, что приводит к увеличению количества FlowFile и, соответственно, увеличению задействования оперативной памяти.
В NiFi можно применить спецификацию Jolt к записям c помощью JoltTransformRecord или воспользоваться процессором JoltTransformJSON, который ожидает на входе JSON и преобразовывает его не как запись, а как единый JSON-файл. Так как происходит обработка всего контента целиком, то для сокращения накладных расходов лучше подавать на вход единичный объект JSON, а не массив. Поэтому предварительно потребовалась разбивка пришедшего FlowFile на фрагменты, где каждый содержал бы один JSON. Это позволило в дальнейшем выполнить трансформацию для единичного объекта быстро, но повлекло генерацию большого количества FlowFile. Для этого применил SplitRecord, где Reader читал Avro-формат, а RecordSetWriter был настроен для записи одного JSON-объекта.

Для сокращения дальнейших накладных расходов данные объединяются с помощью MergeRecord. Перед слиянием выполнена конвертация в Avro с заданной целевой схемой, соответствующей S2T.

Далее данные вернулись в основной поток и были успешно внесены в базу данных. Таким образом, из-за сложной структуры данных пришлось изменить основной паттерн работы с данными в NiFi — работать с записями. В итоге, допустив проигрыш по ресурсу на небольшом участке потока обработки данных, в целом я выиграл, оставаясь в ограниченном круге инструментов (только Jolt в трансформациях) и вернув данные в общий поток.
По скорости обработки ScriptedTransformRecord был быстрее из-за обработки одного файла с множеством записей. Однако сложность скрипта в процессе модернизации или поддержки снижает привлекательность такого решения. Анализ же спецификации займёт меньше времени, тем самым позволив быстрее вынести корректировки в случае изменения структуры данных.
В ходе дальнейшей работы стало ясно, что ответвление оправдало себя многократно. Так как исходный объект является записью о товаре и включает совокупность большого количества сущностей и связей с другими сущностями, то из одного сообщения понадобилось формировать более десятка стейджинговых таблиц. И решение было простое: раз уже есть разбивка на отдельные JSON, можно разработать отдельные спецификации Jolt для каждого случая, сделать итоговые схемы данных и все преобразования реализовать одинаковым шаблоном.
Что я хотел сказать, описывая этот случай. В NiFi есть много способов достичь требуемого результата. Иногда стоит попробовать несколько вариантов, но выбирать тот, который более прост в понимании разработчиком и требует меньше усилий при поддержке или модификации.
В заключение сформирую ряд рекомендаций для разработчиков, которые, на мой взгляд, будут полезны:
- Вся обработка должна выполняться над записями. Это позволит применять бинарные форматы и минимизировать затраты на преобразования. 
- Переходить к обработке контента целиком только в случае, когда ресурсоёмкость работы с записями (тут и затраты на поддержку, и развитие потока, и сама обработка) превышает выбранную вами границу. 
- Если потребовалось перейти к обработке контента целиком, то необходимо вернуться к записям как можно быстрее, при этом обеспечить удаление всех ссылок на единичные FlowFile для очистки репозитория контента и оперативной памяти. 
- Формировать «универсальные» потоки, где параметры процессоров задаются на основе атрибутов. Это позволит применять один и тот же набор процессоров для разных источников, минимизировав количество запускаемых экземпляров процессоров и снизив нагрузку на планировщик. 
Полезные ссылки:
 
          