RabbitMQ – это брокер сообщений, служба, отвечающая за обмен сообщениями между разными программными сервисами.

RabbitMQ держит сообщения в очереди (Queue), которая является именованным буфером, хранящим адресованные ему сообщения.

Программа, посылающая сообщения в очередь RabbitMQ, называется поставщиком (Producer).

Программа, принимающая сообщения, называется подписчиком (Consumer). Такие программы подписываются на события поступления сообщения в очередь, и всегда находятся в ожидании новых сообщений.

Множество поставщиков могут отправлять сообщения в очередь, и множество подписчиков могут считывать сообщения из очереди.

  1. Запуск сервера RabbitMQ

В целях данного туториала, нам необходимо иметь запущенную службу RabbitMQ, и сделать это мы можем двумя способами: развернуть сервер RabbitMQ локально в докер-контейнере, или воспользоваться сторонним облачным сервисом. Ниже рассмотрим оба варианта, и вы можете выбрать для себя наиболее подходящий.

1.1. Способ 1: Запуск сервера RabbitMQ в докер контейнере на своем локальном компьютере

Для этого нам необходимо, чтобы на нашем компьютере предварительно было установлено программное обеспечение Docker Desktop.

- Для этого переходим по ссылке https://www.docker.com/products/docker-desktop, скачиваем установщик и запускаем его.

Важно выбрать опции Enable Hyper-V Windows Features или Install required Windows components for WSL 2, когда установщик об этом спросит.

Следуем инструкциям установщика, и по завершении процесса установки жмем кнопку Close.

Находим Docker Desktop в списке установленных на компьютере программ, и запускаем его.

В командной строке Windows или в PowerShell поочередно запускаем две следующие команды:

docker run -d --hostname my-rabbit-host --name my-rabbit -e RABBITMQ_DEFAULT_USER=user -e RABBITMQ_DEFAULT_PASS=password rabbitmq:3-management

и

docker run -it --rm --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:3-management

Окно командной строки не закрываем.

В приложении Docker Desktop, на вкладке Containers / Apps увидим запущенные контейнеры RabbitMQ:

В браузере перейдем по ссылке localhost:15672. В поля имя пользователя и пароль введем guest и guest. Здесь мы можем управлять нашей службой RabbitMQ.

1.2. Способ 2: Регистрация в облачном сервисе CloudAMQP и настройка сервиса RabbitMQ

Переходим по адресу https://www.cloudamqp.com/plans.html.

Выбираем бесплатный план Little Lemur (жмем Get Now).

Регистрируемся на сайте.

Создаем новую сущность (instance) RabbitMQ.

На следующей странице выбираем доступный регион расположения сервера и жмем Review.

На странице Configure жмем Create Instance.

На следующей странице жмем на названии нашей новой сущности:

и попадаем в панель управления нашего экземпляра RabbitMQ:

  1. Использование  RabbitMQ в проектах ASP.NET Core

Создадим 2 проекта ASP.NET Core Web API: проект, который будет поставщиком (продюсером) сообщений, и проект, который будет подписчиком (консьюмером).

Для обоих проектов, после их создания на последующих шагах, необходимо выполнить установку NuGet пакета: RabbitMQ.Client.

Для этого в SolutionExplorer (Обозреватель решений) правой кнопкой мыши жмем по названию рабочего проекта и выбираем Manage NuGet Packages… (Управление пакетами Nuget).

Далее переходим на крайнюю левую вкладку Browse, и в строку поиска вводим название устанавливаемого пакета NuGet.

В левом окне выбираем нужный нам пакет, а в правом жмем кнопку Install.

2.1. Создаем проект ASP.NET Core Web API с продюсером.

В Visual Studio создаем новый проект ASP.NET Core Web API:

В новый проект добавляем NuGet пакет RabbitMQ.Client как описано выше.

В корне проекта создаем папку “RabbitMq”, и в этой папке создаем интерфейс:

public interface IRabbitMqService
{
	void SendMessage(object obj);
	void SendMessage(string message);
}

В этой же папке создаем класс:

using RabbitMQ.Client;
using System.Text;
using System.Text.Json;

public class RabbitMqService : IRabbitMqService
{
	public void SendMessage(object obj)
	{
		var message = JsonSerializer.Serialize(obj);
		SendMessage(message);
	}

	public void SendMessage(string message)
	{
		// Не забудьте вынести значения "localhost" и "MyQueue"
		// в файл конфигурации
		var factory = new ConnectionFactory() { HostName = "localhost" };
		using (var connection = factory.CreateConnection())
		using (var channel = connection.CreateModel())
		{
			channel.QueueDeclare(queue: "MyQueue",
                           durable: false,
                           exclusive: false,
                           autoDelete: false,
                           arguments: null);

			var body = Encoding.UTF8.GetBytes(message);

			channel.BasicPublish(exchange: "",
                           routingKey: "MyQueue",
                           basicProperties: null,
                           body: body);
		}
	}
}

В данном примере мы создали продюсер для локального сервера RabbitMQ:

var factory = new ConnectionFactory() { HostName = "localhost" };

Для облачного сервиса RabbitMQ нужно изменить эту строку.

Перейдем в панель управления CloudAMQP, и нажмем значок “скопировать” напротив строки подключения: AMQP URL.

Вставим скопированную строку подключения в наш код:

var factory = new ConnectionFactory() { Uri = new Uri("строка_подключения") };

Тееперь продюсер будет отправлять сообщения в облачный сервис.

Добавим класс RabbitMqService в DI контейнер. Для этого добавим в метод ConfigureServices класса Startup следующую строку:

public void ConfigureServices(IServiceCollection services)
{
	// другой код
	services.AddScoped<IRabbitMqService, RabbitMqService>();
	// другой код
}

В папку Controllers добавим новый API контроллер:

Добавим в него код:

using Microsoft.AspNetCore.Mvc;
using RabbitMqProducer.RabbitMq;

[Route("api/[controller]")]
[ApiController]
public class RabbitMqController : ControllerBase
{
	private readonly IRabbitMqService _mqService;

	public RabbitMqController(IRabbitMqService mqService)
	{
		_mqService = mqService;
	}

	[Route("[action]/{message}")]
	[HttpGet]
	public IActionResult SendMessage(string message)
	{
		_mqService.SendMessage(message);

		return Ok("Сообщение отправлено");
	}
}

Запустим проект, и в сваггере (https://localhost:ваш_порт/swagger/index.html) вызовем метод SendMessage, передав в качестве параметра произвольную строку. Для этого нажмем на строке этого метода, в открывшейся панели вверху справа нажмем кнопку Try it out, и в окне message введем наше сообщение. Нажмем кнопку Execute.

В ответе мы должны получить статус код 200 и сообщение “Сообщение отправлено”.

Если вы используете локальный сервер RabbitMQ, переходим в панель управления локальным экземпляром RabbitMQ (http://localhost:15672).

Если вы используете облачный сервис CloudAMQP, переходим в его панель управления. В верхней части левого меню жмем кнопку RabbitMQ Manager и попадаем в такую же панель управления RabbitMQ, что и для локального экземпляра.

Перейдем на вкладку Queues, в табличке нажмем на наименовании нашей очереди ("MyQueue").

В списке внизу раскроем элемент Get messages, нажмем кнопку Get message(s), и увидим наше сообщение “Привет, Кролик!”:

Продюсер работает. 

2.2. Создаем проект ASP.NET Core Web API с консьюмером. 

В Visual Studio создаем новый проект ASP.NET Core Web API.

В новый проект добавляем NuGet пакет RabbitMQ.Client как описано выше.

В корне проекта создаем папку “RabbitMq”, и в этой папке создаем класс:

using RabbitMQ.Client.Events;
using RabbitMQ.Client;
using System.Threading.Tasks;
using System.Threading;
using Microsoft.Extensions.Hosting;
using System.Text;
using System.Diagnostics;
using System;


public class RabbitMqListener : BackgroundService
{
	private IConnection _connection;
	private IModel _channel;

	public RabbitMqListener()
	{
		// Не забудьте вынести значения "localhost" и "MyQueue"
		// в файл конфигурации
		var factory = new ConnectionFactory { HostName = "localhost" };
		_connection = factory.CreateConnection();
		_channel = _connection.CreateModel();
		_channel.QueueDeclare(queue: "MyQueue", durable: false, exclusive: false, autoDelete: false, arguments: null);
	}

	protected override Task ExecuteAsync(CancellationToken stoppingToken)
	{
		stoppingToken.ThrowIfCancellationRequested();

		var consumer = new EventingBasicConsumer(_channel);
		consumer.Received += (ch, ea) =>
		{
			var content = Encoding.UTF8.GetString(ea.Body.ToArray());
			
			// Каким-то образом обрабатываем полученное сообщение
			Debug.WriteLine($"Получено сообщение: {content}");

			_channel.BasicAck(ea.DeliveryTag, false);
		};

		_channel.BasicConsume("MyQueue", false, consumer);

		return Task.CompletedTask;
	}
	
	public override void Dispose()
	{
		_channel.Close();
		_connection.Close();
		base.Dispose();
	}
}

В данном примере мы создали консьюмер для локального сервера RabbitMQ:

var factory = new ConnectionFactory() { HostName = "localhost" };

Для облачного сервиса RabbitMQ нужно изменить эту строку так, как описано выше в разделе создания продюсера.

В метод ConfigureServices класса Startup добавим строчку:

services.AddHostedService<RabbitMqListener>();

Запустим оба проекта – продюсер и консьюмер.

Перейдем в сваггер продюсера, и отправим произвольное сообщение.

В окне Output from Debug появится наше сообщение, выведенное методом

Debug.WriteLine($"Получено сообщение: {content}");

нашего консьюмера.

2.3. Создаем проект .NET Core Console App с консьюмером.

Для этого в метод Main класса Program нового проекта добавляем следующий код:

using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System;
using System.Text;

static void Main(string[] args)
{
	var factory = new ConnectionFactory() { HostName = "localhost" };
  //var factory = new ConnectionFactory() { Uri = new Uri("строка_подключения_облако") };
	using (var connection = factory.CreateConnection())
	using (var channel = connection.CreateModel())
	{
		channel.QueueDeclare(queue: "MyQueue",
							 durable: false,
							 exclusive: false,
							 autoDelete: false,
							 arguments: null);

		var consumer = new EventingBasicConsumer(channel);
		consumer.Received += (model, ea) =>
		{
			var body = ea.Body.ToArray();
			var message = Encoding.UTF8.GetString(body);
			Console.WriteLine(" [x] Received {0}", message);
		};
		channel.BasicConsume(queue: "MyQueue",
							 autoAck: true,
							 consumer: consumer);

		Console.WriteLine(" Press [enter] to exit.");
		Console.ReadLine();
	}
}

И запускаем проект. Вывод полученного сообщение будет осуществлен в консоль.

Комментарии (18)


  1. Alexannndrr
    06.02.2022 12:02
    +3

    создавать connection на каждый SendMessage есть сомнительная идея


    1. sebasww
      06.02.2022 13:57
      +1

      Это же пример. Понятно, что надо выносить подключение в отдельный компонент и мониторить обрывы...


    1. sebasww
      06.02.2022 13:58

      И подписываться на событие тоже так себе идея. Лучше свой консамер подсунуть.


      1. KGeist
        06.02.2022 14:25

        >консамер

        А еще лучше консьюмер


  1. boldMahoney
    06.02.2022 15:22

    Однобокий и технически неграмотный туториал. После прочтения возникает вопросов больше, чем ответов. От таких статей больше вреда, чем пользы. В официальном мануале итак почти все разжевано доступным языком https://www.rabbitmq.com/documentation.html


    1. KislyFan
      06.02.2022 15:33
      +1

      Первый вопрос, а зачем вообще нужен брокер сообщений? Я давно делаю приложения на ASP.NET (классика ASP + EF + WebApi / Razor), но не понимаю в каком случае брокера нужно использовать. Было бы классно увидеть это в начале статьи.


      1. KGeist
        06.02.2022 16:50
        +2

        У нас RabbitMQ используется в качестве событийной шины для общения между сервисами. Сервис А генерирует событие, оно кладётся в "exchange", который раскидывает сообщение по подключённым очередям сервисов B, C, D и т.д., которые в нужном им темпе извлекают события и как-то их обрабатывают. Сервис может таким образом реагировать на свои же события, т.е. получаем асинхронную обработку. С помощью очереди можно ещё реализовать асинхронные команды. Тут плюс в том, что очередь персистируется, и если сервис упал и не успел сделать ACK, то произойдёт retry (т.к. останется в очереди), что полезно для отказустойчивости, плюс там есть свои плюшки для мониторинга и работы именно с очередями... Но есть и свои минусы - если память кончается, то он заблочит все соединения (начнут висеть без ответа) и обработчики должны быть идемпотентными из-за at least once delivery. Если инстанс приложения один и объёмы небольшие, можно обрабатывать события сразу в том же запросе/транзакции.


        1. dimaaan
          06.02.2022 18:40

          >> если сервис упал и не успел сделать ACK, то произойдёт retry

          Retry можно делать и без посредников в виде RabbitMQ обычным циклом, которму передается делегат или использовать Polly


          1. KGeist
            06.02.2022 20:36
            +1

            Если упал процесс обработчика, то теряется контекст (теряется событие), и перебор циклом не поможет, так как мы не знаем, с чем работать. Конечно, можно сохранять события и в БД, но тогда нам придется самим переизобретать то, что уже умеет RabbitMQ.


            1. dimaaan
              06.02.2022 21:03

              >> Если упал процесс

              Какой именно процесс? producer или consumer?

              1. Давайте представим, что успал producer.

                1. С RabbitMQ: MQ сервер не получит сообщения, оно не встанет в очередь и не дойдет до получателя.

                2. Без RabbitMQ: сообщение так же не дойдет.

              2. Давайте представим, что упал consumer.

                1. С RabbitMQ: RabbitMQ сохранит сообщение в очереди и доставит его, когда consumer будет готов.

                2. Без RabbitMQ: оно всё равно дойдет при следующем "try", ведь контекст сохранен в producer'е. Но есть одно НО, если во время retry producer упадет, мы потеряем сообщение. (гол в пользу Rabbit)

              3. Давайте представим, что упал сам RabbitMQ.

                1. С RabbitMQ: consumer не получит сообщения

                2. Без RabbitMQ: ситуация невозможна (гол против Rabbit)

              Итого счёт 1:1.

              Но "если брюки выглядят одинаково, зачем платить больше?" ©

              Я не вижу реальной выгоды от Рэббита, хотя может я что-то упустил в своем анализе?


              1. KGeist
                06.02.2022 21:32

                Я имел в виду падение именно консьюмера (подкорректировал оригинальное сообщение) - сообщение тогда потеряно, т.е. гол в пользу RabbitMQ.

                Что касается продьюсера, то верно замечено, что если мы упали после сохранения в БД, но до добавления события в очередь RabbitMQ, то мы тоже потеряем событие (имею в виду полное падение продьюсера, где retry невозможен). Для этого используется идиома transactional outbox - информация о событии создаётся и сохраняется в локальную БД в той же транзакции, что и бизнесовая операция на запись. Т.е если мы упали, то откатится вся транзакция сразу (ни изменений, ни событий). А если транзакция успешно закоммичена, то наличие события в БД гарантирует успешность/транзакционную целостность. Далее отдельный поток извлекает такие события из БД и кладёт в RabbitMQ с ретраем - т.е. мы гарантируем, что событие будет доставлено, в том числе если временно отвалится брокер. Плюс брокера в том, что это отдельный сервис, через который можно подключить по отдельной очереди на каждый сервис (для параллельной обработки), и он имеет из коробки протестированные временем инструменты для менеджмента таких очередей. Обычно польза чувствуется только если проект придерживается микросервисной архитектуры, где сервисы изолированы и с собственными БД, иначе можно, конечно, обойтись и без RabbitMQ, т.к. в монолите у нас и так есть доступ к персистированным событиям.

                Также замечено верно, что RabbitMQ в таком случае становится узким горлышком (если он упал, то встанет вся обработка), но с таким же успехом может упасть и БД монолита. Тут главное, как по мне - возможность продолжить работу после падения как ни в чём не бывало (что позволяет персистирование очередей) и сам механизм exchanges/queue, который есть из коробки. Конечно, тут магии нет, и можно написать свой сервис событий, но можно использовать и готовый инструмент, проверенный временем.


                1. dimaaan
                  06.02.2022 22:14

                  Что-то я совсем запутался. Сначала вы пишете, что: "Конечно, можно сохранять события и в БД, но тогда нам придется самим переизобретать то, что уже умеет RabbitMQ."

                  Затем, отвечая на мои возражения вы пишете про: "идиома transactional outbox - информация о событии создаётся и сохраняется в локальную БД" ... "Далее отдельный поток извлекает такие события из БД и кладёт в RabbitMQ".

                  Разве это не то самое переизобретение из комента выше?

                  Т.е было:

                  producer ➡ rabbitmq ➡ consumer

                  стало

                  producer ➡ DB ➡ inter-producer ➡ rabbitmq ➡ consumer

                  Точка отказа сместилась, но осталась, система усложнилась. Непонимаю где профит? Мы можем добавлять промежуточные звенья в систему, но это не сделает её лучше.


                  1. KGeist
                    06.02.2022 22:36

                    Разве это не то самое переизобретение из комента выше?

                    В локальной БД хранится только outbox сервиса, без доп. логики. Если у нас N сервисов, заинтересованных в событии, то нам нужно N инбоксов, для каждого сервиса. В той же БД их невозможно хранить (т.к. БД разные), и на каждое обработанное событие для каждого сервиса также нужно поддерживать состояние (ack или не-ack). Профит в чём: 1) отказ продьюсера (его самого или его БД) не останавливает обработку, т.к. консьюмеры независимы и работают со своими копиями сообщений 2) гарантирует целостность данных 3) уменьшается связность сервисов, т.к. продьюсер не знает своих обработчиков. RabbitMQ довольно стабильная штука, и больше вероятности упасть не ему, а какому-то кастомному сервису одной из команд, напр. после неудачного релиза (а их овердохрена), поэтому и выносится это на редко изменяемый стабильный внешний сервис. Как я уже сказал, это полезно в основном при микросервисной архитектуре для повышения масштабируемости и отказоустойчивости, иначе для более простых проектов это явный оверинджиниринг.

                    система усложнилась

                    У нас всё ещё сложнее :) Каждый аккаунт это отдельная организация с тысячами пользователей (tenants), и чтобы избежать ситуаций, где одна большая организация забивает очередь событий 9000-ми событий и останавливает обработку для более мелких (т.к. они будут ждать своей очереди), у нас есть ещё одна прослойка, которая диспатчит события "справедливо", т.е. распределяет их порциями, чтобы у каждого аккаунта обработка продвигалась равномерно.

                    Это всё придумано не просто так "чтобы было" и является эволюционным решением конкретных проблем бизнеса (до этого был простой монолит без RabbitMQ, но перестало вывозить); если вам в ваших проектах профита нет, то не знаю, зачем вам доказывать профит :) Он есть, но решается для конкретных задач. Можно жить и без брокеров, но иметь в виду как инструмент всё же стоит :)


                    1. dimaaan
                      06.02.2022 23:03

                      >> если вам в ваших проектах профита нет, то не знаю, зачем вам доказывать профит :)

                      А если профит есть, то и доказывать ничего не надо :)

                      Но если серьезно, используем его уже давно, а профита всё не вижу, поэтому нахожусь в постоянном поиске оного.

                      Спасибо, что поделились со мной :)


  1. kawena54
    06.02.2022 18:57

    Я один заметил что RabbitMqListener код не верный? А точнее создается коньсюмер к нему прилепывается листенер, но т.к коньсюмер никуда не присвается (в локальной области видимости) и не вызывается эвейтер , получается по консьюмеру произойдет диспос при выходе из метода ?


    1. alexalok
      06.02.2022 20:33

      В дебаге все ОК будет, а вот в релизе действительно ждёт сюрприз


      1. kawena54
        06.02.2022 21:58

        топ код :D


  1. s207883
    06.02.2022 19:05
    -1

    Не лучше ли переложить все это на Masstransit или его аналоги?

    Метод отправки слишком захардкоженный. Зачастую передаются объекты, соотвествено и сам метод отправки должен быть универсальным.