На текущем проекте у нас начинает активно использоваться Apache NiFi в качестве основного ETL/ELT-инструмента. NiFi используется для получения данных из различных источников (Kafka, REST, HDFS) и подготовки данных для их последующей загрузки в основное хранилище на базе Greenplum. Загрузка подготовленных данных в Greenplum реализована средствами последнего (PXF), поэтому NiFi только подготавливает данные в формате Avro и записывает их в HDFS.

Немного о задаче. Пусть мы имеем информацию о подписках пользователя на уведомления для различных разделов/сервисов портала. Для каждого раздела пользователи могут указать от нуля до нескольких видов транспорта, которым эти уведомления будут доставляться, например, PUSH, EMAIL, SMS. Требуется обеспечить загрузку этих данных в наше аналитическое хранилище.

Исходные данные

Данные приходят в Apache Kafka, формат данных примерно такой:

{
    "uID": 1000358546,
    "events": [{
            "eventTypeCode": "FEEDBACK",
            "transports": ["PUSH", "SMS"]
        }, {
            "eventTypeCode": "MARKETING",
            "transports": ["PUSH", "EMAIL"]
        }, {
            "eventTypeCode": "ORDER_STATUS",
            "transports": ["SOC_VK"]
        }
    ]
 }

Вроде всё просто, но у PXF есть некоторые сложности с загрузкой иерархических структур (массив простых значений оно загрузит, но массив объектов - нет), поэтому нам нужно сделать наши данные максимально плоскими. Да, и цель статьи - показать, что можно сделать с JSON используя встроенные процессоры.

Для начинающих

Для экспериментов в NiFi можно создать процессорную группу, выбрав в качестве первого процессора GenerateFlowFile поместив в параметр Custom Text текст нашего Json.

FlattenJson

Одним из способов сделать из произвольного json плоский является процессор FlattenJson. Процессор предоставляет пользователю возможность взять вложенный документ JSON и представить его в простой документ содержащий пары ключ/значение. Ключи объединяются на каждом уровне с помощью определяемого пользователем разделителя, который по умолчанию имеет значение ".". Процессор поддерживает три режима преобразования (Flatten Mode): normal, keep-arrays и dot notation (применяется для запросов в MongoDB). Режим преобразования по умолчанию - "keep-arrays". В режиме keep-arrays мы получим, для нашего примера, практически исходный Json. Поэтому этот режим нам не подходит, и мы переключим процесоор на режим normal. В результате мы получим такой json:

{
  "uID" : 1000358546,
  "events[0].eventTypeCode" : "FEEDBACK",
  "events[0].transports[0]" : "PUSH",
  "events[0].transports[1]" : "SMS",
  "events[1].eventTypeCode" : "MARKETING",
  "events[1].transports[0]" : "PUSH",
  "events[1].transports[1]" : "EMAIL",
  "events[2].eventTypeCode" : "ORDER_STATUS",
  "events[2].transports[0]" : "SOC_VK"
}

Что-ж, оно действительно плоское, но... Если такое попытаться завернуть в avro, то оно сломается. Если не на этапе конвертации, то на этапе загрузки в Greenplum точно. Всё по причине того, что тут буквально для каждого пользователя будет свой набор полей и схема каждого avro-файла будет различаться.

Этот процессор, сам по себе весьма полезный в иных случаях, в чистом виде нам не подходит. Смотрим, что же мы можем использовать ещё.

JoltTransformJSON

JoltTransformJSON, пожалуй, самый мощный в арсенале NiFi, процессор для трансформации Json. Он позволяет применять список Jolt-спецификаций к нашему Json. На хабре уже была статья, посвященная этому процессору. Но, позвольте мне рассказать об этом процессоре применительно к нашей задаче.

Вариантов решения нашей задачи как минимум два - это либо обработать Json, полученный с помощью FlattenJson-процессора с помощью Jolt, либо попробовать от FlattenJson избавиться и решить всё с помощью JoltTransformJson.

Но, для начала опишем, какие же возможности предоставляет нам этот процессор. Самое главное - это возможность работать с Jolt-спецификацией в расширенном редакторе, где можно не только писать спецификацию, проверить её корректность, но и выполнить трансформацию произвольного json не покидая окна редактора.

Это всё круто и очень помогает в работе, но, если честно, я предпочитаю использовать Jolt Transform Demo (jolt-demo.appspot.com) . Субъективно он более удобен и там есть примеры с комментариями для начала работы с Jolt.

Итак, как вы видите, на картинке выше, я начал с простой спецификации:

[{
	"operation": "shift",
	"spec": {
		"*": "&"
	}
}]

Эта спецификация, по сути ничего не трансформирует, поскольку говорит "возьми любое поле и выведи его как оно есть". Будем исправлять. Для начала определить нашу цель. А уже потом напишем для неё Jolt-спецификацию.

Итак, у нас на входе:

{
    "uID": 1000358546,
    "events": [{
            "eventTypeCode": "FEEDBACK",
            "transports": ["PUSH", "SMS"]
        }, {
            "eventTypeCode": "MARKETING",
            "transports": ["PUSH", "EMAIL"]
        }, {
            "eventTypeCode": "ORDER_STATUS",
            "transports": ["SOC_VK"]
        }
    ]
}

А хотим мы получить такой json на выходе:

{
	"uID": 1000358546,
	"FEEDBACK": ["PUSH", "SMS"],
	"MARKETING": ["PUSH", "EMAIL"],
	"ORDER_STATUS": ["SOC_VK"]
}

Давайте напишем для него спецификацию. Нам нужно для каждого значения в events->transports взять ключ из events->eventTypeCode и в результате записать с этим ключем массив значений. Поле uID оставляем без изменений.

[{
	"operation": "shift",
	"spec": {
		"events": {
			"*": {
				"transports": {
					"*": {
						"@": "@(3,eventTypeCode)[]"
					}
				}
			}
		},
		"*": "&"
	}
}]
Пояснение спецификации

"events":{"*":{"transports":{"*":{,думаю, не вызывает особых сложностей. Здесь мы для каждого events берём каждый элемент массива (это первая "*") для которого из transports берём каждый элемент массива (вторая "*").

"@": "@(3,eventTypeCode)[]" вот тут самое интересное. Левая, от двоеточия, часть ("@") говорит о том, что мы берём текущее значение элемента массива, а это у нас PUSH для самого первого совпадения. А вот правая часть говорит о том, с каким ключём мы запишем это значение в результирующий json. И запись @(3,eventTypeCode) означает, что для того, чтобы получить имя ключа, нам нужно поднятся на 3 уровня выше (на уровень первой "*") и взять там значение поля eventTypeCode. Если, опять же, рассматривать самое первое совпадение, то это значение будет FEEDBACK - это и будет ключём, в который будет записано значение PUSH.

Думаю, вы заметили квадратные скобки на конце правой части выражения? Они говорят о том, что ключ, который мы определяем, должен быть массивом. Если их не поставить, то тогда, например для ORDER_STATUS, мы получим не массив, а просто строку. Для других типов событий у нас определено несколько значений транспорта, поэтому они будут объединены в массив автоматически. Т.е. результат мог бы выглядеть так:

{
    "uID": 1000358546,
    "FEEDBACK": ["PUSH", "SMS"],
    "MARKETING": ["PUSH", "EMAIL"],
    "ORDER_STATUS": "SOC_VK"
}

И так, мы добились желаемого результата. Но, давайте подумаем, какие сложности с данным вариантом. А мы опять имеем проблемы с потенциально различными схемами для разных пользователей. Я бы даже сказал с гарантированными. Это можно обойти, если при конвертации в avro будем использовать определенную схему, а не выводить её из данных json. Но, это значит, что мы должны в этой схеме заранее прописать все типы событий всех наших сервисов, и менять схему при каждом изменении их состава. Было бы легче, если бы мы взяли в качестве ключа транспорт, а в качестве значения массив типов событий, для которых уведомления используют данный транспорт. Я не буду приводить Jolt-спецификацию для данной трансформации, оставлю это в качестве задания читателю. Да, видов транспорта сильно меньше, чем событий, но это не гарантирует нам постоянство их списка.

И так, подумаем, как мы можем обеспечить постоянство схемы выходных данных, если у нас на входе могут меняться как типы событий так и виды транспортов. Вариантов не так много:

  • сделать два массива, один для событий, другой для транспортов, а соответствующие значения записывать с одним и тем-же индексом

  • сделать один массив, в котором пары событие/транспорт будут строками с разделителем

Первый вариант был отклонён из-за более сложной реализации разбора на стороне Greenplum. Второй вариант выглядит, для нашего примера, так:

{
  "uID" : 1000358546,
  "events" : [ 
    "FEEDBACK|PUSH", "FEEDBACK|SMS",
    "MARKETING|PUSH", "MARKETING|EMAIL",
    "ORDER_STATUS|SOC_VK"
  ]
}

Чтобы выполнить такую Jolt-трансформацию понадобится цепочка преобразований. Для начала, нужно развернуть массив транспортов и желательно вывести тип событий на один уровень с транспортом. Затем склеить строки и убрать лишние поля.

Первый этап
[{
     "operation": "shift",
     "spec": {
       "nET": {
         "*": {
           "transports": {
             "*": {
               "*": {
                 "@1": "outer[&4].inner[&2].t",
                 "@(3,eventTypeCode)": "outer[&4].inner[&2].etc"
               }
             }
           }
         }
       },
       "*": "&"
     }
}]

Здесь мы для каждого транспорта создаём объект, который вложен в два массива - внешний и внутренний (относительно "transports"). Значение транспорта записываем в поле t этого объекта, потом добавляем в этот объект поле etc, которое будет иметь значение из eventTypeCode.

Результат выполнения:

{
  "uID" : 1000358546,
  "outer" : [ {
    "inner" : [ {
      "t" : "PUSH",
      "etc" : "FEEDBACK"
    }, {
      "t" : "SMS",
      "etc" : "FEEDBACK"
    } ]
  }, {
    "inner" : [ {
      "t" : "PUSH",
      "etc" : "MARKETING"
    }, {
      "t" : "EMAIL",
      "etc" : "MARKETING"
    } ]
  }, {
    "inner" : [ {
      "t" : "SOC_VK",
      "etc" : "ORDER_STATUS"
    } ]
  } ]
}
Второй этап

Итак, у нас есть пары, но, прежде чем склеивать, давайте упростим массив:

{
   "operation": "shift",
   "spec": {
     "outer": {
       "*": {
         "inner": {
           "*": "events[]"
         }
       }
     },
     "*": "&"
   }
  }

Добавив эту спецификацию в список Jolt-спецификаций получим результат:

{
  "uID" : 1000358546,
  "events" : [ {
    "t" : "PUSH",
    "etc" : "FEEDBACK"
  }, {
    "t" : "SMS",
    "etc" : "FEEDBACK"
  }, {
    "t" : "PUSH",
    "etc" : "MARKETING"
  }, {
    "t" : "EMAIL",
    "etc" : "MARKETING"
  }, {
    "t" : "SOC_VK",
    "etc" : "ORDER_STATUS"
  } ]
}
Третий этап

Теперь наш массив не выглядит так страшно, как после первого этапа. Теперь мы можем легко склеить пары значений в одну строку.

 {
     "operation": "modify-default-beta",
     "spec": {
       "events": {
         "*": {
           "transport": "=concat(@(1,etc),'|',@(1,t))"
         }
       }
     }
  }

Добавив этот этап к нашему списку спецификаций получим:

{
  "uID" : 1000358546,
  "events" : [ {
    "t" : "PUSH",
    "etc" : "FEEDBACK",
    "transport" : "FEEDBACK|PUSH"
  }, {
    "t" : "SMS",
    "etc" : "FEEDBACK",
    "transport" : "FEEDBACK|SMS"
  }, {
    "t" : "PUSH",
    "etc" : "MARKETING",
    "transport" : "MARKETING|PUSH"
  }, {
    "t" : "EMAIL",
    "etc" : "MARKETING",
    "transport" : "MARKETING|EMAIL"
  }, {
    "t" : "SOC_VK",
    "etc" : "ORDER_STATUS",
    "transport" : "ORDER_STATUS|SOC_VK"
  } ]
}
Последний этап

Теперь нам остаётся только преобразовать массив объектов в массив строк, взяв только значения поля transport.

{
     "operation": "shift",
     "spec": {
       "events": {
         "*": {
           "@transport": "events[]"
         }
       },
       "*": "&"
     }
  }

Добавление этой операции приведёт нас к желаемому результату:

{
  "uID" : 1000358546,
  "events" : [ 
  	"FEEDBACK|PUSH", "FEEDBACK|SMS",
    "MARKETING|PUSH", "MARKETING|EMAIL",
    "ORDER_STATUS|SOC_VK"
  ]
}

Итак, цепочка спецификаций выглядит так:

 [
 	{
     "operation": "shift",
     "spec": {
       "events": {
         "*": {
           "transports": {
             "*": {
               "*": {
                 "@1": "outer[&4].inner[&2].t",
                 "@(3,eventTypeCode)": "outer[&4].inner[&2].etc"
               }
             }
           }
         }
       },
       "*": "&"
     }
  }, {
     "operation": "shift",
     "spec": {
       "outer": {
         "*": {
           "inner": {
             "*": "events[]"
           }
         }
       },
       "*": "&"
     }
  },
   {
     "operation": "modify-default-beta",
     "spec": {
       "events": {
         "*": {
           "transport": "=concat(@(1,etc),'|',@(1,t))"
         }
       }
     }
  },
   {
     "operation": "shift",
     "spec": {
       "events": {
         "*": {
           "@transport": "events[]"
         }
       },
       "*": "&"
     }
  }
]

Результат наших преобразований будет иметь простую схему при конвертации в avro и не будет изменяться от пользователя к пользователю. Строка легко разбивается в запросе к внешней pxf-таблице в Greenplum.

Коллеги, вопросы, предложения, комментарии...

Комментарии (3)


  1. timda
    02.11.2021 11:55

    Отличный рабочий материал.


  1. Andrey-sch
    02.11.2021 13:50

    Полезная статья - спасибо


  1. MaksMuhin
    04.11.2021 22:36

    Отличная статья.