Автоматизация вокруг интернета вещей, использующая информацию телеметрии для управления процессами, умноженная на распределенную микросервисную архитектуру управляющих систем, усложняет диагностику процессов и их выполнения на различных этапах. Сейчас в целом для повышения прозрачности мониторинга распределенных систем развивается технологическая платформа для Observability, включающая в себя сбор логов микросервисов (Logging), извлечение операционных метрик по использованию ресурсов (Metrics) и выполнение замеров времени обработки и отслеживание цепочки обработки (Tracing). Для трассировки систем, использующих REST или gRPC запросы есть множество решений, которые нередко интегрированы в инструменты API Gateway (например в Tyk API Gateway, Kong, Krakend и другие), и для них даже есть инструменты автоматической генерации (например, tracetest, который был рассмотрен в этой статье). Но IoT-системы основаны на передаче сообщений и, во многих случаях, создаются вокруг брокеров очередей сообщений (часто над протоколом передачи телеметрии MQTT) и трассировка должна быть реализована иначе. В этой статье мы создадим простое приложений на основе MQTT с использованием брокера сообщений EMQX и настроим наблюдение через встроенные механизмы трассировки.
EMQX является специализированным MQTT-брокером, который может подписываться на поток событий из внешних источником и может использоваться для координации запуска отдельных микросервисов для реализации логика процесса реакции на данные телеметрии. EMQX может быть запущен в режиме кластера для отказоустойчивости, а также поддерживает установку расширений (plugins), создаваемых на Erlang. Также могут быть настроены шлюзы для взаимодействия с другими протоколами: CoAP, ExProto, LwM2M, MQTT-SN и STOMP или в произвольный адрес через WebHook.
Начнем с установки брокера, он может быть запущен из пакетов/исполняемых файлов для Ubuntu/CentOS/RHEL/MacOS/Windows, через Kubernetes Operator, или через Docker:
docker run -d --name emqx -p 1883:1883 -p 8083:8083 -p 8084:8084 -p 8883:8883 -p 18083:18083 emqx/emqx:5.0.26
Для тестирования MQTT-брокера можно использовать инструмент командной строки mqttx-cli:
docker run -itd --net host --name subscriber emqx/mqttx-cli sub -t room/temperature
docker run -it --net host emqx/mqttx-cli pub -t room/temperature -m "23.2"
docker logs -f subscriber
Также доступен веб-интерфейс для наблюдения за ресурсами по адресу http://localhost:18083
(при первом входе используется логин admin и пароль public).
Для доступа из JVM-приложения также может использоваться клиентская библиотека (например, от HiveMQ):
implementation("com.hivemq:hivemq-mqtt-client:1.3.0")
Создадим три микросервиса, которые будут взаимодействовать через EMQX в соответствии со следующем схемой информационных потоков:
Запускать все компоненты системы будем на одной машине . Создадим реализацию микросервиса логирования, для этого добавим необходимые зависимости в build.gradle.kts
:
repositories {
mavenCentral()
maven(url = "https://jitpack.io")
}
dependencies {
implementation("com.github.hivemq.hivemq-mqtt-client:hivemq-mqtt-client:develop-SNAPSHOT")
implementation(platform("com.github.hivemq.hivemq-mqtt-client:hivemq-mqtt-client-websocket:develop-SNAPSHOT"))
implementation(platform("com.github.hivemq.hivemq-mqtt-client:hivemq-mqtt-client-proxy:develop-SNAPSHOT"))
implementation(platform("com.github.hivemq.hivemq-mqtt-client:hivemq-mqtt-client-epoll:develop-SNAPSHOT"))
implementation("com.github.hivemq.hivemq-mqtt-client:hivemq-mqtt-client-reactor:develop-SNAPSHOT")
implementation("org.slf4j:slf4j-simple:2.0.3")
implementation("io.github.oshai:kotlin-logging-jvm:4.0.0-beta-29")
testImplementation(kotlin("test"))
}
Реализуем подписку на топик room/events
:
fun main(args: Array<String>) {
val log = KotlinLogging.logger("console")
val client = Mqtt5Client.builder()
.serverHost("localhost")
.addConnectedListener {
log.info { "Logger microservice connected" }
}.addDisconnectedListener {
log.info { "Logger microservice disconnected" }
}.identifier(UUID.randomUUID().toString()).buildBlocking()
client.connect()
client.toAsync().subscribeWith().topicFilter("room/events").qos(MqttQos.EXACTLY_ONCE).callback {
val value = String(it.payloadAsBytes)
log.info { "Message: $value" }
}.send()
}
Микросервис Analyzer
будет также использовать значение температуры и отправлять управляющую команду в топики room/events
и control
.
fun main(args: Array<String>) {
var lastState = ""
val log = KotlinLogging.logger("Analyzer")
val client = Mqtt5Client.builder()
.serverHost("localhost")
.addConnectedListener {
log.info { "Analyzer microservice connected" }
}.addDisconnectedListener {
log.info { "Analyzer microservice disconnected" }
}.identifier(UUID.randomUUID().toString()).buildBlocking()
client.connect()
client.toAsync().subscribeWith().topicFilter("room/temperature").qos(MqttQos.EXACTLY_ONCE).callback {
val temperature = String(it.payloadAsBytes).toDoubleOrNull()
log.info { "Temperature = $temperature" }
val event = when {
temperature!=null && temperature<23 -> "heat"
temperature!=null && temperature>=26 -> "cool"
else -> null
}
if (event!=lastState) {
event?.let {
lastState = event
log.info { "Send event $event" }
client.toAsync().publishWith().topic("room/events").qos(MqttQos.AT_LEAST_ONCE).payload(it.toByteArray()).send()
client.toAsync().publishWith().topic("control").qos(MqttQos.AT_LEAST_ONCE).payload(it.toByteArray()).send()
}
}
}.send()
}
Микросервис Control
будет выглядеть аналогично Analyzer
, с отправкой всех полученных сообщений в очередь управления для кондиционера. Добавим дополнительную задержку перед пересылкой сообщения, для этого используем библиотеку поддержки корутин для Kotlin-проекта.
fun main(args: Array<String>) {
val log = KotlinLogging.logger("Control")
val client = Mqtt5Client.builder()
.serverHost("localhost")
.addConnectedListener {
log.info { "Control microservice connected" }
}.addDisconnectedListener {
log.info { "Control microservice disconnected" }
}.identifier(UUID.randomUUID().toString()).buildBlocking()
client.connect()
client.toAsync().subscribeWith().topicFilter("control").qos(MqttQos.EXACTLY_ONCE).callback {
log.info { "Get command: ${String(it.payloadAsBytes)}"}
runBlocking {
delay(2000)
log.info { "Send to conditioner" }
client.toAsync().publishWith().topic("control/conditioner").qos(MqttQos.AT_LEAST_ONCE).payload(it.payloadAsBytes).send()
}
}.send()}
Теперь перейдем к настройке мониторинга. Прежде всего можно добавить мониторинг на зарегистрированные топики (появятся после запуска наших сервисов) на странице http://localhost:18083/#/subscriptions/topics.
Дальше можно зарегистрировать Trace Log по topic или ClientId. К каждому подключенному приложению присоединяется уникальный идентификатор, их можно посмотреть на странице http://localhost:18083/#/clients. Например, идентификатор клиента, который подписан на topic control (микросервис Control) может быть da03906b-66ef-4a11-a978-7592d97bf676
. Создадим трассировку на странице http://localhost:18083/#/log-trace. Это позволит отследить момент времени, когда сообщение было добавлено в topic или извлечено из него (при создании лога по ClientId), например:
2023-06-17T20:37:17.871989+00:00 [MQTT] 6e57f382-888e-4012-bc3b-0f4158bfc4cf@172.17.0.1:52472 msg: mqtt_packet_sent, packet: PUBLISH(Q1, R0, D0, Topic=control, PacketId=2, Payload=heat)
2023-06-17T20:37:17.876491+00:00 [MQTT] 6e57f382-888e-4012-bc3b-0f4158bfc4cf@172.17.0.1:52472 msg: mqtt_packet_received, packet: PUBACK(Q0, R0, D0, PacketId=2, ReasonCode=0)
2023-06-17T20:37:19.883670+00:00 [MQTT] 6e57f382-888e-4012-bc3b-0f4158bfc4cf@172.17.0.1:52472 msg: mqtt_packet_received, packet: PUBLISH(Q1, R0, D0, Topic=, PacketId=1, Payload=heat)
2023-06-17T20:37:19.883863+00:00 [PUBLISH] 6e57f382-888e-4012-bc3b-0f4158bfc4cf@172.17.0.1:52472 msg: publish_to, topic: control/conditioner, payload: heat
Первые два сообщения связаны с извлечением сообщения из топика, следующие два - отправка пакета и публикация сообщения в topic (control/conditioner
). Если добавить такие обработчики к каждому Client и сделать дополнительную обработку сохраненных логов трассировки можно было отсле:дить время обработки на каждом шаге обработки. Но здесь нет никакого идентификатора, по которому можно было бы отследить порядок обработки конкретного сообщения. Самое простое решение - можно кодировать сообщение в JSON и добавить в него дополнительный токен на первом этапе обработки телеметрии и передается между последующими сообщениями. Для этого добавим поддержку kotlinx.serialization
(плагин для gradle и implementation) и будем кодирование сообщение с токеном трассировки (в Analyzer):
@Serializable
data class TraceableMessage(val traceId:String, val message:String)
//
val token = UUID.randomUUID().toString()
val message = Json.encodeToString(TraceableMessage(traceId=token, message=String(it.toByteArray())))
client.toAsync().publishWith().topic("room/events").qos(MqttQos.AT_LEAST_ONCE).payload(message.toString().toByteArray()).send()
Теперь во всей цепочке сообщений будет доступен уникальный токен для отслеживания обработки одного замера температуры.
2023-06-17T21:13:31.874551+00:00 [MQTT] 3c635788-77bf-4439-a8f1-01eaf6b9e74f@172.17.0.1:37276 msg: mqtt_packet_sent, packet: PUBLISH(Q1, R0, D0, Topic=control, PacketId=2, Payload={"traceId":"0276260a-2371-46c6-8c4e-cdc509971ba4","message":"cool"})
2023-06-17T21:13:31.881634+00:00 [MQTT] 3c635788-77bf-4439-a8f1-01eaf6b9e74f@172.17.0.1:37276 msg: mqtt_packet_received, packet: PUBACK(Q0, R0, D0, PacketId=2, ReasonCode=0)
2023-06-17T21:13:33.887715+00:00 [MQTT] 3c635788-77bf-4439-a8f1-01eaf6b9e74f@172.17.0.1:37276 msg: mqtt_packet_received, packet: PUBLISH(Q1, R0, D0, Topic=, PacketId=1, Payload={"traceId":"0276260a-2371-46c6-8c4e-cdc509971ba4","message":"cool"})
2023-06-17T21:13:33.887924+00:00 [PUBLISH] 3c635788-77bf-4439-a8f1-01eaf6b9e74f@172.17.0.1:37276 msg: publish_to, topic: control/conditioner, payload: {"traceId":"0276260a-2371-46c6-8c4e-cdc509971ba4","message":"cool"}
Файлы логов могут извлекаться и анализироваться автоматически и создаваться trace-последовательности в формате OpenTracing (сейчас входит в OpenTelemetry). Например можно использовать Java-библиотеку, которые объединяют все запросы одной последовательности в span и создает subspans для каждого этапа обработки на основе анализа Trace Log. Пример такой реализации может быть найден в github-репозитории (там же размещены исходные тексты рассмотренных выше микросервисов). Также отслеживание может быть реализовано в виде плагина (за основу может быть взять шаблон) и установлено в EMQX.
Таким образом, использование встроенных возможностей трассировки в брокере сообщений с добавлением токена отслеживания помогает в диагностике потенциальных проблем в распределенных системах.
В заключение рекомендую открытый урок, посвященный декомпозиции системы на микросервисы по бизнес-аспектам и Event Storming. О чем пойдет речь на этом занятии:
Декомпозиция по бизнес-аспектам;
Определение системных операций;
Разбиение операций на сервисы;
Разбиение по системным аспектам и DDD подход;
Методология PARL и Event Storming;
Записаться на урок можно по ссылке.