Работа с очередями сообщений — важная часть современных систем обработки данных. В нашей команде мы используем брокер сообщений RabbitMQ, но нам пришлось столкнуться с проблемами при обработке большого объема данных. В поисках решений я начал изучать различные способы оптимизации, и таким образом познакомился с RabbitMQ Streams – плагином, добавляющим log-based потоки, работающие по аналогии с Kafka

Я потратил некоторые время, вникая в принципы работы RabbitMQ Streams с .NET и хочу представить вам краткий обзор, который призван упростить погружение в эту систему


Статья будет разбита по вопросам, ответы на которые я последовательно искал, разбираясь с RabbitMQ Streams. В основном вся информация взята прямиком с официальной документации, где в общем-то всё подробно расписано. Поэтому если возникнет желание углубиться – везде будут ссылки ?

❓ Что это и зачем?

Если вы знакомы с современными брокерами, то скорее всего знаете, что сейчас распространены два разных подхода к обработке сообщений: очередь и поток. Для наглядности в дальнейшем буду ссылаться на ярких представителей каждого из этих подходов: RabbitMQ, собственной персоной, и Apache Kafka соответственно

RabbitMQ

Apache Kafka

Использует очереди - сообщения хранятся в оперативной памяти

Оптимизирован под быстрое «очищение» очереди

Использует потоки - сообщения пишутся на диск

Оптимизирован под большие объемы данных

Каждый из подходов имеет свои слабые и сильные стороны. Так, слабая сторона очередей в RabbitMQ в том, что они плохо справляются с огромными объемами данных:

  • Низкая пропускная способность в сравнении с потоковыми системами

  • Низкая производительность при больших очередях

  • Низкая производительность при большом количестве очередей в «fanout» обменнике (exchange)

Представьте, что сидите вы спокойно на RabbitMQ с 10к сообщний в секунду и вас всё устраивает. Но по одной очереди внезапно начинает приходить 100к сообщений: CPU грузится на 100%, сообщения перестают долетать, дата-центр сгорает дотла и его прах увозят инопланетяне на своем НЛО

Собственно, примерно с этим мы и столкнулись

Что же делать? Внедрять Kafka? К счастью, пересаживаться на другой брокер оказалось необязательно, ведь существует RabbitMQ Streams – плагин, который добавляет в RabbitMQ потоки (streams). Потоки работают примерно так же, как и в Kafka: пишутся прямиком в лог на диске и предоставляются по запросу клиента с любого места

⚠️Небольшой дисклеймер⚠️

Если верить следующему бенчмарку, то RabbitMQ Streams полноценно не заменяет Кафку, зато действительно увеличивает пропускную способность брокера в 3-4 раза: https://www.youtube.com/watch?v=UPkOsXKG4ns


⚙️ Как это работает?

Ссылка на полный материал: https://www.rabbitmq.com/docs/streams#usage

С RabbitMQ Streams можно работать как через отдельный протокол, доступный по порту 5552, так и через привычный AMQP-протокол (по порту 5672), но с некоторыми ограничениями, о которых речь пойдет ниже. Для начала работы достаточно выполнить команду в терминале системы, где установлен брокер. Эта команда включает плагин потоков:

# docker run -it --rm -p 5552:5552 -p 5672:5672 -p 15672:15672 rabbitmq:3-management
rabbitmq-plugins enable rabbitmq_stream
Запуск плагина rabbitmq_stream в терминале
Запуск плагина rabbitmq_stream в терминале

Поток остается AMQP-совместимой очередью (queue). И в принципе вы можете создать его так же, как и обычную очередь, лишь указав атрибут «x-queue-type»

await channel.QueueDeclareAsync(
    "my-stream",
    durable: true,
    exclusive: false,
    autoDelete: false,
    arguments: new Dictionary<string, object> { { "x-queue-type", "stream" } }
);

Небольшие ��юансы кроются в создании подписчиков (consumers), так как во-первых, для них необходимо указать prefetch, а во-вторых, в них необходимо учитывать возможность считывать поток с любой точки

// указываем prefetch
await channel.BasicQosAsync(0, prefetchCount: 100, false);
 
var consumer = new AsyncEventingBasicConsumer(channel);
consumer.ReceivedAsync += async (model, ea) =>
{
    // читаем сообщение
    var message = Encoding.UTF8.GetString(ea.Body.ToArray());

    // получаем позицию сообщения
    var offest = ea.BasicProperties.Headers?["x-stream-offset"];

    await channel.BasicAckAsync(ea.DeliveryTag, false);
};


var arguments = new Dictionary<string, object?>
{
   { "x-stream-offset", "first" } // читаем с самого первого сообщения в потоке
};
await channel.BasicConsumeAsync("my-stream", false, "my-consumer", arguments, consumer);

С точки зрения издателя (producer) абсолютно ничего не меняется. И все возможности роутинга для RabbitMQ полностью сохраняются

// создание fanout обменника
await channel.ExchangeDeclareAsync(exchange: "my-exchange", type: ExchangeType.Fanout);

// бинд очереди за созданным обменником
await channel.QueueBindAsync("my-stream", "my-exchange", "key");

// публикация сообщения в обменник
await channel.BasicPublishAsync(exchange: "my-exchange", routingKey: "key", body: "Hello World!"u8.ToArray());

⚠️Обратите внимание!⚠️

Поведение нескольких подписчиков (consumers), считывающих один поток, будет сильно отличаться от их же поведения, но при чтении обычной очереди. Проще говоря, если у вас есть очередь и у неё есть две подписки, то часть сообщений будет отправляться одному подписчику, а часть – другому. Но в случае с потоками, оба подписчика будут читать абсолютно все сообщения

К сожалению, масштабировать потоки, используя AMQP-протокол довольно проблематично. Сразу возникает несколько вопросов:

  • Что делать, если у сервиса-подписчика будет несколько реплик?

  • Что если поток будет слишком большим для одной машины?

У RabbitMQ Streams готов ответ на каждый из этих вопросов. Но об этом ниже

А пока хотелось бы отметить, что для нераспределенных систем RabbitMQ Streams делает настоящую магию. Благодаря потокам можно в разы увеличить пропускную способность очередей, отделавшись при этом минимальными доработками

?Небольшое примечание?

В статье я использую термины подписчик / издатель вместо consumer / producer, т.к. в русском языке это звучит плохенько ?


?  Как это масштабируется?

Ссылка на полный материал: https://www.rabbitmq.com/blog/2022/07/13/rabbitmq-3-11-feature-preview-super-streams

А также: https://www.rabbitmq.com/tutorials/tutorial-one-dotnet-stream

И ещё это:  https://www.rabbitmq.com/docs/stream-connections

? Партиционирование

Возможно, вы уже представили, какой жуткий костыль вам придется пилить, чтобы раскидать данные по нескольким потокам… Но делать этого не придется, ибо разработчики всё предусмотрели и почти дословно перенесли из Kafka систему с топиками и партициями. Называется она Super Streams

На языке RabbitMQ

На языке Apache Kafka

Super stream

Topic

Stream

Partition

Consumers group by Reference (я не нашел нормального термина)

Group

В AMQP-семантике суперстрим (super stream) – это обменник (exchange) с некоторым количеством очередей-потоков в нём

A super stream is a structure that sits above streams, allowing to logically group a set of streams. AMQP 0.9.1 resources define its physical topology.
Super stream - это структура поверх обычных потоков, группирующая набор потоков

Для того, чтобы распределить потоки между репликами, используется специальная фича – Single Active Consumer (не путать с фичей с аналогичным названием, но для очередей). Благодаря ней, у конкретного потока может быть только один активный подписчик. Остальные подписчики становятся неактивными. Это исключает конкурентную обработку одного потока

Only one instance receives messages in a group of consumers when single active consumer is enabled.
Если включен Single Active Consumer, сообщения в группе потребителей получает только один инстанс

В итоге при запуске нескольких реплик получается следующая картина: по каждому подключению запускается сразу несколько подписчиков на разные потоки одного суперстрима. Но при этом соотношение активных и неактивных подписчиков равномерно распределено по всем подключениям. Ну и соответственно если подписчик на одном подключении активен, то на остальных он будет неактивен

Combining super stream consumers and single active consumer. There is only one active consumer on a partition at a time for a given group.
Сочетание Super Streams и Single Active Consumer. У каждого потока (партиции) только один активный подписчик

Также хочу обратить внимание на то, что по каждому суперстриму может быть несколько групп подписчиков. В RabbitMQ Streams для каждого подписчика можно указать Reference, по значению которого будет проверяться его принадлежность к той или иной группе. Это крайне важная особенность, ведь без неё не получится продублировать сообщения из суперстрима сразу в несколько сервисов (допустим, у вас есть domain-logic-service, который выполняет какую-то бизнес-логику, и logs-service, который просто собирает логи; и оба сервиса хотят получать все события из потока)

Сама логика маршрутизации сообщения в тот или иной поток суперстрима заложена в клиенте RabbitMQ.Stream.Client. Он отличается от обычного AMQP-клиента тем, что использует совершенно другой протокол (без которого невозможно работать с Single Active Consumer как минимум). Этот клиент умный и умеет вычислять хэш по переданному ему идентификатору сообщения. По этому хэшу он и распределяет сообщения между потоками

Подробнее со спецификацией протокола, используемого для взаимодействия с потоками, вы можете ознакомиться здесь: https://github.com/rabbitmq/rabbitmq-server/blob/v4.2.x/deps/rabbitmq_stream/docs/PROTOCOL.adoc#credit

И раз уж зашла речь про клиент для стримов, то предлагаю уже с ним ознакомиться

// конфигурируем подключение
var cfg = new StreamSystemConfig
{
    Password = "guest",
    UserName = "guest",
    VirtualHost = "/",
    Endpoints = new List<EndPoint>
    {
        new IPEndPoint(IPAddress.Loopback, 5552)
    }
};

var streamSystem = await StreamSystem.Create(cfg);

// создаем суперстрим на 6 партиций (стримов)
await streamSystem.CreateSuperStream(new PartitionsSuperStreamSpec("super-stream", 6));

// создаем подписчика
var superConsumer = await streamSystem.CreateSuperStreamConsumer(new RawSuperStreamConsumerConfig("super-stream")
{
    IsSingleActiveConsumer = true,
    Reference = "ConsumerGroup0",
    MessageHandler = async (stream, _, _, message) =>
        Console.WriteLine(Encoding.UTF8.GetString(message.Data.Contents)),
});

// …

// создаем издателя
var producer = await streamSystem.CreateRawSuperStreamProducer(new RawSuperStreamProducerConfig("super-stream")
{
    Routing = (message) => message.Properties.CorrelationId.ToString()
});

// пишем в суперстрим
await producer.Send(
  publishingId: 0,
  new Message(Encoding.UTF8.GetBytes($"Hello, World!"))
    { Properties = new Properties { CorrelationId = "1" } }
);
// что за магический нуль первым аргументом – поговорим чуть ниже ?

Примерно так и осуществляется работа с суперстримами в .NET. Хотелось бы написать какой-то комментарий, но тут больше и нечего добавить – интуитивно понятное API говорит само за себя

Единственное, что хотелось бы отметить, так это то, что всё богатство роутинга в RabbitMQ, к сожалению, исчезает в режиме суперстримов. С этим ничего не сделаешь – остается только фильтровать сообщения на стороне подписчика: https://rabbitmq.github.io/rabbitmq-stream-dotnet-client/stable/htmlsingle/index.html#_filtering

? Кластеризация

В работе с кластером есть некоторые нюансы. Дело в том, что обычно неважно, к какой именно ноде вы коннектитесь. Но в случае с RabbitMQ Streams это не так: клиент должен обладать информацией о всех нодах. Это приводит к некоторым проблемам, если вы используете балансировщик нагрузки. Впрочем решение этой проблемы есть, и ознакомиться с ним вы можете здесь: https://rabbitmq.github.io/rabbitmq-stream-dotnet-client/stable/htmlsingle/index.html#address-resolver

А работает кластеризация для потоков достаточно предсказуемо. При создании нового потока выбирается лидер для него. Остальные ноды будут получать реплики этого потока. Ожидается, что только издатель будет взаимодействовать с потоком-лидером, то бишь писать в него. Подписчики же будут обращаться к одной из реплик для чтения. На этом моменте и возникают проблемы с балансировщиками нагрузки, ведь им неизвестно, где находится лидер, а где – реплика

По каждому потоку выбирается лидер на одной из нод. Остальные ноды будут содержать реплики
По каждому потоку выбирается лидер на одной из нод. Остальные ноды будут содержать реплики

Подразумевается, что одна нода не может содержать все потоки-лидеры: они будут равномерно раскиданы по всему кластеру. И в общем-то это всё, что нужно знать для начала работы

Здесь опять же используются возможности «умного» клиента. Он, как было сказано выше, знает о существовании всех нод. А ещё он умеет запрашивать мета-информацию о кластере

На практике работа с кластером выглядит примерно так

var cfg = new StreamSystemConfig
 {
     Password = "guest",
     UserName = "guest",
     VirtualHost = "/",
     // указываем все доступные ноды
     Endpoints = new List<EndPoint>
     {
         new DnsEndPoint("rabbitmq1", 5552),
         new DnsEndPoint("rabbitmq2", 5552),
         new DnsEndPoint("rabbitmq3", 5552)
     }
 };
 var streamSystem = await StreamSystem.Create(cfg);

 
// создаем поток, применяя механизм выбора лидера
await streamSystem.CreateSuperStream(new PartitionsSuperStreamSpec("super-stream", 3)
 {
     LeaderLocator = LeaderLocator.LeastLeaders,
 });

? Как соблюсти консистентность?

Ссылка на полный материал: https://rabbitmq.github.io/rabbitmq-stream-dotnet-client/stable/htmlsingle/index.html#stream-client-overview

Возможно, вас сильно смутил код издателя в разделе выше, а именно в части понимания предназначенияpublishingId и correlationId. Это действительно важные идентификаторы, ведь их правильный подбор обеспечивает поддержку консистентности внутри агрегата

PublishingId – это строго возрастающий идентификатор события. По-умолчанию, он не используется. Но если вы будете использовать дедублицирующего издателя, то по его значению сервер сможет понять, было ли отправлено новое сообщение в поток, или же это случайный дубликат, который был получен из-за лага в сети

var entity = // … ;
var entityEvent = // … ;
 
// инициализация дедублицирующего производителя
var producer = await DeduplicatingProducer.Create(
    new DeduplicatingProducerConfig(streamSystem, "super-stream", "my-producer")
    {
        SuperStreamConfig = new SuperStreamConfig
        {
            Routing = (message) => message.Properties.CorrelationId.ToString()
        }
    });

// отправка сообщения
await producer.Send(
  publishingId: entityEvent.Id,
  new Message(Encoding.UTF8.GetBytes($"Hello, World!"))
    { Properties = new Properties { CorrelationId = entity.Id } }
);

CorrelationId в свою очередь о��вечает за то, чтобы все события по одному агрегату отправлялись строго в один поток. Это исключит конкурентность, что гарантирует обработку всех событий строго последовательно. Вообще вместо CorrelationId можно использовать любое другое поле – главное явно указать в Routing, какое значение использовать для вычисления хэша

⚠️Обратите внимание!⚠️

Если вы пробовали запускать код, то наверное, обратили внимание, что вы не получаете дубли уже прочитанных событий, как в случае с AMQP-клиентом. Дело в том, что при указании Reference для подписчика сервер автоматически отслеживает прочитанные им сообщения


? Выводы, или что там по MassTransit?

Мем, передающий впечатления от использования MassTransit
Мем, передающий впечатления от использования MassTransit

? Поддержка в MassTransit

В первую очередь, хотелось бы затронуть самую животрепещущую тему: что там по MassTransit? Хорошая новость – он полностью поддерживает RabbitMQ Streams. Плохая – только в контексте AMQP-протокола. То есть со Superstreams поработать не получится по крайней мере на сегодняшний день (пишу из конца 2025 года). Имеет ли смысл использовать RabbitMQ Streams без суперстримов? Для нераспределенных систем – вполне. Если намечается high-load – не думаю

// пример конфигурации стрима в MassTransit
rabbit.ReceiveEndpoint("my-stream", e =>
{
   e.PrefetchCount = 100;
   e.Stream(c =>
   {
      c.FromFirst();
   });
   e.ConfigureConsumer<TransportConsumer>(cfg);
});

? Поддержка комьюнити

Немного разочаровала поддержка комьюнити. По RabbitMQ Streams очень много информации в официальных источниках, но в остальном он гуглится так себе. Есть ощущение, что большая часть разработчиков даже не знает, о его существовании

? Целесообразность использования

Скорее всего RabbitMQ Streams подойдет только для тех, кто уже использует RabbitMQ. Это будет отличное решение, чтобы увеличить пропускную способность, но не устраивать зоопарк из брокеров. В остальных случаях, вероятно, лучше изначально отдать предпочтение Apache Kafka (про NATS ничего не могу сказать). Полагаю, это объясняет низкую популярность в комьюнити, вкупе с отсутствием поддержки суперстримов в MassTransit

? Документация и легкость в использовании

Что порадовало – так это удобный клиент и понятная документация, делающие RabbitMQ Streams крайне легким в освоении. Это опять же идеально накладывается на ситуацию, когда в вашей команде уже используется RabbitMQ, но начали появляться «высоконагруженные» очереди, которые надо как-то оптимизировать. Ваша команда сможет очень быстро внедрить потоки и поправить ситуацию


На этом у меня всё. Надеюсь, изложенный материал оказался полезен или как минимум любопытен тем, кто заинтересован в использовании RabbitMQ Streams

Комментарии (4)


  1. posledam
    11.12.2025 01:43

    Я бы выбрал NATS, стримы из коробки, меньше избыточных сущностей, быстрый и со стороны разработки показался прям сильно удобней, чем Rabbit. Конечно, если много всего написано и работает под Rabbit, наверное плагин будет кстати. Но специально я бы в него не шёл.


  1. Gromilo
    11.12.2025 01:43

    Основная сложность стримов в рэбите: это подтверждение доставки.

    Допустим мы отправляем с помощью await porduser.Send(...), это вообще ничего не значит, нужно ждать когда отработает MessageHandler. Приходится писать свою обвязку.

    Если отправили список сообщений (даже с одним сообщением), то MessageHandler отрабатывает сразу, а если по одному, то происходит буфферизация. Задаётся параметров MessagesBufferSize продюсера, по умолчанию 100. Параметра "максимальное время ожидания отправки" не существует.

    В итоге, из коробки либа Rabbit.Streams подходит для быстрой массовой отправки сообщений без гарантии доставки. Если нужны гарантии, нужно дописывать свой код.


  1. Gromilo
    11.12.2025 01:43

    А ещё нет сжатия сообщений в стримах.

    Всё что есть - сжатие пачки сообщений на продюсере при отправке. Вся эта пачка становится одним чанком. Если отправка идёт по одному сообщению: то никакого сжатия.


  1. KEugene
    11.12.2025 01:43

    Этот чувак на иллюстрации к статье - крутой разработчик. Он шарит в С++, С#, Go, Rust, Python. Это только то, с чем он засветился в последний месяц. Но наверняка его знания намного более обширны.