Добрый день. Меня зовут Иван Клименко, и я архитектор департамента поддержки продаж компании Arenadata. В основном занимаюсь продуктом Arenadata Streaming (ADS) — это масштабируемая отказоустойчивая система для потоковой обработки данных в режиме реального времени, адаптированная для корпоративного использования и построенная на базе Apache Kafka и Apache NiFi. В продукт входит замечательный сервис Arenadata Streaming NiFi, который является low-code средством построения интеграционных потоков с возможностью масштабирования.

Сегодня хочу показать на одном из практических случаев, что не всегда в NiFi удаётся следовать шаблонному подходу record-oriented в разработке потоков обработки и трансформации данных.

Как известно, в NiFi применяется так называемый record-oriented-подход. Что это значит? Каждый контент FlowFile представляет собой не один элемент, а множество одинаковых по структуре записей. И работать с ним можно как с массивом записей, проводя необходимые трансформации и манипуляции, например выборки, обогащения либо изменения содержимого полей. В этом случае если запись изменяется, то контент FlowFile заново записывается на диск, а в атрибутах меняется системная ссылка на файл контента. Для чтения и записи элементов применяются специальные сервисы: RecordReader и RecordSetWriter, — которые, основываясь на схемах данных, позволяют процессорам обработки оперировать записями, не вдаваясь в детали их хранения. Такой подход позволяет сократить накладные расходы, так как хранить и обращаться к единому множеству записей в одном файле с одним набором метаданных выгоднее, чем хранить каждый файл с метаданными отдельно. Стандарты обработки данных тоже сместили фокус с индивидуальной обработки на работу с записями. И одной из моих рекомендаций всегда является применение обработки записей с помощью предназначенных для этого процессоров: UpdateRecord, JoltTransformRecord, QueryRecord и так далее. Это быстрее за счёт обращения к одному файлу, а оперативная память требуется только на один набор атрибутов, а не на множество.

Но что делать, если в вашем потоке обработки всё построено на записях, но от одного источника приходят данные, которые выпадают из общей парадигмы? Я расскажу об одном случае из моей практики, когда при решении задачи наполнения хранилища пришлось провести нестандартную обработку.

Для понимания задачи опишу информационную среду, в которой я разрабатывал интеграционные процессы. Есть компания, которая производит и продаёт товары, допустим мебель. Она использует различные бизнес-сервисы, позволяющие автоматизировать задачи по обороту товаров, логистике, складскому учёту, кадровым задачам, продажам и производству. Данные для построения отчётности компания собирает в аналитическом хранилище. А сама аналитика информации нужна для комплексной оценки эффективности бизнеса, текущей работы компании и прогнозирования во всех сферах деятельности. Источниками данных для хранилища являются различные ERP-, CRM-, PIM-, MDM-системы, веб-приложения и приложения обработки данных компании. В целях оптимизации информационного обмена прямой доступ между системами заменён на обмен сообщениями через единую шину данных. Чтобы сократить объём данных, в компании введено правило: в топике Kafkaнаходятся сообщения, сформированные по одной схеме, и сами сообщения преобразованы в Avro по стандарту Confluent. Схемы сохранены в ADS. Schema Registry. Обобщённая структура информационной среды представлена на рисунке.

Рисунок1. Структура и связь сервисов обработки данных
Рисунок1. Структура и связь сервисов обработки данных

Также в компании принят стандарт передачи сообщения в виде блока метаданных и данных. Обязательным элементом метаданных является время генерации сообщения, остальные поля оставлены на усмотрение генерирующий системы. Таким образом, данные представляют собой структуру, разделённую на два уровня: информационный блок и непосредственно структуру от источника, например, в таком виде:

Пример передаваемых данных
{
  "metadata" : {
    "ts" : "2020-01-01 00:00:00.0000"
  },
  "data" : {
    "someFields1" : "some value", 
    "someFieeld2" : 0,
    "someFieeld2 : true
  }
}

ETL-процесс наполнения хранилища выглядит максимально просто: извлечение данных из требуемых топиков, необходимые трансформации по приведению[АД1]  к структуре, пригодной для внесения в таблицу базы данных, сопоставление исходного топика целевой таблице в хранилище и внесение данных в хранилище.

Следующим этапом нужно подготовить данные для внесения в таблицу. Для этого требуется преобразовать полученную иерархическую структуру в плоскую таблицу, сдвинув поля из вложенности на уровень вверх, убрать ненужные теги metadata и data. Дополнительно понадобилось добавить в структуру время формирования записи из метаданных. Стейджинг формируется таком образом, что одному топику, а следовательно, одной структуре, соответствует одна целевая таблица, при этом имя таблицы соответствует имени топика. Дальнейшая обработка данных вынесена в процедуры расчёта хранилища и не завязана на сам ETL-процесс.

При получении данных из Kafka с помощью процессора ConsumeKafkaRecord флаг "Separate By Key" выставлен в значение "false", что позволяет формировать один FlowFile для нескольких сообщений. Так как нужно просто «сдвинуть» поля на один уровень, в качестве инструмента сразу напрашивается Jolt.  Для упрощения структуры данных можно применить простую Jolt-трансформацию, которая перебирает все поля второго уровня и сдвигает их на первый уровень:

Трансформация, преобразующая двухуровневую иерархию в плоскую струтуру
[
  {
    "operation": "shift",
    "spec": {
      "*": { "*": "&" }
    }
  }
]

Трансформация позволяет работать с одной записью, и её можно применить в процессоре JoltTransformRecord. Таким образом, получается универсальный процесс, извлекающий данные из Kafka, трансформирующий их в плоскую структуру и передающий в целевую базу данных.

Конечно, представленный поток не готов к выводу в продуктовую среду, это только его ядро, которое необходимо дополнить обработкой ошибок, сформировать повторы, добавить оповещение. Тем не менее в таком виде уже получаем универсальный поток обработки, позволяющий загружать данные из любого топика в соответствующую таблицу целевой базы. Если внести список топиков в параметры контекста и управлять ими через API, то сможем автоматизировано управлять источниками.

Бизнес-системы не являются статичными, и список источников данных для аналитических хранилищ постоянно изменяется. В один момент появилась PIM/MDM-система, генерирующая данные не в простом формате, а с гораздо более сложной иерархией.

Сложная структура данных, генерируемая PIM/MDM-системой (очень большой JSON)
{
  "metadata" : {
    "ts" : "2024-05-14 22:15:68.5896"
  },
  "data" : {
    "object" : {
      "guid" : "04551935-d520-5785-b257-c8ae8240711c",
      "template" : "Item",
      "language" : null,
      "fields" : {
        "itemGroup" : {
          "guid" : "497b6e51-4c1d-5ce0-be02-b8f37ba35c26",
          "template" : "ItemGroup",
          "language" : null,
          "fields" : {
            "name" : "GroupNanme",
            "parentObject" : {
              "guid" : "32d38329-1868-5f51-bb30-f824258539fb",
              "template" : "ItemGroup",
              "language" : null,
              "fields" : {
                "name" : "parent name"
              }
            },
            "specification" : {
              "guid" : null,
              "template" : "SpecificationBrickContainer",
              "language" : null,
              "fields" : {
                "internalGroupSpecification" : {
                  "guid" : null,
                  "template" : "InternalGroupSpecificationBrick",
                  "language" : null,
                  "fields" : {
                    "volumeRatio" : "1.3",
                    "groupAttribute" : {
                      "guid" : "84281670-f476-5f3f-8b31-382f02e13e83",
                      "template" : "ItemGroup",
                      "language" : null,
                      "fields" : {
                        "name" : "group name",
                        "parentObject" : {
                          "guid" : "1ba54e86-bc86-59ac-93e9-34fa4a68abe3",
                          "template" : "Folder",
                          "language" : null,
                          "fields" : {
                            "name" : "another name"
                          }
                        }
                      }
                    },
                    "codeGroup" : "5561010305"
                  }
                }
              }
            }
          }
        },
        "additionalInformation" : {
          "guid" : null,
          "template" : "AdditionalInformationBrickContainer",
          "language" : null,
          "fields" : {
            "additionalInformation" : [ "AdditionalInformationBrick" ]
          }
        },
        "manufactureCountry" : {
          "guid" : "f64ab0a6-cff1-5dcd-9f1c-cc43f1c58d03",
          "template" : "Property",
          "language" : null,
          "fields" : {
            "name" : "US"
          }
        },
        "code" : "5896dkwskeo_kedoed896",
        "parentObject" : {
          "guid" : "ce5a885b-8170-5bbe-80c1-c0d2d41d0cad",
          "template" : "Item",
          "language" : null,
          "fields" : {
            "name" : "parent name",
            "parentObject" : {
              "guid" : "83c7b0c5-3d1c-50ff-a9f0-0b0fc6e47cbd",
              "template" : "ItemTemplate",
              "language" : null,
              "fields" : {
                "name" : "object source name"
              }
            },
            "statusCode" : "draft"
          }
        },
        "ean13" : "462711234164",
        "measureUnit" : {
          "unitName" : "item",
          "unitCode" : "796"
        },
        "main" : {
          "guid" : null,
          "template" : "MainBrickContainer",
          "language" : null,
          "fields" : {
            "localizedDescription" : [ "LocalizedDescriptionBrick" ]
          }
        },
        "media" : {
          "guid" : null,
          "template" : "MediaBrickContainer",
          "language" : null,
          "fields" : {
            "media" : [ "MediaBrick" ]
          }
        },
        "commerce" : {
          "guid" : null,
          "template" : "CommerceBrickContainer",
          "language" : null,
          "fields" : {
            "product" : {
              "guid" : null,
              "template" : "ProductBrick",
              "language" : null,
              "fields" : {
                "ean13" : "4627112341690"
              }
            }
          }
        },
        "pack" : [ {
          "packageName" : "coverage",
          "package" : null,
          "length" : {
            "value" : 140.0,
            "measureUnit" : {
              "unitName" : "sm",
              "unitCode" : "004"
            }
          },
          "width" : {
            "value" : 200.0,
            "measureUnit" : {
              "unitName" : "sm",
              "unitCode" : "004"
            }
          },
          "height" : {
            "value" : 10.0,
            "measureUnit" : {
              "unitName" : "sm",
              "unitCode" : "004"
            }
          },
          "volume" : {
            "value" : 0.28,
            "measureUnit" : {
              "unitName" : "m3",
              "unitCode" : "113"
            }
          },
          "weightNetto" : {
            "value" : 35.9,
            "measureUnit" : {
              "unitName" : "kg",
              "unitCode" : "166"
            }
          },
          "weightGross" : {
            "value" : 35.9,
            "measureUnit" : {
              "unitName" : "kg",
              "unitCode" : "166"
            }
          },
          "seatsCount" : null,
          "packagedUnitsCount" : null,
          "setSeatsCount" : null,
          "priority" : null,
          "measureUnit" : null,
          "mainLogisticsPackaging" : null,
          "barcode" : null
        } ],
        "isMatrix" : 1,
        "series" : {
          "guid" : "f68d1bc3-2600-5b40-b9b1-957d039cdd6b",
          "template" : "tmp",
          "language" : null,
          "fields" : {
            "name" : "seria name",
            "priceSegment" : {
              "guid" : "364044ec-0f96-5a5f-9028-9dc25f752e5e",
              "template" : "Property",
              "language" : null,
              "fields" : {
                "name" : "middle"
              }
            },
            "parentObject" : {
              "guid" : "a497557f-4eda-5f16-807a-4b71eb5c1b74",
              "template" : "Seria",
              "language" : null,
              "fields" : {
                "name" : "GENERAL Seria name"
              }
            }
          }
        },
        "name" : "Common Name",
        "options" : {
          "guid" : null,
          "template" : "OptionsCollection",
          "language" : null,
          "fields" : {
            "length" : {
              "guid" : null,
              "template" : "OptionsCollectionItem",
              "language" : null,
              "fields" : {
                "propertyType" : "length",
                "relation" : [ {
                  "guid" : "28df4e5e-5201-519a-a3a5-57d7e1838361",
                  "template" : "Property",
                  "language" : null,
                  "fields" : {
                    "name" : 200,
                    "options" : {
                      "guid" : null,
                      "template" : "OptionsBrickContainer",
                      "language" : null,
                      "fields" : {
                        "propertyMDMData" : {
                          "guid" : null,
                          "template" : "PropertyMDMDataBrick",
                          "language" : null,
                          "fields" : {
                            "code" : 10004
                          }
                        },
                        "propertySize" : {
                          "guid" : null,
                          "template" : "PropertySizeBrick",
                          "language" : null,
                          "fields" : {
                            "physicalQuantity" : "length",
                            "numericValue" : {
                              "value" : 200.0,
                              "measureUnit" : {
                                "unitName" : "sm",
                                "unitCode" : "004"
                              }
                            }
                          }
                        }
                      }
                    }
                  }
                } ]
              }
            },
            "width" : {
              "guid" : null,
              "template" : "OptionsCollectionItem",
              "language" : null,
              "fields" : {
                "propertyType" : "width",
                "relation" : [ {
                  "guid" : "59c08ec1-d0ff-55c4-99a4-6616f23f1469",
                  "template" : "Property",
                  "language" : null,
                  "fields" : {
                    "name" : 140,
                    "options" : {
                      "guid" : null,
                      "template" : "OptionsBrickContainer",
                      "language" : null,
                      "fields" : {
                        "propertyMDMData" : {
                          "guid" : null,
                          "template" : "PropertyMDMDataBrick",
                          "language" : null,
                          "fields" : {
                            "code" : 10005
                          }
                        },
                        "propertySize" : {
                          "guid" : null,
                          "template" : "PropertySizeBrick",
                          "language" : null,
                          "fields" : {
                            "physicalQuantity" : "width",
                            "numericValue" : {
                              "value" : 140.0,
                              "measureUnit" : {
                                "unitName" : "sm",
                                "unitCode" : "004"
                              }
                            }
                          }
                        }
                      }
                    }
                  }
                } ]
              }
            },
            "height" : {
              "guid" : null,
              "template" : "OptionsCollectionItem",
              "language" : null,
              "fields" : {
                "propertyType" : "height",
                "relation" : [ {
                  "guid" : "c28c2569-7a0d-5d1a-94d4-4907dc91f20a",
                  "template" : "Property",
                  "language" : null,
                  "fields" : {
                    "name" : 112,
                    "options" : {
                      "guid" : null,
                      "template" : "OptionsBrickContainer",
                      "language" : null,
                      "fields" : {
                        "propertyMDMData" : {
                          "guid" : null,
                          "template" : "PropertyMDMDataBrick",
                          "language" : null,
                          "fields" : {
                            "code" : 10006
                          }
                        },
                        "propertySize" : {
                          "guid" : null,
                          "template" : "PropertySizeBrick",
                          "language" : null,
                          "fields" : {
                            "physicalQuantity" : "height",
                            "numericValue" : {
                              "value" : 112.0,
                              "measureUnit" : {
                                "unitName" : "sm",
                                "unitCode" : "004"
                              }
                            }
                          }
                        }
                      }
                    }
                  }
                } ]
              }
            }
          }
        },
        "sku" : 1563340922,
        "statusCode" : "draft"
      }
    },
    "stateName" : "Updated"
  }

В данных от этой системы передаётся полная информация о товаре, его составляющих, размерах, свойствах упаковки. Такой формат позволяет потребителям самостоятельно определять способы хранения соответствующих сведений для каждого свойства товара, а системе — генерировать одно сообщение на единицу товара, не выгружая в отдельные потоки связанные данные. Мне же требовалось сформировать из этих сообщений плоскую таблицу по заданному SourceToTarget:

Поле

Имя поля 

Тип данных

data.object.guid

guid

uniqueidentifier

data.object.fields.manufactureCountry.guid

manufactureCountryGuid

uniqueidentifier

data.object.fields.manufactureCountry.fields.name

manufactureCountryName

text

data.object.fields.itemGroup.guid

itemGroupGuid

uniqueidentifier

data.object.fields.parentObject.guid

parentGuid

uniqueidentifier

data.object.fields.sourceProduction.[0]

sourceProductionValue

bool

data.object.fields.measureUnit.unitName

unitName

text

data.object.fields.measureUnit.unitCode

unitCode

int

data.object.fields.commerce.fields.product.fields.tax

tax

int

data.object.fields.series.guid

seriesGuid

uniqueidentifier

data.object.fields.series.fields.name

seriesName

text

data.object.fields.series.fields.priceSegment.guid

priceSegmentGuid

uniqueidentifier

data.object.fields.series.fields.priceSegment.fields.name

priceSegmentName

text

data.object.fields.name

name

text

data.object.fields.dimensionCharacteristics

dimensionCharacteristics

int

data.object.fields.sku

sku

text

data.object.fields.isOutOfCollection

isOutOfCollection

bool

data.object.fields.isMatrix

isMatrix

bool

data.object.fields.kit

kit

int

data.object.fields.numberOfParts

numberOfParts

int

data.object.fields.ean13

ean13

text

data.object.fields.code

code

text

data.object.fields.statusCode

statusCode

text

data.stateName

stateName

text

metadata.ts

ts

datetime2

Выход простой: написать спецификацию Jolt для текущего случая, сделать ответвление на основании имени топика, а после трансформации вернуть поток в стандартный маршрут. Разработав и отладив спецификацию, применил её в JoltTransformRecord.

Спецификация, решающая поставленную задачу
[
  {
    "operation": "shift",
    "spec": {
      "metadata": {
        "*": "&"
      },
      "data": {
        "object": {
          "guid": "&",
          "fields": {
            "manufactureCountry": {
              "guid": "manufactureCountryGuid",
              "fields": {
                "name": "manufactureCountryName"
              }
            },
            "itemGroup": {
              "guid": "itemGroupGuid"
            },
            "parentObject": {
              "guid": "parentGuid"
            },
            "sourceProduction": {
              "0": "SourceProductionValue"
            },
            "measureUnit": {
              "*": "&"
            },
            "commerce": {
              "fields": {
                "product": {
                  "fields": {
                    "tax": "&"
                  }
                }
              }
            },
            "series": {
              "guid": "seriesGuid",
              "fields": {
                "name": "seriesName",
                "priceSegment": {
                  "guid": "priceSegmentGuid",
                  "fields": {
                    "name": "priceSegmentName"
                  }
                }
              }
            },
            "name": "&",
            "dimensionCharacteristics": "&",
            "sku": "&",
            "isOutOfCollection": "&",
            "isMatrix": "&",
            "kit": "&",
            "numberOfParts": "&",
            "ean13": "&",
            "code": "&",
            "statusCode": "&"
          }
        },
        "stateName": "StateName"
      }
    }
  },
  {
    "operation": "modify-overwrite-beta",
    "spec": {
      "SourceProductionValue": "=toInteger",
      "unitCode": "=toInteger",
      "tax": "=toInteger",
      "dimensionCharacteristics": "=toInteger",
      "isOutOfCollection": "=toInteger",
      "isMatrix": "=toInteger",
      "kit": "=toInteger",
      "numberOfParts": "=toInteger",
      "sku": "=toString"
    }
  },
  {
    "operation": "shift",
    "spec": {
      "isOutOfCollection": {
        "0": {
          "#false": "isOutOfCollection"
        },
        "1": {
          "#true": "isOutOfCollection"
        }
      },
      "isMatrix": {
        "0": {
          "#false": "isMatrix"
        },
        "1": {
          "#true": "isMatrix"
        }
      },
      "kit": {
        "0": {
          "#false": "kit"
        },
        "1": {
          "#true": "kit"
        }
      },
      "*": "&"
    }
  },
  {
    "operation": "modify-overwrite-beta",
    "spec": {
      "isOutOfCollection": "=toBoolean",
      "isMatrix": "=toBoolean",
      "kit": "=toBoolean"
    }
  }
]

Пояснение по блокам спецификации JOLT

Первый блок "shift" сдвигает данные на один уровень. В нем все просто - взять указанное значение и поместить его в указанное поле, но на другом уровне. Например, поле "data.object.guid" перемещается в "guid":

{
    "operation": "shift",
    "spec": {
      "metadata": {
        "*": "&"
      },
      "data": {
        "object": {
          "guid": "&"

Следующий блок "modify-overwrite-beta" преобразует данные к требуемым типам, так как в исходных данных тип может не соответствовать целевому. Пример такого преобразования для поля "isMatrix". Несмотря на то, что в исходном виде это число 0 или 1, в результате трасформации поле приводилось к строке.

"isMatrix": "=toInteger"

Цель преобразования некоторых полей к численным типам раскрывается в следующем блоке трансформации "shift". Не многие знают, что с помощью это операции можно не только перенести значения из одного поля в другое, но также делать это с условиями:

"isMatrix": {
        "0": {
          "#false": "isMatrix"
        },
        "1": {
          "#true": "isMatrix"
        }

Исходя из значения поля "isMatrix" значение в нем заменяется на строку "true" или "false". И самым последним блоком строковое представление преобразуется к логическому типу:

{
    "operation": "modify-overwrite-beta",
    "spec": {
      "isOutOfCollection": "=toBoolean",
      "isMatrix": "=toBoolean",
      "kit": "=toBoolean"
    }
  }

В результате получено полное соответствие S2T.

Однако в процессе применения трансформации формировалась ошибка:

JoltTransformRecord[id=018f1086-7c30-1015-2569-a67ef2af5b1d] Unable to transform FlowFile[filename=0c119857-beac-4f4f-96c7-406b71c2c9fc] due to org.apache.nifi.serialization.record.util.IllegalTypeConversionException: Cannot convert value of class [Ljava.lang.Object; because the type is not supported: org.apache.nifi.serialization.record.util.IllegalTypeConversionException: Cannot convert value of class [Ljava.lang.Object; because the type is not supported

При получении ошибки я провёл ряд манипуляций, которые делаю всегда: разбить FlowFile на единичные записи, упростить трансформацию, преобразовать форматы. Результат был отрицательным, так как процессор JoltTransformRecord постоянно выдавал ошибку. Упрощая структуру сообщения, я выяснил, что ошибка возникает потому, что JoltTransformRecord не может корректно обработать массив, в котором присутствуют вариативные типы данных, определяемые типом map. Также не помог вариант с предварительным формированием целевой схемы данных и указанием её соответствующему RecordSetWriter. Перевод типа данных из Avro в JSON тоже не дал результата, так как тип map и массивы записей никуда не ушли и стали генерироваться на основании самих данных. 

Avro-схема данных
{
  "type": "record",
  "name": "Mdm_Record",
  "namespace": "any.org",
  "fields": [
    {
      "name": "metadata",
      "type": {
        "type": "record",
        "name": "MessageInfo",
        "namespace": "any.org",
        "fields": [
          {
            "name": "ts",
            "type": {
              "type": "long",
              "logicalType": "timestamp-millis"
            }            
          }
        ]
      }
    },
    {
      "name": "data",
      "type": {
        "type": "record",
        "name": "MDM_Item",
        "fields": [
          {
            "name": "object",
            "type": {
              "type": "record",
              "name": "Node",
              "fields": [
                {
                  "name": "guid",
                  "type": [
                    "null",
                    "string"
                  ],               
                  "default": null
                },
                {
                  "name": "template",
                  "type": [
                    "null",
                    "string"
                  ],                 
                  "default": null
                },
                {
                  "name": "language",
                  "type": [
                    "null",
                    "string"
                  ],                 
                  "default": null
                },
                {
                  "name": "fields",
                  "type": {
                    "type": "map",
                    "values": [
                      "Node",
                      {
                        "type": "record",
                        "name": "Barcode",
                        "fields": [
                          {
                            "name": "provider",
                            "type": {
                              "type": "record",
                              "name": "Provider",
                              "fields": [
                                {
                                  "name": "name",
                                  "type": "string"                                 
                                },
                                {
                                  "name": "city",
                                  "type": [
                                    "null",
                                    "string"
                                  ],                                 
                                  "default": null
                                },
                                {
                                  "name": "taxIdNumber",
                                  "type": [
                                    "null",
                                    "string"
                                  ],
                                  "default": null
                                },
                                {
                                  "name": "taxRegistrationReasonCode",
                                  "type": [
                                    "null",
                                    "string"
                                  ],                                 
                                  "default": null
                                }
                              ]
                            }
                           
                          },
                          {
                            "name": "barcode",
                            "type": [
                              "null",
                              "string"
                            ],                           
                            "default": null
                          },
                          {
                            "name": "supplierArticle",
                            "type": [
                              "null",
                              "string"
                            ],
                           "default": null
                          }
                        ]
                      },
                      {
                        "type": "record",
                        "name": "MediaResource",
                        "fields": [
                          {
                            "name": "link",
                            "type": "string",                           
                          },
                          {
                            "name": "filename",
                            "type": [
                              "null",
                              "string"
                            ],                           
                            "default": null
                          },
                          {
                            "name": "mimetype",
                            "type": [
                              "null",
                              "string"
                            ],                           
                            "default": null
                          },
                          {
                            "name": "fileSize",
                            "type": [
                              "null",
                              "int"
                            ],
                           
                            "default": null
                          },
                          {
                            "name": "format",
                            "type": [
                              "null",
                              "string"
                            ],                            
                            "default": null
                          }
                        ]
                      },
                      {
                        "type": "record",
                        "name": "SofaSpecification",
                        "fields": [
                          {
                            "name": "seats",
                            "type": [
                              "null",
                              "int"
                            ],                           
                            "default": null
                          },
                          {
                            "name": "bedLength",
                            "type": {
                              "type": "record",
                              "name": "QuantityValue",
                              "fields": [
                                {
                                  "name": "value",
                                  "type": "float",
                                 
                                },
                                {
                                  "name": "measureUnit",
                                  "type": [
                                    "null",
                                    {
                                      "type": "record",
                                      "name": "MeasureUnit",
                                      "fields": [
                                        {
                                          "name": "unitName",
                                          "type": "string",
                                        },
                                        {
                                          "name": "unitCode",
                                          "type": "string",
                                        }
                                      ]
                                    }
                                  ],
                                  "default": null
                                }
                              ]
                            }
                          },
                          {
                            "name": "bedWidth",
                            "type": "QuantityValue",
                          },
                          {
                            "name": "backHeightFolded",
                            "type": "QuantityValue",
                          },
                          {
                            "name": "backHeightFoldedOut",
                            "type": "QuantityValue",
                          },
                          {
                            "name": "fullLength",
                            "type": "QuantityValue",
                          }
                        ]
                      },
                      {
                        "type": "record",
                        "name": "MattressSpecification",
                        "fields": [
                          {
                            "name": "bedLength",
                            "type": "QuantityValue",
                          },
                          {
                            "name": "bedWidth",
                            "type": "QuantityValue",
                          },
                          {
                            "name": "height",
                            "type": "QuantityValue",
                          },
                          {
                            "name": "maximumBedLoad",
                            "type": "QuantityValue",
                          }
                        ]
                      },
                      "QuantityValue",
                      "MeasureUnit",
                      {
                        "type": "array",
                        "items": [
                          "Node",
                          "Barcode",
                          {
                            "type": "record",
                            "name": "ItemPack",
                            "fields": [
                              {
                                "name": "packageName",
                                "type": "string",
                              },
                              {
                                "name": "package",
                                "type": [
                                  "null",
                                  {
                                    "type": "record",
                                    "name": "Pack",
                                    "fields": [
                                      {
                                        "name": "packageGuid",
                                        "type": "string",
                                      },
                                      {
                                        "name": "packageType",
                                        "type": [
                                          "null",
                                          "string"
                                        ],
                                        "default": null
                                      },
                                      {
                                        "name": "packaging",
                                        "type": {
                                          "type": "map",
                                          "values": [
                                            {
                                              "type": "record",
                                              "name": "Box",
                                              "fields": [
                                                {
                                                  "name": "length",
                                                  "type": "QuantityValue",
                                                },
                                                {
                                                  "name": "width",
                                                  "type": "QuantityValue",
                                                },
                                                {
                                                  "name": "height",
                                                  "type": "QuantityValue",
                                                }
                                              ]
                                            },
                                            {
                                              "type": "record",
                                              "name": "SoftPackaging",
                                              "fields": [
                                                {
                                                  "name": "twist",
                                                  "type": "boolean",
                                                }
                                              ]
                                            }
                                          ]
                                        },
                                      }
                                    ]
                                  }
                                ],
                                "default": null
                              },
                              {
                                "name": "length",
                                "type": "QuantityValue",
                              },
                              {
                                "name": "width",
                                "type": "QuantityValue",
                              },
                              {
                                "name": "height",
                                "type": "QuantityValue",
                              },
                              {
                                "name": "volume",
                                "type": "QuantityValue",
                              },
                              {
                                "name": "weightNetto",
                                "type": "QuantityValue",
                              },
                              {
                                "name": "weightGross",
                                "type": "QuantityValue",
                              },
                              {
                                "name": "seatsCount",
                                "type": [
                                  "null",
                                  "int"
                                ],
                                "default": null
                              },
                              {
                                "name": "packagedUnitsCount",
                                "type": [
                                  "null",
                                  "int"
                                ],
                                "default": null
                              },
                              {
                                "name": "setSeatsCount",
                                "type": [
                                  "null",
                                  "int"
                                ],
                                "default": null
                              },
                              {
                                "name": "priority",
                                "type": [
                                  "null",
                                  "int"
                                ],
                                "default": null
                              },
                              {
                                "name": "measureUnit",
                                "type": [
                                  "null",
                                  "MeasureUnit"
                                ],
                                "default": null
                              },
                              {
                                "name": "mainLogisticsPackaging",
                                "type": [
                                  "null",
                                  "boolean"
                                ],
                                "default": null
                              },
                              {
                                "name": "barcode",
                                "type": [
                                  "null",
                                  "string"
                                ],
                                "default": null
                              }
                            ]
                          },
                          "MediaResource",
                          "string",
                          "int",
                          "boolean"
                        ]
                      },
                      {
                        "type": "map",
                        "values": [
                          "string",
                          "int",
                          "boolean"
                        ]
                      },
                      "string",
                      "int",
                      "boolean"
                    ]
                  }
                }
              ]
            }
          },
          {
            "name": "stateName",
            "type": "string"
          }
        ]
      }
    }
  ]
}

Так как выбранный ранее вариант с трансформацией Jolt не работал, я решил применить проверенное средство для крайних случаев — скрипт. Есть хороший процессор — ScriptedTransformRecord, позволяющий выполнять обработку по одной записи, получая объект типа Record.

Скрипт получился довольно большим по причине наличия типов map, а также возможности значения «null» в требуемых полях. Так как существует вероятность модификации формата данных либо корректировки S2T, то в будущем потребуется изменять скрипт, что при его большом объёме влечёт увеличение сложности для разработчика. Так что я решил отказаться от скрипта и вернуться к разработанной и отлаженной спецификации, ведь для JSON она работает корректно, ошибка возникает только при работе с записью, когда применяется схема данных. 

 То есть в этом случае я решил отказаться от обработки записи и перейти к обработке контента целиком, что приводит к увеличению количества FlowFile и, соответственно, увеличению задействования оперативной памяти.

 В NiFi можно применить спецификацию Jolt к записям c помощью JoltTransformRecord или воспользоваться процессором JoltTransformJSON, который ожидает на входе JSON и преобразовывает его не как запись, а как единый JSON-файл. Так как происходит обработка всего контента целиком, то для сокращения накладных расходов лучше подавать на вход единичный объект JSON, а не массив. Поэтому предварительно потребовалась разбивка пришедшего FlowFile на фрагменты, где каждый содержал бы один JSON. Это позволило в дальнейшем выполнить трансформацию для единичного объекта быстро, но повлекло генерацию большого количества FlowFile. Для этого применил SplitRecord, где Reader читал Avro-формат, а RecordSetWriter был настроен для записи одного JSON-объекта. 

Рисунок 3. Настройка SplitRecord
Рисунок 3. Настройка SplitRecord

Для сокращения дальнейших накладных расходов данные объединяются с помощью MergeRecord. Перед слиянием выполнена конвертация в Avro с заданной целевой схемой, соответствующей S2T.

Рисунок 4. Готовый flow, объединяющий преобразованные данные в единый FlowFile
Рисунок 4. Готовый flow, объединяющий преобразованные данные в единый FlowFile

Далее данные вернулись в основной поток и были успешно внесены в базу данных. Таким образом, из-за сложной структуры данных пришлось изменить основной паттерн работы с данными в NiFi — работать с записями. В итоге, допустив проигрыш по ресурсу на небольшом участке потока обработки данных, в целом я выиграл, оставаясь в ограниченном круге инструментов (только Jolt в трансформациях) и вернув данные в общий поток.

По скорости обработки ScriptedTransformRecord был быстрее из-за обработки одного файла с множеством записей. Однако сложность скрипта в процессе модернизации или поддержки снижает привлекательность такого решения. Анализ же спецификации займёт меньше времени, тем самым позволив быстрее вынести корректировки в случае изменения структуры данных.

В ходе дальнейшей работы стало ясно, что ответвление оправдало себя многократно. Так как исходный объект является записью о товаре и включает совокупность большого количества сущностей и связей с другими сущностями, то из одного сообщения понадобилось формировать более десятка стейджинговых таблиц. И решение было простое: раз уже есть разбивка на отдельные JSON, можно разработать отдельные спецификации Jolt для каждого случая, сделать итоговые схемы данных и все преобразования реализовать одинаковым шаблоном. 

Что я хотел сказать, описывая этот случай. В NiFi есть много способов достичь требуемого результата. Иногда стоит попробовать несколько вариантов, но выбирать тот, который более прост в понимании разработчиком и требует меньше усилий при поддержке или модификации.

В заключение сформирую ряд рекомендаций для разработчиков, которые, на мой взгляд, будут полезны:

  1.  Вся обработка должна выполняться над записями. Это позволит применять бинарные форматы и минимизировать затраты на преобразования.

  2. Переходить к обработке контента целиком только в случае, когда ресурсоёмкость работы с записями (тут и затраты на поддержку, и развитие потока, и сама обработка) превышает выбранную вами границу.

  3. Если потребовалось перейти к обработке контента целиком, то необходимо вернуться к записям как можно быстрее, при этом обеспечить удаление всех ссылок на единичные FlowFile для очистки репозитория контента и оперативной памяти.

  4. Формировать «универсальные» потоки, где параметры процессоров задаются на основе атрибутов. Это позволит применять один и тот же набор процессоров для разных источников, минимизировав количество запускаемых экземпляров процессоров и снизив нагрузку на планировщик.

Полезные ссылки:

  1. Сайт Arenadata

  2. Описание продукта Arenadata Streaming

  3. Сообщество Apache Nifi в телеграм

  4. Приложение, позволяющее отлаживать Jolt

Комментарии (0)