Микросервисная архитектура, предполагающая разделение информационной системы на небольшие самостоятельные части, взаимодействующие посредством сетевых протоколов (чаще всего HTTP с пересылкой JSON-сообщений или через gRPC) имеет особое значение для создания масштабируемых отказоустойчивых приложений. Для реализации микросервисов существует множество библиотек и фреймворков, созданных для всех актуальных технологий разработки (исчерпывающий обновляемый список можно посмотреть например здесь).

Для разработчиков доступны как библиотеки реализации протоколов и веб-серверов, так и полноценные платформы, обеспечивающие не только механизмы обнаружения микросервисов, но и поддержку протоколирования и отслеживания цепочки запросов, контроля доступа, мониторинга доступности и производительности, маршрутизацию запросов и многое другое.

Но особый интерес для разработки микросервисов представляют фреймворки, формирующие среду выполнения (и иногда и инструменты конфигурирования) для быстрого развертывания кода, основанного на модели реакции на внешние и внутренние события. Например, среди известных фреймворков можно назвать Akka (для запуска акторов на JVM), Oracle Helidon (предоставляет большой набор вспомогательных сервисов для мониторинга, контроля доступа и поддержки различных протоколов) и Vert.X (фреймворк для создания реактивных приложений на JVM). Перечисленные фреймворки ориентированы на использование кода, создаваемого на основе JVM-совместимого языка программирования, но аналогичные решения есть и для других технологий разработки. Например, для создания микросервисной архитектуры на Go, может использоваться фреймворк Flogo, основанный на идеях потока сообщений/данных между микросервисами и реакции на события (официальный сайт). В этой статье мы рассмотрим его возможности на примере простой задачи обработки данных телеметрии.

Сам фреймворк представляет из себя набор инструментов командной строки, веб-конструктор для создания action и набор библиотек, которые могут быть подключены к коду на Go для взаимодействия с возможностями фреймворка. Кроме того, что фреймворк обеспечивает механизм последовательных операций (некоторого конвейера), основанный на триггерах, а также в нем “из коробки” поддерживается использование предварительно обученных нейронных сетей на основе TensorFlow для принятия решений о выборе последовательности операций. 

Для конфигурирования фреймворка можно установить веб-интерфейс

docker run -itd -p 3303:3303 flogo/flogo-docker eula-accept

или для запуска наиболее актуальной (нестабильной) версии с поддержкой машинного обучения:

docker run -itd -p 3303:3303 flogo/flogo-docker:unstable-ml eula-accept

Также возможно установить набор инструментов командной строки

go get -u github.com/project-flogo/cli/…

Центральным объектом конфигурирования в Flogo является приложение (Application), оно включает в себя зарегистрированные действия (Actions) и связанные с ними триггеры для запуска действия (Triggers). Начнем с создания нового приложения, для этого выполним переход на страницу http://localhost:3303/ и выберем “New” (создание приложения)

Action может работать с потоком данных (Stream) и формировать на основе набора входных данных (с использованием механизма map-filter-reduce) новый поток данных через канал, который может стать источником событий для другого Action. Второй вид Action — поток выполнения (flow), который может рассматриваться как высокоуровневое описание координации микросервисов со связыванием входным и выходных данных к контексту Action.

Action состоит из последовательности активностей (Activity), которые могут быть установлен из git-репозиториев определенной структуры, которая будет описана ниже.

Определим основные сущности Flogo:

  • модели (models) — описывают структуру входных и выходных параметров activity;

  • триггеры (triggers) — определяют условия запуска Flow;

  • активности (activities) — определяют реализацию шага бизнес-логики.

Большое количество готовых Activity можно найти на странице https://tibcosoftware.github.io/flogo/showcases/ и https://github.com/project-flogo/contrib. Аналогично и models и triggers также могут быть установлены из внешнего источника через веб-интерфейс (Install Contribution) или через командную строку flogo install <url>. Среди встроенных Activity можно выделить следующие:

  • Start a subflow — запустить вспомогательный flow при достижении этой activity;

  • Run an Action — запустить другой Action (flow/stream);

  • Log — добавить сообщение в журнал;

  • Mapper / Filter / Aggregate — реализация обработки данных из входного потока (используется в action типа stream);

  • Throw Error — вернуть ошибку;

  • Return — вернуть результат;

  • SQL Database Activity — действия по манипуляции данными;

  • REST Invoke — запустить внешний REST-сервис.

Для запуска Activity может использовать один из встроенных триггеров или подключаемый из внешнего репозитория. Встроенные триггеры могут вызываться из командной строки (CLI Trigger), при получении HTTP-запроса (Receive HTTP Message), данных через TCP/UDP, а также по таймеру. Также можно обмениваться данными с функциями AWS Lambda.

Триггер связан с последовательностью активностей через input/output аргументы (input передаются из триггера в направлении первой активности, output отправляются в триггер и могут, например, выступить в роли кода ответа или содержания сообщения для HTTP-запроса). 

Activity может получать входящие данные с предыдущего шага (будут доступны через селектор $.name), а также создавать выходные данные для следующего шага. Активность Return создает результат выполнения последовательности активностей (в flow создается единственное значение результата, в stream может быть создан поток из результатов).

Для разработки триггеров, активностей и действий могут использоваться базовые структуры из github.com/project-flogo/core. Для описания взаимодействия с контекстом выполнения определяется список входных и выходных аргументов, а также настройки. Для описания полей могут использоваться как примитивные типы данных (string, int, bool), так и составные структуры и массивы (полный список типов приведен здесь). Точкой входа является функция init(), которая должна зарегистрировать структуру с описанием Activity/Action/Trigger. Также репозиторий должен содержать файл descriptor.json с описанием компонента. Метаданные описывают следующие атрибуты:

  • name — название компонента;

  • type — тип компонента (flogo:activity, flogo:action, flogo:trigger);

  • version — версия компонента;

  • title — название для списка компонентов;

  • description — описание компонента;

  • display.icon — пиктограмма для списка;

  • input — схема входных данных (например, конфигурации или входного потока) для activity;

  • output — схема выходных данных;

  • handler — настройки для trigger.

Мы попробуем разработать Flow для сохранения истории изменения данных с внешнего датчика температуры. Кроме этого мы будем использовать значения температуры для управления внешним исполнителем (кондиционером), который будет реализован в виде подключаемого драйвера на Go, который мы преобразуем в Activity.

Рассмотрим создание следующего сценария обработки потока входных данных с измерениями температуры:

  • данные поступают через HTTP-запросы;

  • производится усреднение окна из 5 последовательных замеров для сглаживания резких выбросов температуры;

  • усредненные замеры отправляются в InfluxDB;

  • выполняется вызов активности для управления кондиционером.

Первая часть обработки реализуется в модели поточной обработки данных. Создаем новое действие типа Stream, назовем его Conditioner Control Flow. Для активации потока добавим триггер для запуска (знак + слева от последовательности активностей) с использованием REST с типом Receive HTTP Message. Перейдем в конфигурацию триггера и определим порт, метод и путь публикации REST-точки подключения.

Для передачи полученных данных добавим в Stream Interface в Inputs именованное поле message и свяжем его в триггере с параметрами http-запроса.

Дальше добавим обработку входных данных и выполним усреднение, для этого добавим активность Aggregate и выполним ее настройку. Будет необходимо указать функцию агрегации (avg), режим определения окна (windowType = sliding), размер окна (количество записей для усреднения) windowSize=5. Далее будет необходимо указать способ преобразования входных данных в данные для агрегации. В нашем случае будет необходимо сделать преобразование в число с плавающей точкой coerce.toFloat64($.message). Flogo представляет большой набор функций для работы со строками, датами, генерации случайных чисел и извлечения данных из JSON, при этом все переменные в текущем контексте доступны через служебный объект $. Последним действием Stream должна быть передача вычисленного значения в канал (Channel), для которого необходимо сконфигурировать входной параметр name (например, “smoothed”) и значение (в данном случае оно будет $.result)

Для обработки усредненных значений температуры необходимо создать новый Action с типом Flow, создать входную переменную value для Action, задать стартовый триггер типа Listen to Channel (конфигурация: название — smoothed, Map to flow inputs — связать переменную value c $.data). 

Для отправки данных в InfluxDB необходимо установить дополнительный компонент github.com/shaliniGovindaNayak/flogo-workspace/activity/influxdb. После добавления компонента необходимо выполнить конфигурацию активности (адрес, порт и данные аутентификации к серверу InfluxDB) и указать название переменной с источником данных (class="formula inline">.value). 

Но наибольший интерес представляет создание активности из существующего кода микросервиса. Для этого мы будем использовать возможности базовой библиотеки Flogo. Начнем с создания файла описания активности.

{
  "name": "actuator",
  "type": "flogo:activity",
  "version": "0.0.1",
  "title": "Actuator",
  "description": "Conditioner Actuator",
  "homepage": "https://github.com/dzolotov/actuator",
  "settings": [],
  ],
  "input": [
    {
      "name": "temperature",
      "type": "integer",
      "required": true
    },
    {
      "name": "state",
      "type": "boolean",
      "required": true
    }
  ],
  "output": [
    {
      "name": "result",
      "type": "string",
      "required": true
    }
  ]
}

Для подключения ядра и дополнительных функций Flogo подключим дополнительные модули. Сначала создадим описание метаданных (metadata.go).

package actuator

import (
    "github.com/project-flogo/core/data/coerce"
    "strconv"
)

type Settings struct {
}

type Input struct {
	Temperature uint64 `md:"temperature,required"`
	State bool `md:"state,required"`
}

func (r *Input) FromMap(values map[string]interface{}) error {
	temperature, _ := coerce.ToInt64(values["temperature"])
            state, _ := coerce.ToBool(values[“state”])
	r.Temperature = temperature
            r.State = state
	return nil
}

func (r *Input) ToMap() map[string]interface{} {
	return map[string]interface{}{
		"temperature": strconv.Itoa(r.Temperature),
                       "state": strconv.FormatBool(r.State)
	}
}

type Output struct {
	Result string `md:"result"`
}

func (o *Output) FromMap(values map[string]interface{}) error {
	strVal, _ := coerce.ToString(values["result"])
	o.Result = strVal
	return nil
}

func (o *Output) ToMap() map[string]interface{} {
	return map[string]interface{}{
		"result": o.Result,
	}
}

и бизнес-логику (activity.go):

import (
	"github.com/project-flogo/core/activity"
)

//Точкой входа в activity является функция init()
func init() {
	_ = activity.Register(&ActuatorActivity{})
}

//определение метаданные (структура входных и выходных параметров и настроек)
var activityMd = activity.ToMetadata(&Settings{}, &Input{}, &Output{})

type ActuatorActivity struct {
}

//получить метаданные
func (a *ActuatorActivity) Metadata() *activity.Metadata {
	return activityMd
}

//логика активности
func (a *ActuatorActivity) Eval(ctx activity.Context) (done bool, err error) {
	//получение входных данных
	input := &Input{}
	err = ctx.GetInputObject(input)
	if err != nil {
		return true, err
	}
	temperature := input.Temperature
	state := input.State
	//реализация бизнес-логики
	//…
	//возврат значения
	output := &Output{Result: temperature}
	err = ctx.SetOutputObject(output)
	if err != nil {
		return true, err
	}
	return true, nil
}

Для установки компонента загрузим репозиторий на github и выполним установку дополнительного компонента с адресом репозитория. После этого в списке активностей появится новая активность Actuator (после добавления нужно будет выполнить связывание или определение констант для входных значений и связывание выходных значений).

Итоговая конфигурация может быть выгружена в Web UI и экспортирована в JSON (Export -> App), либо собрана в выполняемый образ для целевой платформы (Linux/Mac/Windows в WebUI, а также любая поддерживаемая целевая платформа при сборке через CLI. 


Сегодня вечером в OTUS пройдет demo-урок «Тестирование в микросервисной архитектуре». На занятии расскажем про различные типы тестов и инструментов, используемых в тестировании, а также поговорим о том, как микросервисная архитектура изменила подходы к тестированию. Регистрация доступна для всех желающих по ссылке.

Комментарии (1)


  1. gohrytt
    24.03.2022 01:45

    Мне одному кажется что это кринж?