RabbitMQ – это брокер сообщений, служба, отвечающая за обмен сообщениями между разными программными сервисами.
RabbitMQ держит сообщения в очереди (Queue), которая является именованным буфером, хранящим адресованные ему сообщения.
Программа, посылающая сообщения в очередь RabbitMQ, называется поставщиком (Producer).
Программа, принимающая сообщения, называется подписчиком (Consumer). Такие программы подписываются на события поступления сообщения в очередь, и всегда находятся в ожидании новых сообщений.
Множество поставщиков могут отправлять сообщения в очередь, и множество подписчиков могут считывать сообщения из очереди.
Запуск сервера RabbitMQ
В целях данного туториала, нам необходимо иметь запущенную службу RabbitMQ, и сделать это мы можем двумя способами: развернуть сервер RabbitMQ локально в докер-контейнере, или воспользоваться сторонним облачным сервисом. Ниже рассмотрим оба варианта, и вы можете выбрать для себя наиболее подходящий.
1.1. Способ 1: Запуск сервера RabbitMQ в докер контейнере на своем локальном компьютере
Для этого нам необходимо, чтобы на нашем компьютере предварительно было установлено программное обеспечение Docker Desktop.
- Для этого переходим по ссылке https://www.docker.com/products/docker-desktop, скачиваем установщик и запускаем его.
Важно выбрать опции Enable Hyper-V Windows Features или Install required Windows components for WSL 2, когда установщик об этом спросит.
Следуем инструкциям установщика, и по завершении процесса установки жмем кнопку Close.
Находим Docker Desktop в списке установленных на компьютере программ, и запускаем его.
В командной строке Windows или в PowerShell поочередно запускаем две следующие команды:
docker run -d --hostname my-rabbit-host --name my-rabbit -e RABBITMQ_DEFAULT_USER=user -e RABBITMQ_DEFAULT_PASS=password rabbitmq:3-management
и
docker run -it --rm --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:3-management
Окно командной строки не закрываем.
В приложении Docker Desktop, на вкладке Containers / Apps увидим запущенные контейнеры RabbitMQ:
В браузере перейдем по ссылке localhost:15672. В поля имя пользователя и пароль введем guest и guest. Здесь мы можем управлять нашей службой RabbitMQ.
1.2. Способ 2: Регистрация в облачном сервисе CloudAMQP и настройка сервиса RabbitMQ
Переходим по адресу https://www.cloudamqp.com/plans.html.
Выбираем бесплатный план Little Lemur (жмем Get Now).
Регистрируемся на сайте.
Создаем новую сущность (instance) RabbitMQ.
На следующей странице выбираем доступный регион расположения сервера и жмем Review.
На странице Configure жмем Create Instance.
На следующей странице жмем на названии нашей новой сущности:
и попадаем в панель управления нашего экземпляра RabbitMQ:
Использование RabbitMQ в проектах ASP.NET Core
Создадим 2 проекта ASP.NET Core Web API: проект, который будет поставщиком (продюсером) сообщений, и проект, который будет подписчиком (консьюмером).
Для обоих проектов, после их создания на последующих шагах, необходимо выполнить установку NuGet пакета: RabbitMQ.Client.
Для этого в SolutionExplorer (Обозреватель решений) правой кнопкой мыши жмем по названию рабочего проекта и выбираем Manage NuGet Packages… (Управление пакетами Nuget).
Далее переходим на крайнюю левую вкладку Browse, и в строку поиска вводим название устанавливаемого пакета NuGet.
В левом окне выбираем нужный нам пакет, а в правом жмем кнопку Install.
2.1. Создаем проект ASP.NET Core Web API с продюсером.
В Visual Studio создаем новый проект ASP.NET Core Web API:
В новый проект добавляем NuGet пакет RabbitMQ.Client как описано выше.
В корне проекта создаем папку “RabbitMq”, и в этой папке создаем интерфейс:
public interface IRabbitMqService
{
void SendMessage(object obj);
void SendMessage(string message);
}
В этой же папке создаем класс:
using RabbitMQ.Client;
using System.Text;
using System.Text.Json;
public class RabbitMqService : IRabbitMqService
{
public void SendMessage(object obj)
{
var message = JsonSerializer.Serialize(obj);
SendMessage(message);
}
public void SendMessage(string message)
{
// Не забудьте вынести значения "localhost" и "MyQueue"
// в файл конфигурации
var factory = new ConnectionFactory() { HostName = "localhost" };
using (var connection = factory.CreateConnection())
using (var channel = connection.CreateModel())
{
channel.QueueDeclare(queue: "MyQueue",
durable: false,
exclusive: false,
autoDelete: false,
arguments: null);
var body = Encoding.UTF8.GetBytes(message);
channel.BasicPublish(exchange: "",
routingKey: "MyQueue",
basicProperties: null,
body: body);
}
}
}
В данном примере мы создали продюсер для локального сервера RabbitMQ:
var factory = new ConnectionFactory() { HostName = "localhost" };
Для облачного сервиса RabbitMQ нужно изменить эту строку.
Перейдем в панель управления CloudAMQP, и нажмем значок “скопировать” напротив строки подключения: AMQP URL.
Вставим скопированную строку подключения в наш код:
var factory = new ConnectionFactory() { Uri = new Uri("строка_подключения") };
Тееперь продюсер будет отправлять сообщения в облачный сервис.
Добавим класс RabbitMqService в DI контейнер. Для этого добавим в метод ConfigureServices класса Startup следующую строку:
public void ConfigureServices(IServiceCollection services)
{
// другой код
services.AddScoped<IRabbitMqService, RabbitMqService>();
// другой код
}
В папку Controllers добавим новый API контроллер:
Добавим в него код:
using Microsoft.AspNetCore.Mvc;
using RabbitMqProducer.RabbitMq;
[Route("api/[controller]")]
[ApiController]
public class RabbitMqController : ControllerBase
{
private readonly IRabbitMqService _mqService;
public RabbitMqController(IRabbitMqService mqService)
{
_mqService = mqService;
}
[Route("[action]/{message}")]
[HttpGet]
public IActionResult SendMessage(string message)
{
_mqService.SendMessage(message);
return Ok("Сообщение отправлено");
}
}
Запустим проект, и в сваггере (https://localhost:ваш_порт/swagger/index.html) вызовем метод SendMessage, передав в качестве параметра произвольную строку. Для этого нажмем на строке этого метода, в открывшейся панели вверху справа нажмем кнопку Try it out, и в окне message введем наше сообщение. Нажмем кнопку Execute.
В ответе мы должны получить статус код 200 и сообщение “Сообщение отправлено”.
Если вы используете локальный сервер RabbitMQ, переходим в панель управления локальным экземпляром RabbitMQ (http://localhost:15672).
Если вы используете облачный сервис CloudAMQP, переходим в его панель управления. В верхней части левого меню жмем кнопку RabbitMQ Manager и попадаем в такую же панель управления RabbitMQ, что и для локального экземпляра.
Перейдем на вкладку Queues, в табличке нажмем на наименовании нашей очереди ("MyQueue").
В списке внизу раскроем элемент Get messages, нажмем кнопку Get message(s), и увидим наше сообщение “Привет, Кролик!”:
Продюсер работает.
2.2. Создаем проект ASP.NET Core Web API с консьюмером.
В Visual Studio создаем новый проект ASP.NET Core Web API.
В новый проект добавляем NuGet пакет RabbitMQ.Client как описано выше.
В корне проекта создаем папку “RabbitMq”, и в этой папке создаем класс:
using RabbitMQ.Client.Events;
using RabbitMQ.Client;
using System.Threading.Tasks;
using System.Threading;
using Microsoft.Extensions.Hosting;
using System.Text;
using System.Diagnostics;
using System;
public class RabbitMqListener : BackgroundService
{
private IConnection _connection;
private IModel _channel;
public RabbitMqListener()
{
// Не забудьте вынести значения "localhost" и "MyQueue"
// в файл конфигурации
var factory = new ConnectionFactory { HostName = "localhost" };
_connection = factory.CreateConnection();
_channel = _connection.CreateModel();
_channel.QueueDeclare(queue: "MyQueue", durable: false, exclusive: false, autoDelete: false, arguments: null);
}
protected override Task ExecuteAsync(CancellationToken stoppingToken)
{
stoppingToken.ThrowIfCancellationRequested();
var consumer = new EventingBasicConsumer(_channel);
consumer.Received += (ch, ea) =>
{
var content = Encoding.UTF8.GetString(ea.Body.ToArray());
// Каким-то образом обрабатываем полученное сообщение
Debug.WriteLine($"Получено сообщение: {content}");
_channel.BasicAck(ea.DeliveryTag, false);
};
_channel.BasicConsume("MyQueue", false, consumer);
return Task.CompletedTask;
}
public override void Dispose()
{
_channel.Close();
_connection.Close();
base.Dispose();
}
}
В данном примере мы создали консьюмер для локального сервера RabbitMQ:
var factory = new ConnectionFactory() { HostName = "localhost" };
Для облачного сервиса RabbitMQ нужно изменить эту строку так, как описано выше в разделе создания продюсера.
В метод ConfigureServices класса Startup добавим строчку:
services.AddHostedService<RabbitMqListener>();
Запустим оба проекта – продюсер и консьюмер.
Перейдем в сваггер продюсера, и отправим произвольное сообщение.
В окне Output from Debug появится наше сообщение, выведенное методом
Debug.WriteLine($"Получено сообщение: {content}");
нашего консьюмера.
2.3. Создаем проект .NET Core Console App с консьюмером.
Для этого в метод Main класса Program нового проекта добавляем следующий код:
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System;
using System.Text;
static void Main(string[] args)
{
var factory = new ConnectionFactory() { HostName = "localhost" };
//var factory = new ConnectionFactory() { Uri = new Uri("строка_подключения_облако") };
using (var connection = factory.CreateConnection())
using (var channel = connection.CreateModel())
{
channel.QueueDeclare(queue: "MyQueue",
durable: false,
exclusive: false,
autoDelete: false,
arguments: null);
var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{
var body = ea.Body.ToArray();
var message = Encoding.UTF8.GetString(body);
Console.WriteLine(" [x] Received {0}", message);
};
channel.BasicConsume(queue: "MyQueue",
autoAck: true,
consumer: consumer);
Console.WriteLine(" Press [enter] to exit.");
Console.ReadLine();
}
}
И запускаем проект. Вывод полученного сообщение будет осуществлен в консоль.
Комментарии (18)
boldMahoney
06.02.2022 15:22Однобокий и технически неграмотный туториал. После прочтения возникает вопросов больше, чем ответов. От таких статей больше вреда, чем пользы. В официальном мануале итак почти все разжевано доступным языком https://www.rabbitmq.com/documentation.html
KislyFan
06.02.2022 15:33+1Первый вопрос, а зачем вообще нужен брокер сообщений? Я давно делаю приложения на ASP.NET (классика ASP + EF + WebApi / Razor), но не понимаю в каком случае брокера нужно использовать. Было бы классно увидеть это в начале статьи.
KGeist
06.02.2022 16:50+2У нас RabbitMQ используется в качестве событийной шины для общения между сервисами. Сервис А генерирует событие, оно кладётся в "exchange", который раскидывает сообщение по подключённым очередям сервисов B, C, D и т.д., которые в нужном им темпе извлекают события и как-то их обрабатывают. Сервис может таким образом реагировать на свои же события, т.е. получаем асинхронную обработку. С помощью очереди можно ещё реализовать асинхронные команды. Тут плюс в том, что очередь персистируется, и если сервис упал и не успел сделать ACK, то произойдёт retry (т.к. останется в очереди), что полезно для отказустойчивости, плюс там есть свои плюшки для мониторинга и работы именно с очередями... Но есть и свои минусы - если память кончается, то он заблочит все соединения (начнут висеть без ответа) и обработчики должны быть идемпотентными из-за at least once delivery. Если инстанс приложения один и объёмы небольшие, можно обрабатывать события сразу в том же запросе/транзакции.
dimaaan
06.02.2022 18:40>> если сервис упал и не успел сделать ACK, то произойдёт retry
Retry можно делать и без посредников в виде RabbitMQ обычным циклом, которму передается делегат или использовать Polly
KGeist
06.02.2022 20:36+1Если упал процесс обработчика, то теряется контекст (теряется событие), и перебор циклом не поможет, так как мы не знаем, с чем работать. Конечно, можно сохранять события и в БД, но тогда нам придется самим переизобретать то, что уже умеет RabbitMQ.
dimaaan
06.02.2022 21:03>> Если упал процесс
Какой именно процесс? producer или consumer?
-
Давайте представим, что успал producer.
С RabbitMQ: MQ сервер не получит сообщения, оно не встанет в очередь и не дойдет до получателя.
Без RabbitMQ: сообщение так же не дойдет.
-
Давайте представим, что упал consumer.
С RabbitMQ: RabbitMQ сохранит сообщение в очереди и доставит его, когда consumer будет готов.
Без RabbitMQ: оно всё равно дойдет при следующем "try", ведь контекст сохранен в producer'е. Но есть одно НО, если во время retry producer упадет, мы потеряем сообщение. (гол в пользу Rabbit)
-
Давайте представим, что упал сам RabbitMQ.
С RabbitMQ: consumer не получит сообщения
Без RabbitMQ: ситуация невозможна (гол против Rabbit)
Итого счёт 1:1.
Но "если брюки выглядят одинаково, зачем платить больше?" ©
Я не вижу реальной выгоды от Рэббита, хотя может я что-то упустил в своем анализе?
KGeist
06.02.2022 21:32Я имел в виду падение именно консьюмера (подкорректировал оригинальное сообщение) - сообщение тогда потеряно, т.е. гол в пользу RabbitMQ.
Что касается продьюсера, то верно замечено, что если мы упали после сохранения в БД, но до добавления события в очередь RabbitMQ, то мы тоже потеряем событие (имею в виду полное падение продьюсера, где retry невозможен). Для этого используется идиома transactional outbox - информация о событии создаётся и сохраняется в локальную БД в той же транзакции, что и бизнесовая операция на запись. Т.е если мы упали, то откатится вся транзакция сразу (ни изменений, ни событий). А если транзакция успешно закоммичена, то наличие события в БД гарантирует успешность/транзакционную целостность. Далее отдельный поток извлекает такие события из БД и кладёт в RabbitMQ с ретраем - т.е. мы гарантируем, что событие будет доставлено, в том числе если временно отвалится брокер. Плюс брокера в том, что это отдельный сервис, через который можно подключить по отдельной очереди на каждый сервис (для параллельной обработки), и он имеет из коробки протестированные временем инструменты для менеджмента таких очередей. Обычно польза чувствуется только если проект придерживается микросервисной архитектуры, где сервисы изолированы и с собственными БД, иначе можно, конечно, обойтись и без RabbitMQ, т.к. в монолите у нас и так есть доступ к персистированным событиям.
Также замечено верно, что RabbitMQ в таком случае становится узким горлышком (если он упал, то встанет вся обработка), но с таким же успехом может упасть и БД монолита. Тут главное, как по мне - возможность продолжить работу после падения как ни в чём не бывало (что позволяет персистирование очередей) и сам механизм exchanges/queue, который есть из коробки. Конечно, тут магии нет, и можно написать свой сервис событий, но можно использовать и готовый инструмент, проверенный временем.
dimaaan
06.02.2022 22:14Что-то я совсем запутался. Сначала вы пишете, что: "Конечно, можно сохранять события и в БД, но тогда нам придется самим переизобретать то, что уже умеет RabbitMQ."
Затем, отвечая на мои возражения вы пишете про: "идиома transactional outbox - информация о событии создаётся и сохраняется в локальную БД" ... "Далее отдельный поток извлекает такие события из БД и кладёт в RabbitMQ".
Разве это не то самое переизобретение из комента выше?
Т.е было:
producer ➡ rabbitmq ➡ consumer
стало
producer ➡ DB ➡ inter-producer ➡ rabbitmq ➡ consumer
Точка отказа сместилась, но осталась, система усложнилась. Непонимаю где профит? Мы можем добавлять промежуточные звенья в систему, но это не сделает её лучше.
KGeist
06.02.2022 22:36Разве это не то самое переизобретение из комента выше?
В локальной БД хранится только outbox сервиса, без доп. логики. Если у нас N сервисов, заинтересованных в событии, то нам нужно N инбоксов, для каждого сервиса. В той же БД их невозможно хранить (т.к. БД разные), и на каждое обработанное событие для каждого сервиса также нужно поддерживать состояние (ack или не-ack). Профит в чём: 1) отказ продьюсера (его самого или его БД) не останавливает обработку, т.к. консьюмеры независимы и работают со своими копиями сообщений 2) гарантирует целостность данных 3) уменьшается связность сервисов, т.к. продьюсер не знает своих обработчиков. RabbitMQ довольно стабильная штука, и больше вероятности упасть не ему, а какому-то кастомному сервису одной из команд, напр. после неудачного релиза (а их овердохрена), поэтому и выносится это на редко изменяемый стабильный внешний сервис. Как я уже сказал, это полезно в основном при микросервисной архитектуре для повышения масштабируемости и отказоустойчивости, иначе для более простых проектов это явный оверинджиниринг.
система усложнилась
У нас всё ещё сложнее :) Каждый аккаунт это отдельная организация с тысячами пользователей (tenants), и чтобы избежать ситуаций, где одна большая организация забивает очередь событий 9000-ми событий и останавливает обработку для более мелких (т.к. они будут ждать своей очереди), у нас есть ещё одна прослойка, которая диспатчит события "справедливо", т.е. распределяет их порциями, чтобы у каждого аккаунта обработка продвигалась равномерно.
Это всё придумано не просто так "чтобы было" и является эволюционным решением конкретных проблем бизнеса (до этого был простой монолит без RabbitMQ, но перестало вывозить); если вам в ваших проектах профита нет, то не знаю, зачем вам доказывать профит :) Он есть, но решается для конкретных задач. Можно жить и без брокеров, но иметь в виду как инструмент всё же стоит :)
dimaaan
06.02.2022 23:03>> если вам в ваших проектах профита нет, то не знаю, зачем вам доказывать профит :)
А если профит есть, то и доказывать ничего не надо :)
Но если серьезно, используем его уже давно, а профита всё не вижу, поэтому нахожусь в постоянном поиске оного.
Спасибо, что поделились со мной :)
-
kawena54
06.02.2022 18:57Я один заметил что RabbitMqListener код не верный? А точнее создается коньсюмер к нему прилепывается листенер, но т.к коньсюмер никуда не присвается (в локальной области видимости) и не вызывается эвейтер , получается по консьюмеру произойдет диспос при выходе из метода ?
s207883
06.02.2022 19:05-1Не лучше ли переложить все это на Masstransit или его аналоги?
Метод отправки слишком захардкоженный. Зачастую передаются объекты, соотвествено и сам метод отправки должен быть универсальным.
Alexannndrr
создавать connection на каждый SendMessage есть сомнительная идея
sebasww
Это же пример. Понятно, что надо выносить подключение в отдельный компонент и мониторить обрывы...
sebasww
И подписываться на событие тоже так себе идея. Лучше свой консамер подсунуть.
KGeist
>консамер
А еще лучше консьюмер