Автоматизация вокруг интернета вещей, использующая информацию телеметрии для управления процессами, умноженная на распределенную микросервисную архитектуру управляющих систем, усложняет диагностику процессов и их выполнения на различных этапах. Сейчас в целом для повышения прозрачности мониторинга распределенных систем развивается технологическая платформа для Observability, включающая в себя сбор логов микросервисов (Logging), извлечение операционных метрик по использованию ресурсов (Metrics) и выполнение замеров времени обработки и отслеживание цепочки обработки (Tracing). Для трассировки систем, использующих REST или gRPC запросы есть множество решений, которые нередко интегрированы в инструменты API Gateway (например в Tyk API Gateway, Kong, Krakend и другие), и для них даже есть инструменты автоматической генерации (например, tracetest, который был рассмотрен в этой статье). Но IoT-системы основаны на передаче сообщений и, во многих случаях, создаются вокруг брокеров очередей сообщений (часто над протоколом передачи телеметрии MQTT) и трассировка должна быть реализована иначе. В этой статье мы создадим простое приложений на основе MQTT с использованием брокера сообщений EMQX и настроим наблюдение через встроенные механизмы трассировки.

EMQX является специализированным MQTT-брокером, который может подписываться на поток событий из внешних источником и может использоваться для координации запуска отдельных микросервисов для реализации логика процесса реакции на данные телеметрии. EMQX может быть запущен в режиме кластера для отказоустойчивости, а также поддерживает установку расширений (plugins), создаваемых на Erlang. Также могут быть настроены шлюзы для взаимодействия с другими протоколами: CoAP, ExProto, LwM2M, MQTT-SN и STOMP или в произвольный адрес через WebHook.

Начнем с установки брокера, он может быть запущен из пакетов/исполняемых файлов для Ubuntu/CentOS/RHEL/MacOS/Windows, через Kubernetes Operator, или через Docker:

docker run -d --name emqx -p 1883:1883 -p 8083:8083 -p 8084:8084 -p 8883:8883 -p 18083:18083 emqx/emqx:5.0.26

Для тестирования MQTT-брокера можно использовать инструмент командной строки mqttx-cli:

docker run -itd --net host --name subscriber emqx/mqttx-cli sub -t room/temperature
docker run -it --net host emqx/mqttx-cli pub -t room/temperature -m "23.2"
docker logs -f subscriber

Также доступен веб-интерфейс для наблюдения за ресурсами по адресу http://localhost:18083 (при первом входе используется логин admin и пароль public).

Для доступа из JVM-приложения также может использоваться клиентская библиотека (например, от HiveMQ):

implementation("com.hivemq:hivemq-mqtt-client:1.3.0")

Создадим три микросервиса, которые будут взаимодействовать через EMQX в соответствии со следующем схемой информационных потоков:

MQTT(3).png
MQTT(3).png

Запускать все компоненты системы будем на одной машине . Создадим реализацию микросервиса логирования, для этого добавим необходимые зависимости в build.gradle.kts:

repositories {
    mavenCentral()
    maven(url = "https://jitpack.io")
}

dependencies {
    implementation("com.github.hivemq.hivemq-mqtt-client:hivemq-mqtt-client:develop-SNAPSHOT")

    implementation(platform("com.github.hivemq.hivemq-mqtt-client:hivemq-mqtt-client-websocket:develop-SNAPSHOT"))
    implementation(platform("com.github.hivemq.hivemq-mqtt-client:hivemq-mqtt-client-proxy:develop-SNAPSHOT"))
    implementation(platform("com.github.hivemq.hivemq-mqtt-client:hivemq-mqtt-client-epoll:develop-SNAPSHOT"))
    implementation("com.github.hivemq.hivemq-mqtt-client:hivemq-mqtt-client-reactor:develop-SNAPSHOT")

    implementation("org.slf4j:slf4j-simple:2.0.3")
    implementation("io.github.oshai:kotlin-logging-jvm:4.0.0-beta-29")

    testImplementation(kotlin("test"))
}

Реализуем подписку на топик room/events:

fun main(args: Array<String>) {
    val log = KotlinLogging.logger("console")
    val client = Mqtt5Client.builder()
        .serverHost("localhost")
        .addConnectedListener {
            log.info { "Logger microservice connected" }
        }.addDisconnectedListener {
            log.info { "Logger microservice disconnected" }
        }.identifier(UUID.randomUUID().toString()).buildBlocking()
    client.connect()
    client.toAsync().subscribeWith().topicFilter("room/events").qos(MqttQos.EXACTLY_ONCE).callback {
        val value = String(it.payloadAsBytes)
        log.info { "Message: $value" }
    }.send()
}

Микросервис Analyzer будет также использовать значение температуры и отправлять управляющую команду в топики room/events и control.

fun main(args: Array<String>) {
    var lastState = ""
    val log = KotlinLogging.logger("Analyzer")
    val client = Mqtt5Client.builder()
        .serverHost("localhost")
        .addConnectedListener {
            log.info { "Analyzer microservice connected" }
        }.addDisconnectedListener {
            log.info { "Analyzer microservice disconnected" }
        }.identifier(UUID.randomUUID().toString()).buildBlocking()
    client.connect()
    client.toAsync().subscribeWith().topicFilter("room/temperature").qos(MqttQos.EXACTLY_ONCE).callback {
        val temperature = String(it.payloadAsBytes).toDoubleOrNull()
        log.info { "Temperature = $temperature" }
        val event = when {
            temperature!=null && temperature<23 -> "heat"
            temperature!=null && temperature>=26 -> "cool"
            else -> null
        }
        if (event!=lastState) {
            event?.let {
                lastState = event
                log.info { "Send event $event" }
                client.toAsync().publishWith().topic("room/events").qos(MqttQos.AT_LEAST_ONCE).payload(it.toByteArray()).send()
                client.toAsync().publishWith().topic("control").qos(MqttQos.AT_LEAST_ONCE).payload(it.toByteArray()).send()
            }
        }
    }.send()
}

Микросервис Control будет выглядеть аналогично Analyzer, с отправкой всех полученных сообщений в очередь управления для кондиционера. Добавим дополнительную задержку перед пересылкой сообщения, для этого используем библиотеку поддержки корутин для Kotlin-проекта.

fun main(args: Array<String>) {
    val log = KotlinLogging.logger("Control")
    val client = Mqtt5Client.builder()
        .serverHost("localhost")
        .addConnectedListener {
            log.info { "Control microservice connected" }
        }.addDisconnectedListener {
            log.info { "Control microservice disconnected" }
        }.identifier(UUID.randomUUID().toString()).buildBlocking()
    client.connect()
    client.toAsync().subscribeWith().topicFilter("control").qos(MqttQos.EXACTLY_ONCE).callback {
        log.info { "Get command: ${String(it.payloadAsBytes)}"}
        runBlocking {
            delay(2000)
            log.info { "Send to conditioner" }
            client.toAsync().publishWith().topic("control/conditioner").qos(MqttQos.AT_LEAST_ONCE).payload(it.payloadAsBytes).send()
        }
    }.send()}

Теперь перейдем к настройке мониторинга. Прежде всего можно добавить мониторинг на зарегистрированные топики (появятся после запуска наших сервисов) на странице http://localhost:18083/#/subscriptions/topics.

изображение.png
изображение.png

Дальше можно зарегистрировать Trace Log по topic или ClientId. К каждому подключенному приложению присоединяется уникальный идентификатор, их можно посмотреть на странице http://localhost:18083/#/clients. Например, идентификатор клиента, который подписан на topic control (микросервис Control) может быть da03906b-66ef-4a11-a978-7592d97bf676. Создадим трассировку на странице http://localhost:18083/#/log-trace. Это позволит отследить момент времени, когда сообщение было добавлено в topic или извлечено из него (при создании лога по ClientId), например:

2023-06-17T20:37:17.871989+00:00 [MQTT] 6e57f382-888e-4012-bc3b-0f4158bfc4cf@172.17.0.1:52472 msg: mqtt_packet_sent, packet: PUBLISH(Q1, R0, D0, Topic=control, PacketId=2, Payload=heat)
2023-06-17T20:37:17.876491+00:00 [MQTT] 6e57f382-888e-4012-bc3b-0f4158bfc4cf@172.17.0.1:52472 msg: mqtt_packet_received, packet: PUBACK(Q0, R0, D0, PacketId=2, ReasonCode=0)
2023-06-17T20:37:19.883670+00:00 [MQTT] 6e57f382-888e-4012-bc3b-0f4158bfc4cf@172.17.0.1:52472 msg: mqtt_packet_received, packet: PUBLISH(Q1, R0, D0, Topic=, PacketId=1, Payload=heat)
2023-06-17T20:37:19.883863+00:00 [PUBLISH] 6e57f382-888e-4012-bc3b-0f4158bfc4cf@172.17.0.1:52472 msg: publish_to, topic: control/conditioner, payload: heat

Первые два сообщения связаны с извлечением сообщения из топика, следующие два - отправка пакета и публикация сообщения в topic (control/conditioner). Если добавить такие обработчики к каждому Client и сделать дополнительную обработку сохраненных логов трассировки можно было отсле:дить время обработки на каждом шаге обработки. Но здесь нет никакого идентификатора, по которому можно было бы отследить порядок обработки конкретного сообщения. Самое простое решение - можно кодировать сообщение в JSON и добавить в него дополнительный токен на первом этапе обработки телеметрии и передается между последующими сообщениями. Для этого добавим поддержку kotlinx.serialization (плагин для gradle и implementation) и будем кодирование сообщение с токеном трассировки (в Analyzer):

@Serializable
data class TraceableMessage(val traceId:String, val message:String)

//
  val token = UUID.randomUUID().toString()
  val message = Json.encodeToString(TraceableMessage(traceId=token, message=String(it.toByteArray())))
  client.toAsync().publishWith().topic("room/events").qos(MqttQos.AT_LEAST_ONCE).payload(message.toString().toByteArray()).send()

Теперь во всей цепочке сообщений будет доступен уникальный токен для отслеживания обработки одного замера температуры.

2023-06-17T21:13:31.874551+00:00 [MQTT] 3c635788-77bf-4439-a8f1-01eaf6b9e74f@172.17.0.1:37276 msg: mqtt_packet_sent, packet: PUBLISH(Q1, R0, D0, Topic=control, PacketId=2, Payload={"traceId":"0276260a-2371-46c6-8c4e-cdc509971ba4","message":"cool"})
2023-06-17T21:13:31.881634+00:00 [MQTT] 3c635788-77bf-4439-a8f1-01eaf6b9e74f@172.17.0.1:37276 msg: mqtt_packet_received, packet: PUBACK(Q0, R0, D0, PacketId=2, ReasonCode=0)
2023-06-17T21:13:33.887715+00:00 [MQTT] 3c635788-77bf-4439-a8f1-01eaf6b9e74f@172.17.0.1:37276 msg: mqtt_packet_received, packet: PUBLISH(Q1, R0, D0, Topic=, PacketId=1, Payload={"traceId":"0276260a-2371-46c6-8c4e-cdc509971ba4","message":"cool"})
2023-06-17T21:13:33.887924+00:00 [PUBLISH] 3c635788-77bf-4439-a8f1-01eaf6b9e74f@172.17.0.1:37276 msg: publish_to, topic: control/conditioner, payload: {"traceId":"0276260a-2371-46c6-8c4e-cdc509971ba4","message":"cool"}

Файлы логов могут извлекаться и анализироваться автоматически и создаваться trace-последовательности в формате OpenTracing (сейчас входит в OpenTelemetry). Например можно использовать Java-библиотеку, которые объединяют все запросы одной последовательности в span и создает subspans для каждого этапа обработки на основе анализа Trace Log. Пример такой реализации может быть найден в github-репозитории (там же размещены исходные тексты рассмотренных выше микросервисов). Также отслеживание может быть реализовано в виде плагина (за основу может быть взять шаблон) и установлено в EMQX.

Таким образом, использование встроенных возможностей трассировки в брокере сообщений с добавлением токена отслеживания помогает в диагностике потенциальных проблем в распределенных системах.

В заключение рекомендую открытый урок, посвященный декомпозиции системы на микросервисы по бизнес-аспектам и Event Storming. О чем пойдет речь на этом занятии:

  • Декомпозиция по бизнес-аспектам;

  • Определение системных операций;

  • Разбиение операций на сервисы;

  • Разбиение по системным аспектам и DDD подход;

  • Методология PARL и Event Storming;

Записаться на урок можно по ссылке.

Комментарии (0)