Традиционно для сценариев поточной обработки с использованием Map-Reduce рассматриваются такие решения как Hadoop/Spark, либо используются конвейерные системы (например Kafka), для которых есть возможность реализовать концепцию потоков (streams) с помощью дополнительных инструментов (в случае с Kafka это Kafka Connect (для подключения к источникам и получателям потока) и Kafka Streams для реализации Map-Reduce на потоке сообщений.

Начиная с версии 3.9 RabbitMQ анонсировал поддержку нового типа очереди, оптимизированного для поточной обработки. В этой статье мы посмотрим на основные отличия очередей RabbitMQ от классического режима очереди сообщений, а также возможные сценарии использования (с примерами кода на Go).

Обычно RabbitMQ используются в качестве медиатора при создании распределенной системы, основанной на обмене сообщениями, в том числе для сервисно-ориентированной архитектуры (SOA), в основе которой лежит сервисная шина предприятия (Enterprise Service Bus). Кроме того, брокер часто используется как промежуточное хранилище для дальнейшего распределения заданий между исполнителями, связанными с одной или несколькими очередями (это особенно важно в ситуациях, когда время для выполнения задания превышает среднее время между запросами). Брокер сообщений также предоставляет механизмы контроля отправки (уведомления об ошибках, поддержка передачи последовательности сообщений в рамках транзакции, получение квитанций о завершении обработки сообщений или ошибке), а также обеспечивает возможность создания дополнительных очередей для пересылки результата в ответ на запрос, что активно используется в реализации модели Remote Procedure Call (RPC), например в Python Celery. Сообщения в системе не сохраняются и после обработки безвозвратно уничтожаются.

Модель Stream для очереди в RabbitMQ 3.9 позволяет создать иную модель хранения данных, которая напоминает Kafka. В этом сценарии очередь реализуется как "журнал только для добавления данных", который сохраняет предыдущие сообщения и позволяет выполнить агрегацию данных за длительный период, а также "перемотать" положение на более раннюю позицию. При этом для доступа к сообщениям может использоваться как AMQP-совместимые клиенты, так и специальный двоичный протокол и клиентские библиотеки (Go и Java). В любом случае сохраняются все достоинства RabbitMQ при работе в режиме высокой доступности (очереди реплицируются между брокерами в HA-кластере и сохраняются в Mnesia, поэтому могут быть восстановлены при перезагрузке брокеров). Также для отправки сообщений в поток и чтения из потока можно использовать любые поддерживаемые протоколы RabbitMQ (AMQP, STOMP, MQTT), при этом в RabbitMQ поддерживаются внутренние механизмы маршрутизации между Exchange и Queue, что позволяет гибко интегрировать потоковые очереди в существующую архитектуру распределенного приложения.

Для использования очереди в RabbitMQ прежде всего необходимо разрешить дополнение:

rabbitmq-plugins enable rabbitmq_stream

По умолчанию для доступа к потоковым очередям используется порт 5552, при этом в конфигурации можно задать другой порт или выполнить конфигурацию предварительной загрузки (prefetch) на узлы-потребители, а также настроить управление очередью (приостановка отправки при достижении количества непрочитанных сообщений). Нужно отметить, что RabbitMQ работает в push-модели и самостоятельно доставляет информацию о появлении обновлений на клиента, что позволяет реализовать обработку событий практически в реальном времени.

Очереди на основе потоков полезно использовать при большом количестве клиентов. В этом случае все они получат доступ к одной и той же последовательности сообщений, при этом для клиента будет отслеживаться последнее прочитанное сообщение с использованием курсора, сохраненного на сервере и связанного с уникальным идентификатором клиента (в случае автоматической стратегии отслеживания), либо отслеживаемого на стороне клиента (при ручной стратегии). Также для очередей на основе потоков возможно выполнять чтение ранее полученных сообщений, что может быть полезно, например, в задачах поточного анализа информации (например, определение температуры в помещении с усреднением каждый час за последнюю неделю), при этом можно настроить политику удаления (retention policy) для очистки неактуальной части очереди сообщений. Для исключения дублирования сообщений каждое из них получает уникальный идентификатор (целое число или uuid).

Также важным является тот факт, что для классических очередей RabbitMQ для каждой очереди создается отдельный процесс внутри Erlang, в то время как для потоковых очередей вся обработка происходит в пределах одного потока, что позволяет уменьшить расходы при доставке большого количества сообщений одновременно на большое число клиентов. Для оценки производительности потоковых очередей создан специальный инструмент.

Рассмотрим использование поточных очередей для сбора и обработки данных телеметрии, поступающей с внешнего датчика по протоколу MQTT. Поточный режим очереди нам будет необходим для агрегации map-reduce по историческим данным с возможностью одновременного анализа данных в реальном времени (например, для управления кондиционерами).

Для начала установим RabbitMQ (будем использовать официальный docker-контейнер) и добавим поддержку MQTT и Streams.

docker run -it --name rabbitmq -p 5672:5672 -p 1883:1883 -p 5552:5552 rabbitmq:3-management    
rabbitmq-plugins enable rabbitmq_mqtt
rabbitmq-plugins enable rabbitmq_stream
rabbitmq-plugins enable rabbitmq_stream_management

Для отправки данных телеметрии создадим пользователя и назначим ему права на vhost

rabbitmqctl add_user mqtt-test mqtt-test
rabbitmqctl set_permissions -p / mqtt-test ".*" ".*" ".*"
rabbitmqctl set_user_tags mqtt-test management

По умолчанию отправка будет происходить в стандартный exchange amq.topic, для пересылки в потоковую очередь необходимо настроить binding между amq.topic и соответствующей очередью. Теперь создадим потоковую очередь:

 rabbitmqadmin declare queue name=measurements queue_type=stream

Для потоковой очереди можно задавать дополнительные аргументы (x-max-length-bytes - количество байт в потоке, x-max-age - максимальное время хранения сообщений, указывается с единицей измерения, например 30D), x-stream-max-segment-size-bytes указывает максимальный размер файла-сегмента для хранения сообщений.

rabbitmqadmin declare binding source=amq.topic destination=measurements routing_key=temperature.#

Для управления высокой надежностью можно управлять репликацией и просматривать состояние потока через rabbitmq-streams:

  • rabbitmq-streams stream_status measurements - просмотр состояния потока (показывает смещение последнего добавленного сообщения)

  • rabbitmq-streams add_replica measurements node2 - добавить реплику потока на узел node2

  • rabbitmq-streams delete_replica measuments node2 - отключить реплику от узла

При использовании клиентского подключения через AMQP при подключении можно указывать (в аргументе stream-offset) с какой позиции будет происходить чтение сообщений (число со смещением от начала потока) или с момента времени (время в ISO или в виде интервала от текущего момента времени), кроме того можно начать с первого доступного сообщения (first), со следующего поступившего сообщения (next). В нашем примере мы будем использовать драйвер на Go, разработанный для использования очередей в режиме потока.

Для отправки данных в реальных условиях используется датчик DHT11 и NodeMCU (прошивка), но для тестирования мы будем отправлять данные вручную и для этого удобно использовать свободно распространяемый MQTT Explorer.

Подключение к RabbitMQ - MQTT
Подключение к RabbitMQ - MQTT
Отправка сообщения с замером температуры
Отправка сообщения с замером температуры

Теперь сделаем приложение для получения средней температуры за последний час. Для подключения будем использовать клиент на Go. Создадим новый проект и добавим зависимость:

go get -u github.com/rabbitmq/rabbitmq-stream-go-client@v1.0.0-rc9

При необходимости получать только сообщения, полученные с последнего запуска можно использовать stream.OffsetSpecification{}.LastConsumed(). Альтернативно можно забрать сообщения с начала (с учетом политики удаления) stream.OffsetSpecification{}.First(), получить последнее (и все новые) сообщения stream.OffsetSpecification{}.Last(), либо указать время для поиска первого сообщения в UNIX timestamp в миллисекундах stream.OffsetSpecification{}.Timestamp().

Получим все сообщения со смещения на 1 час назад от текущего времени:

package main

import (
	"encoding/json"
	"github.com/rabbitmq/rabbitmq-stream-go-client/pkg/amqp"
	"github.com/rabbitmq/rabbitmq-stream-go-client/pkg/stream"
	"log"
	"time"
)

type TemperatureData struct {
	Temperature float32 `json:"temperature"`
	Timestamp   int64   `json:"timestamp"`
}

func main() {
  start := time.Now()
  start = start.Add(-1 * time.Hour)		//start timestamp

  //connect to RabbitMQ stream
  consumer, err := env.NewConsumer(
		streamName,
		func(consumerContext stream.ConsumerContext, message *amqp.Message) {
			var pk TemperatureData
			err := json.Unmarshal(message.GetData(), &pk)
			if err != nil {
				panic(err.Error())		//JSON decoding error
			}
			println(pk.Temperature)
		}, stream.NewConsumerOptions().
			SetConsumerName(streamName).
			SetOffset(stream.OffsetSpecification{}.Timestamp(start.UnixMilli())).
			SetCRCCheck(false))
	if err != nil {
		log.Panicln(err.Error())
	}
	defer close(consumer.NotifyClose())
}

Но у такого решения есть одна существенная проблема - его нельзя применять в пакетном режиме, поскольку после извлечения всех сообщений, отправленных после временной метки "на час раньше, чем время запуска", будет продолжена обработка поступающих сообщений в режиме активного прослушивания. Это полезно для приложений обработки в реальном времени (например, здесь можно выполнить управление исполнительным механизмом кондиционера), но в сценарии пакетной обработки необходимо завершить выполнение при завершении очереди. Простого способа получить метку завершения потока API не предоставляет, но есть возможность получить последнее доступное сообщение и завершить обработку при совпадении timestamp сообщения из очереди с ним. Здесь может возникнуть проблема, если за выбранный промежуток времени не было получено ни одного сообщения, но в этом случае может быть добавлена обработка по таймауту (в случае если функция обработки сообщений не будет вызвана ни разу в течении заданного времени).

Если необходимо выбрать события между двумя временными метками, то можно получить сообщение, опубликованное после второй временной метки и остановить обработку на нем или при достижении последнего опубликованного сообщения. Чтобы исключить возможную блокировку потока выполнения при отсутствии сообщений в течении всего промежутка (или после временной метки окончания интервала), можно использовать создание независимых подписок с небольшой временной задержкой перед получением потока сообщений (чтобы последнее сообщение и первое сообщение за временным слотом были уже получены), либо использовать обработку тайм-аута и последовательное создание consumer'ов для подключения к потоку.

Итак финальный вариант кода будет выглядеть следующим образом:

package main

import (
	"encoding/json"
	"fmt"
	"github.com/rabbitmq/rabbitmq-stream-go-client/pkg/amqp"
	"github.com/rabbitmq/rabbitmq-stream-go-client/pkg/stream"
	"log"
	"time"
)

type TemperatureData struct {
	Temperature float32 `json:"temperature"`
	Timestamp   int64   `json:"timestamp"`
}

type MeasurementCallback = func([]TemperatureData)

func extractMeasurements(start time.Time, end time.Time, streamName string, callback MeasurementCallback) {
	env, _ := stream.NewEnvironment(
		stream.NewEnvironmentOptions().
			SetHost("localhost").
			SetPort(5552).
			SetUser("measurements").
			SetPassword("measurements"))
	var last *amqp.Message

	var consumerLast *stream.Consumer
	var consumerEnd *stream.Consumer

	var lastMessage *TemperatureData
	var endMessage *TemperatureData

	measurements := make([]TemperatureData, 0, 0)

	consumerLast, _ = env.NewConsumer(streamName, func(consumerContextLast stream.ConsumerContext, message *amqp.Message) {
		last = message
		err := json.Unmarshal(last.GetData(), &lastMessage)
		if err != nil {
			panic(err.Error())
		}
	}, stream.NewConsumerOptions().SetConsumerName(streamName).SetOffset(stream.OffsetSpecification{}.Last()).SetCRCCheck(false))
	defer close(consumerLast.NotifyClose())

	consumerEnd, _ = env.NewConsumer(streamName, func(consumerContextEnd stream.ConsumerContext, message *amqp.Message) {
		err := json.Unmarshal(message.GetData(), &endMessage)
		if err != nil {
			panic(err.Error())
		}
	}, stream.NewConsumerOptions().SetConsumerName(streamName).SetOffset(stream.OffsetSpecification{}.Timestamp(end.UnixMilli())).SetCRCCheck(false))
	defer close(consumerEnd.NotifyClose())

	//sync for first stop-message in next slot (endMessage) and last message in stream (lastMessage)
	time.Sleep(500 * time.Millisecond)

	//connect to stream
	consumer, err := env.NewConsumer(
		streamName,
		func(consumerContext stream.ConsumerContext, message *amqp.Message) {
			var pk TemperatureData
			err := json.Unmarshal(message.GetData(), &pk)
			if err != nil {
				panic(err.Error())
			}
      //message from next slot?
			if endMessage != nil && pk.Timestamp == endMessage.Timestamp {
				consumerContext.Consumer.Close()
				consumerLast.Close()
				consumerEnd.Close()
				callback(measurements)
			}
			measurements = append(measurements, pk)
      //last message in stream?
			if lastMessage != nil && pk.Timestamp == lastMessage.Timestamp {
				consumerContext.Consumer.Close()
				consumerLast.Close()
				consumerEnd.Close()
				callback(measurements)
			}
		}, stream.NewConsumerOptions().
			SetConsumerName(streamName).
			SetOffset(stream.OffsetSpecification{}.Timestamp(start.UnixMilli())).
			SetCRCCheck(false))
	if err != nil {
		log.Panicln(err.Error())
	}
	defer close(consumer.NotifyClose())
}

func main() {
	start := time.Now()
	start = start.Add(-1 * time.Hour)
	end := time.Now()
	finished := make(chan bool)
	extractMeasurements(start, end, "measurements", func(data []TemperatureData) {
		var avg float32 = 0.0
		for _, temp := range data {
			fmt.Printf("Timestamp=%s, Temperature=%f\n", time.Unix(temp.Timestamp, 0), temp.Temperature)
			avg += temp.Temperature
		}
		avg /= float32(len(data))
		fmt.Printf("Average temperature is %f\n", avg)
		finished <- true
	})
	<-finished
}

func close(channelClose stream.ChannelClose) {
	<-channelClose
}

Для изменения срока хранения сообщений необходимо использовать политику, связанную с одной или несколькими очередями. Например, для удаления сообщений старше одной недели, политика может быть задана следующим образом:

rabbitmqctl set_policy --apply-to queues oneweek 'measurements' "{'max-age':'7D'}"

Мы рассмотрели лишь один из возможных сценариев использования режима streams для очередей RabbitMQ. Аналогично можно реализовать другие задачи поточной обработки, где требуется агрегация по историческим данным или возможность ретроспективного анализа (например, при аудите событий безопасности). Конечно же, для многих задач более подходящим решением может быть какая-либо из баз данных для временных рядов (OpenTSDB, Prometheus, InfluxDB), но полезно помнить, что подобный сценарий использования стал возможен и для RabbitMQ в варианте использования поточных очередей.

Исходные тексты проекта размещены на https://github.com/dzolotov/rabbitmq-streams-example

Кстати, сегодня мои коллеги из OTUS проведут бесплатный вебинар на котором будет рассмотрен принцип работы механизмов репликации с точки зрения синхронизации данных. Регистрация доступна по ссылке.

Комментарии (1)


  1. AlexSpaizNet
    28.02.2022 17:29
    +3

    В закладки. Интересно поиграться.

    Мы для эмуляции pub/sub делали эксченжи и присоединяли кьюшки что бы каждая получала копию сообщений. Работает оно, но кьюшки плодятся, мейнтенс...

    Стримы интересно выглядят... надо только раббит обновить.

    Но в конце концов мы в большинстве флоу переходим на кафку. Все таки partial order of messages и авто ассайн воркеров к партишинам для нас маст хэв.