Как мы встраивали цифровую платформу в инфопотоки предприятия с помощью Apache NIFI

Привет, Хабр! Меня зовут Игнат Нахай и это мой первый пост. Я работаю в команде по внедрению платформы ZIIoT для промышленности. Конкретнее – отвечаю за архитектурные решения при внедрении платформы в информационный слой заказчика.

ZIIoT объединяет все источники промышленных данных на предприятии и через набор MES-сервисов позволяет управлять качеством, отслеживать генеалогию продукции, анализировать производительность, проводить оперативное и детальное планирование, контролировать состояние и распределение ресурсов и много чего еще.  В связи с этим в платформе рождается большой объем информации, которая востребована в других информационных системах предприятия. Здесь я расскажу, как мы решали задачу построения информационных потоков и как нам в этом помог Apache NIFI. 

Условия задачи

Итак, как я уже сказал ранее, на промышленных предприятиях важно наладить передачу данных с платформы ZIIoT в информационные системы разных уровней. Это могут быть системы уровня ERP (Microsoft Dynamics AX, SAP), различные системы документооборота (СЭД «Дело», 1С) и системы уровня MES\LIMS (Wonderware, Labware, Webtrieve). Например, ERP-система может быть заинтересована в данных о наряд-заказах, состоянии оборудования, фактах возникновения неисправностей, генеалогии партий и др., а системе документооборота может понадобиться информация о сроках готовности продукции, сопроводительная документация, которую готовит участок контроля качества, наличие расходных материалов на складах и т. д.

Особенность заключается в том, что наша платформа построена на микросервисной архитектуре и имеет огромное количество «ручек», которые необходимо дернуть, чтобы получить интересующую информацию. Из-за этого задачу интеграции с информационными системами разных производителей сложно назвать простой, особенно когда ее нужно решить с минимальными трудозатратами – в идеале командой внедрения и совсем без привлечения разработчиков. Такая задача перед нами и стояла.

Решение

Общий подход

При решении задач интеграционного обмена существует несколько основных способов интеграции:

  1. Обмен файлами

  2. Обмен через общую шину данных

  3. Взаимодействие через API

Первые два способа реализуют интеграцию на уровне данных, при этом, обмен через шину позволяет избежать недостатков обмена файлами. Мы же пошли по третьему пути. Взаимодействие через API позволяет реализовать интеграцию на уровне функций, и это дает некоторые преимущества:

  1. Согласованность данных: платформа, как система-источник, выполняет предварительную подготовку данных и включает функционал по обеспечению целостности

  2. Скорость получения данных: отсутствуют задержки, связанные с необходимостью записи данных в хранилище-посредник

  3. Возможность двустороннего обмена: наличие функции позволяет не только передавать данные, но и получать

Здесь нужно сказать несколько слов об архитектуре платформы ZIIoT.

Чтобы обеспечить гибкую функциональность платформы в соответствии с потребностями каждого заказчика, она изначально проектировалась как набор приложений. Каждый сервис имеет свою доменную модель и отвечает за ограниченный набор функций и данных. Такая архитектура приносит в систему множество плюсов, начиная от упрощения разработки и заканчивая возможностью горизонтального масштабирования. Дополнительно о микросервисном подходе можно прочитать здесь: https://microservices.io/patterns/microservices.html  

Для удобства пользователя платформа реализована как web-приложение: все вычисления производятся на сервере, а пользовательский доступ осуществляется через браузер. Так не требуется устанавливать никакого дополнительного ПО на компьютеры пользователей.

Эти два подхода подталкивают нас к тому, что в качестве API был выбран REST.

Но вернемся к интеграции.

Безусловно, для legacy систем, которые поддерживают только SQL или файловый обмен, мы выгружаем информацию в файлы или в БД. Но как быть с системами, которые готовы взаимодействовать через REST-API?

Казалось бы, ответ очевиден: микросервисы платформы обладают REST-API, и смежные системы могут напрямую обращаться к ним, чтобы получить всю необходимую информацию. Однако в абсолютном большинстве случаев смежным системам требуется агрегированная информация от нескольких сервисов, и тут наступает расплата за все плюсы микросервисной архитектуры.  Непосредственное обращение к API сервисов ZIIoT приведет к высокой связанности и большим трудозатратам со стороны внешней системы. То есть, если какой-то системе понадобится получить список партий на складе, вместо отправки одного запроса ей придется сначала выполнить запрос к сервису складов и найти идентификатор склада по его названию, потом обратиться к сервису партий и найти все партии по идентификатору склада. Но в партиях будет написан только идентификатор материала, без названия, и придется выполнить еще запрос к сервису материалов, чтобы узнать название материала по идентификатору и т. д. Получается, что вместо обращения к одному API придется обращаться к большому количеству разных API. А чтобы это настроить, нужно глубокое понимание архитектуры платформы. Мы же изначально ставили условие минимизировать трудозатраты, так что это не наш путь.

Кроме того, как быть с системами, которые поддерживают только WEB-Service на основе SOAP?

У этих проблем есть очевидное решение – сделать специальное проксирующее API, которое будет получать запросы от внешней системы, выполнять запросы непосредственно к сервисам и формировать ответ с данными. Идею мы позаимствовали у руководителя департамента разработки SoundCloud Фила Калсадо (Phil Calçado). Он описал ее как паттерн Back-end for Front-end.

Обычно под BFF подразумевают сервис, обладающий меньшим количеством возможностей, чем полноценный API.  С его помощью можно осуществлять:

  1. Обращение к микросервисам продукта и получение от них данных

  2. Форматирование данных, чтобы они передавались в нужном формате

  3. Отправку данных заказчику

Чем же хорош подход BFF? Хотелось бы выделить несколько моментов:

  1. Адаптивность: можно построить API, адаптированное под нужды конкретного потребителя

  2. Агрегация: минимизация количества запросов от внешних систем

  3. Изолированность: нет необходимости выставлять API сервисов наружу. Клиент получает доступ только к тем данным, к которым нужно и можно

С подходом мы определились. Следующий вопрос на повестке – это инструментарий.  

Инструментарий

На поверхности лежит вариант с разработкой отдельного сервиса, который будет выполнять роль BFF. Но в этом случае на каждом проекте под каждую систему пришлось бы разрабатывать отдельный сервис, который, по сути, являлся бы одноразовым. Во-первых, это не соответствует продуктовой стратегии. Во-вторых, непонятно, кто будет поддерживать этот сервис и что делать, если потребности заказчика изменятся. В-третьих, неизвестно, сколько таких сервисов понадобится. И в-четвертых, слишком много трудозатрат и с нашей стороны, и со стороны заказчиков. А мы, напомню, заинтересованы в том, чтобы строить и изменять интерфейсы под нужды проекта силами инженеров внедрения, без привлечения команд разработки, а заказчики – в том, чтобы им не приходилось нанимать разработчиков для доработки и поддержания сервиса.

Если еще раз обратиться к функциям сервиса BFF, то можно заметить, что они очень сильно напоминают функции другого класса сервисов – ETL:

  1. Extract – извлечение данных

  2. Transform – преобразование

  3. Load – загрузка

В состав ZIIoT входит ETL-система Apache NIFI, которая служит для получения данных с оборудования, датчиков и промышленных систем и загрузки эти данных в платформу. Это те же функции, которые требуются для интеграции с другими информационными системами, только в другом направлении. Естественно, мы решили задействовать именно этот инструмент, при реализации подхода BFF. Концептуальная схема реализации подхода BFF у нас представлена на Рисунке 1.

Рисунок 1. Концептуальная схема реализации подхода BFF
Рисунок 1. Концептуальная схема реализации подхода BFF
Пара слов, о том, что такое Apache NIFI

Apache NIFI - это платформа обработки событий (сообщений), позволяющая управлять потоками данных из разных источников в реальном времени с использованием графического интерфейса.

Какие возможности нам дает NIFI:

  1. Низкий порог входа. NIFI – это low-code система, которая позволяет инженеру с минимальными навыками программирования быстро начать работать

  2. Графическое представление пайплайнов обработки данных

  3. Поддержка большого количества протоколов и схем данных

  4. Кастомное логирование под требования заказчика.

Реализация

Apache NIFI предоставляет огромный арсенал инструментов для извлечения, трансформации и загрузки данных. Поэтому будет правильно сказать пару слов о тех процессорах, которые мы используем чаще всего.

Для построения API мы используем процессоры HandleHttpRequest и HandleHttpResponse. Эти процессоры позволяют построить на  NIFI WEB-сервис. При этом для каждой системы-потребителя желательно использовать отдельные процессоры HandleHttpRequest и HandleHttpResponse с отдельными контроллерами HttpContextMap, чтобы интеграции с разными системами не «аффектили» друг на друга.

Для того, чтобы ограничить доступ, процессор HandleHttpRequest позволяет настроить аутентификацию в трех режимах:

  1. No Authentication. Подключение анонимно. Подключаться может любой

  2. Want Authentication. Процессор попытается верифицировать клиента, но если не получится, доступ будет предоставлен анонимно

  3. Need Authentication. Доступ будет запрещен всем клиентам, которые не предоставят подлинный сертификат, который можно проверить в trustStore контроллера StandardRestrictedSSLContextService.

Но поскольку в ZIIoT в качестве сервиса SSO используется keycloak, я предпочитаю, чтобы внешние системы выполняли аутентификацию (получали access_token) там – по стандарту Oauth2, и в дальнейшем при обращении к сервисам ZIIoT из NIFI была возможность использовать этот токен для проверки прав доступа. Чтобы было понятнее, о чем речь, рекомендую к прочтению статью.

  1. Процессор HandleHttpRequest добавляет в атрибуты flowfile все заголовки, полученные в запросе: их можно использовать как для определения формата ответа, так и для авторизации. Кроме того, процессор HandleHttpRequest добавляет во flowfile атрибуты, которые мы используем для определения маршрута обработки файла: http.method, http.request.uri – эти атрибуты позволяют нам однозначно идентифицировать, какую информацию мы должны вернуть.

  2. http.remote.host, http.remote.user, http.principal.name – эти атрибуты используются в дальнейшем для логирования: позволяют однозначно идентифицировать из какой системы и от какого пользователя пришел запрос.

В зависимости от потребностей систем-получателей мы можем возвращать информацию в формате JSON, XML, CSV и др. Для преобразования форматов мы используем процессоры converRecord, convertXMLtoJSON и др.

Если вы работаете с JSON, то я рекомендую познакомится с  JOLT-спецификацией, поскольку она довольна удобна при преобразовании JSON, а в NIFI есть специальный процессор JoltTransformJSON для работы с ней.

Кроме этого, есть ряд удобных процессоров, например QueryRecord, которые позволяют выполнять sql запросы с оператором SELECT к flowfile со структурированным содержимым, будь то JSON или CSV.

Для определения «маршрутов» flowfile чаще всего мы используем процессор RouteOnAttribute.

Поскольку внутри платформы наши сервисы имеют REST API, для получения всей необходимой информации в NIFI применяются процессоры invokeHttp.

На этом предлагаю закончить краткий обзор инструментов. Подробнее с полным перечнем возможностей можно познакомиться в официальной документации NIFI.

А сейчас предлагаю перейти к обзору pipeline.

Путь

Первое, что мы делаем после получения запроса от внешней системы, – определяем http-метод, по которому осуществляется обращение.

Чаще всего мы реализуем обработку двух методов – GET и POST.

Метод POST используется для создания объектов внутри ZIIoT, в том случае, когда интеграция направлена внутрь (т. е. данные из внешней системы сохраняются в платформу).

Метод GET используется для запроса данных из платформы.

После разделения потока по методам необходимо разделить каждый из потоков в соответствии с location (URL). На данном шаге мы однозначно определим, какой набор данных необходимо вернуть системе-заказчику.

Чтобы отфильтровать данные, необходимые для передачи, используются параметры http-запросов, полученные от внешней системы. В зависимости от реализации эти параметры могут быть транслированы в неизменном виде к сервисам-источникам.

Здесь мы подошли к очень важному моменту. HTTP подразумевает механизм «запрос-ответ», и будет неправильно оставить запрос без ответа, даже если он осуществляется по нереализованному URL. Поэтому ни одно отношение не должно быть терминировано.

Все flowfile, которые не подходят по параметрам к тем URL, которые мы обрабатываем, должны направляться к процессору HandleHttpResponse, который будет возвращать код ответа 400 или 404. Что логичнее? Вот ссылка на RFC7231 - можем подискутировать.

Рисунок 2. Пример маршрутизации потоков
Рисунок 2. Пример маршрутизации потоков

Следующим этапом идет получение информации от сервисов платформы, фильтрация и агрегация данных. 

Вне зависимости от той логики, по которой осуществляется получение данных, необходимо предусмотреть процессоры HandleHttpResponse для нескольких кодов ответов:

200 и 201 – возвращаем в случае успеха.

500, 501, 502, 503 – транслируем от сервиса, который его вернул. Решение о повторном запросе (Retry) принимает внешняя система. В некоторых случаях можно предусмотреть в потоке ограниченное количество попыток.

 Суммируя вышесказанное:

  1. Для построения API используем процессоры HandleHttpRequest и HandleHttpResponse. Для каждой системы используем отдельные контроллеры.

  2. На протяжении всего пайплайна обработки flowfile (запроса) исключаем терминирование отношений.

  3. Разделяем коды ответов в зависимости от результата.

Плюсы и минусы

Как и у любого подхода, у вышеописанного есть плюсы и минусы. К минусам можно отнести высокие накладные расходы – реализация распределения информационных потоков на NIFI требует больше аппаратных ресурсов, чем специальный BFF-сервис, затрудненная обработка ошибок. Еще у нас нет возможности автоматически генерировать документацию. Мы не можем использовать swagger, приходится описывать API «ручками».

Однако в качестве компенсации мы получаем:

  1. Гибкость: под каждую систему можно построить индивидуальное API

  2. Доступность: реализация возможна без разработки

  3. Наглядность: визуально можно проследить всю цепочку обработки запроса

Таким образом мы можем создать интерфейс, покрывающий потребности в интеграционном обмене между системами на проекте без необходимости доработки сервисов. Такое решение позволяет нам внести дополнительные проверки и валидацию запросов от внешних информационных систем и, в случае необходимости, изолировать API сервисов от прямых обращений извне, а также оптимизировать количество запросов от внешних систем.

Бонус для тех, кто прочитал до конца :)

Каждый процессор HandleHttpRequest слушает определенный TCP-port, и чтобы не открывать все эти порты наружу, правильно будет «спрятать» их за reverse-proxy, например NGINX. Но это уже совсем другая история...

Комментарии (0)