В этой статье мы рассмотрим реализацию библиотеки MassTransit в сочетании с RabbitMQ в приложении ASP.NET Core. Для начала, мы затронем некоторые из продвинутых функций RabbitMQ, а также ряд концепций, с которыми можно столкнуться, используя библиотеку MassTransit. Ну и наконец, мы разберёмся, как использовать эту библиотеку в приложении ASP.NET Core Web API.

Примечание: этот материал является переводом статьи популярного зарубежного ресурса. Ознакомиться с оригиналом можно здесь.

Перед тем как пойти дальше, хотелось бы упомянуть о нескольких важных моментах:

  • Ожидается, что вы имеете базовое представление о RabbitMQ и работе с ним

  • Для локального запуска сервера RabbitMQ мы будем использовать Docker.

  • Весь написанный сегодня код вы сможете найти в этом GitHub репозитории.

После прочтения этого материала у вас появится уверенное понимание того, что такое MassTransit, какими преимуществами он обладает и как мы можем использовать его в сочетании с RabbitMQ.

Что такое RabbitMQ?

RabbitMQ - это брокер сообщений, который обрабатывает принятие, хранение и отправку сообщений между нашими сервисами. Использование брокера сообщений позволяет нам создавать независимые и производительные приложения, основанные на асинхронном взаимодействии между собой.

Что такое exchange в RabbitMQ?

При работе с RabbitMQ producers (издатели) имеют два варианта, куда они могут отправить сообщения:

  • Queues (очереди)

  • Exchanges (обменники или точки обмена)

Когда producer посылает сообщение в очередь, то сообщение из нее получит каждый consumer (потребитель) этой очереди.

А что, если мы захотим выборочно отправлять сообщения в разные очереди на основе метаданных, содержащихся в этом сообщении? Здесь-то нам и приходит на помощь exchange.

Обменники получают сообщения от издателей и, в зависимости от своей конфигурации, посылают сообщение в одну или несколько очередей. Для этого необходимо создать привязку (binding), которая обеспечит отправку наших сообщений из exchange в очереди.

Мы можем использовать один из следующих типов exchange:

  • Direct

  • Topic

  • Headers

  • Fanout

В данной статье мы остановимся только на типе Fanout, поскольку именно он используется в MassTransit по умолчанию. Тип Fanout достаточно прост: он транслирует все получаемые сообщения во все очереди, которые создали с ним связь (binding).

Что такое MassTransit?

MassTransit - это бесплатный open-source фреймворк для .NET приложений. Он абстрагирует логику, необходимую для работы с брокерами сообщений (например RabbitMQ), тем самым упрощая создание проектов на основе сообщений с слабой связностью.

Сначала рассмотрим несколько основных концепций:

Service Bus (обычно - просто Bus), - это тип приложения, обрабатывающий доставку сообщений.

Transports - это вся разновидность брокеров сообщений, с которыми работает MassTransit. Сюда входят RabbitMQ, InMemory, Azure Service Bus и другие.

Message - это контракт, создаваемый в виде класса или интерфейса в .NET.

Command - это тип сообщения, отправляемый в конечную точку (очередь). Используется для указания сервису, что нужно сделать. Обычно такие сущности именуют как "глагол-существительное". (Например, UpdateCustomerAddress или SubmitOrder)

Event (событие) - это еще один тип сообщений, описывающий уже произошедшее действие. События публикуются одному или нескольким потребителям и, как правило, называются с помощи последовательности "существительное-глагол" в прошедшем времени. (Например, CustomerAddressUpdated или OrderSubmitted)

Зачем использовать MassTransit?

У библиотеки MassTransit существует ряд преимуществ по сравнению с нативным использованием брокера сообщений. Во-первых, абстрагируясь от логики основного брокера сообщений, мы можем работать сразу с несколькими из них, совершенно не переписывая код. Например, это преимущество позволяет нам работать с InMemory локально, а затем, при развертывании нашего кода, использовать другие технологии, такие как Azure Service Bus или Amazon Simple Queue Service.

Кроме того, при работе с системой на основе брокера сообщений, нам понадобится делать собственные реализации множества существующих шаблонов, например, retry, circuit breaker, outbox. MassTransit же предоставляет готовые реализации, а также множество других функций из коробки, такие как обработка исключений, распределенные транзакции и мониторинг.

Теперь, когда мы разобрались с тем, что такое MassTransit и почему его стоит использовать, давайте посмотрим, как мы можем использовать его вместе с RabbitMQ в ASP.NET Core.

Реализация MassTransit с RabbitMQ в ASP.NET Core

Установка RabbitMQ

Прежде чем создавать наше приложение, нам потребуется запустить сервер RabbitMQ в Docker-контейнере. Откроем командную строку и используем следующую команду docker run для запуска нашего сервера:

docker run -d --hostname my-rabbitmq-server --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:3-management

Мы воспользуемся образом rabbitmq:3-management из DockerHub, благодаря которому получим доступ к пользовательскому интерфейсу RabbitMQ на порту 15672. Нам также следует добавить отображение порта 5672, который RabbitMQ использует для коммуникации по умолчанию. Чтобы получить доступ к UI, откроем окно браузера и перейдем по адресу localhost:15672, воспользовавшись стандартным логином и паролем: guest/guest. Мы вернемся к этому интерфейсу чуть позже, чтобы посмотреть что именно MassTransit создает для нас в RabbitMQ.

Создание общей библиотеки классов

При использовании MassTransit, мы должны определить класс или интерфейс для передаваемого сообщения. MassTransit использует полное имя типа (включая пространство имен) для контрактов сообщений, так что нам следует использовать общий класс / интерфейс для правильной адресации сообщения.

Поэтому первое, что мы сделаем - это создадим библиотеку классов, содержащую общий интерфейс, который мы будем использовать для наших приложений producer и consumer.

Для начала создадим библиотеку SharedModels, а внутри нее определим следующий интерфейс:

public interface OrderCreated 
{
    int Id { get; set; }
    string ProductName { get; set; }
    decimal Price { get; set; }
    int Quantity { get; set; }
}

Для имени интерфейса используется прошедшее время, поскольку это тип сообщения event. Это всё, что нам понадобится от нашей библиотеки классов SharedModels. Далее мы реализуем нашего издателя.

Создание producer с использованием MassTransit

Давайте создадим нашего producer, который мы реализуем на основе проекта ASP.NET Core Web API. Первое, что нам нужно сделать, это добавить ссылку на проект нашей библиотеки классов SharedModels.

Затем в проект с издателем добавим несколько пакетов NuGet для MassTransit:

  • MassTransit

  • MassTransit.AspNetCore

  • MassTransit.RabbitMQ

Теперь настроим MassTransit для использования RabbitMQ в файле Program.cs:

var builder = WebApplication.CreateBuilder(args);

builder.Services.AddMassTransit(x =>
{
    x.UsingRabbitMq();
});

builder.Services.AddMassTransitHostedService();

Примечание: Для актуальных версий пакета MassTransit можно игнорировать строку builder.Services.AddMassTransitHostedService().

Здесь мы сначала создаем наш WebApplicationBuilder.

Затем конфигурируем MassTransit для использования RabbitMQ, с помощью метода UsingRabbitMq(). Последний шаг - вызываем AddMassTransitHostedService, который автоматически обрабатывает запуск / остановку шины (как был сказано выше, вам не нужно делать это для новых версий библиотеки).

Прежде чем мы создадим контроллер API, сначала реализуем класс OrderDto, который будет использоваться в качестве входного параметра для эндпоинта:

public class OrderDto
{
    public string ProductName { get; set; }
    public decimal Price { get; set; }
    public int Quantity { get; set; }
}

После настройки MassTransit для использования RabbitMQ и определения нашего DTO давайте создадим контроллер API, который будет публиковать сообщение, или точнее говоря - событие, используя наш интерфейс OrderCreated:

[ApiController]
[Route("api/[controller]")]
public class OrdersController : ControllerBase
{
    private readonly IPublishEndpoint _publishEndpoint;

    public OrdersController(IPublishEndpoint publishEndpoint)
    {
        _publishEndpoint = publishEndpoint;
    }
}

Первое, что мы должны сделать, это внедрить IPublishEndpoint в наш контроллер, который мы будем использовать для публикации нашего события.

Теперь мы можем создать конечную точку для создания заказа:

[HttpPost]
public async Task<IActionResult> CreateOrder(OrderDto orderDto)
{
    await _publishEndpoint.Publish<OrderCreated>(new
    {
        Id = 1,
        orderDto.ProductName,
        orderDto.Quantity,
        orderDto.Price
    });

    return Ok();
}

Далее, создадим новый метод с названием CreateOrder и пометим его атрибутом HttpPost. Этот метод будет принимать наш OrderDto в качестве параметра. Внутри метода мы вызываем generic-метод Publish интерфейса IPublishEndpoint, используя нашу сущность OrderCreated для определения типа события, которое мы собираемся публиковать.

Затем создадим анонимный тип, при этом убедившись, что мы используем те же имена свойств, которые определены в нашем интерфейсе OrderCreated. Этого достаточно для публикации события в наш транспорт, RabbitMQ.

Наконец, мы вернем результат Ok.

Создание consumer с использованием MassTransit

Теперь, когда у нас есть producer, давайте рассмотрим создание очень простого consumer с использованием консольного приложения .NET.

Создадим консольное приложение с названием Consumer. Как и в приложении издателя, мы должны добавить ссылку на проект нашей библиотеки классов SharedModels. Мы также должны добавить несколько пакетов NuGet для MassTransit:

  • MassTransit

  • MassTransit.RabbitMQ

На этот раз нам не понадобится пакет MassTransit.AspNetCore, так как мы не будем использовать внедрение зависимостей.

После добавления ссылок создадим реализацию Consumer, которая будет содержать логику для обработки полученных сообщений:

class OrderCreatedConsumer : IConsumer<OrderCreated>
{
    public async Task Consume(ConsumeContext<OrderCreated> context)
    {
        var jsonMessage = JsonConvert.SerializeObject(context.Message);
        Console.WriteLine($"OrderCreated message: {jsonMessage}");
    }
}

Сначала мы создаем класс с названием OrderCreatedConsumer, убедившись, что мы реализуем generic-интерфейс IConsumer, передав туда наш интерфейс OrderCreated, который мы определили в библиотеке SharedModels раньше. В методе Consume мы просто сериализуем объект сообщения и записываем сообщение в консоль в целях этой статьи.

Теперь, когда наш класс consumer определен, давайте настроим его в Program.cs для использования MassTransit:

var busControl = Bus.Factory.CreateUsingRabbitMq(cfg =>
{
    cfg.ReceiveEndpoint("order-created-event", e =>
    {
        e.Consumer<OrderCreatedConsumer>();
    });
});

Создадим IBusControl, используя статический класс Bus, содержащийся в MassTransit, и настроим его для использования RabbitMQ. Затем мы должны вызвать метод ReceiveEndpoint, получающий сообщения из очереди order-created-event. Наконец, мы используем ранее созданный OrderCreatedConsumer для обработки сообщений из этой очереди.

После настройки consumer для получения сообщений, последнее, что нам нужно сделать, это запустить нашу шину:

await busControl.StartAsync(new CancellationToken());

try
{
    Console.WriteLine("Press enter to exit");

    await Task.Run(() => Console.ReadLine());
}
finally
{
    await busControl.StopAsync();
}

Сначала вызовем StartAsync нашего busControl, передавая ему CancellationToken. Затем настроим консольное приложение для работы без выхода доходя до строки Console.ReadLine. В качестве последнего шага, убедимся, что мы останавливаем наш busControl, вызывая метод StopAsync.

Тестирование приложения

Теперь пришло время протестировать написанный код. Запустим веб-браузер (проект producer) и окно консоли (проект consumer). Затем отправим POST-запрос на https://localhost:7188/api/orders, передав сущность "Заказ" в теле запроса:

{
    "productName": "keyboard",
    "quantity": 1,
    "price": 99.99
}

Посмотрим на окно консоли, где мы увидим, что наш producer правильно отправил сообщение в RabbitMQ, и которое наш consumer успешно получил:

OrderCreated message: {"Id": 1, "ProductName": "keyboard", "Price": 99.99, "Quantity": 1}

Более того, мы можем перейти в пользовательский интерфейс RabbitMQ в нашем браузере, чтобы увидеть, какие exchanges создал MassTransit:

Мы должны увидеть 2 созданных exchange, оба использующие тип fanout. Один для события SharedModels:OrderCreated, который использует пространство имен и имя модели из нашей общей библиотеки. А также увидим exchange для нашей конечной точки, которую мы определили в проекте consumer: order-created-event, который имеет привязку к SharedModels:OrderCreated.

Проверим также и созданные очереди:

Как мы видим, наша очередь order-created-event была создана, и имеет привязку к exchange с тем же именем.

Заключение

В этой статье мы узнали о более продвинутой функции RabbitMQ - exchange, а также рассмотрели использование библиотеки MassTransit и определились, почему мы выбрали ее вместо одной из нативного использования брокеров сообщений. Наконец, мы на практике научились создавать producer и consumer с использованием MassTransit и RabbitMQ.

Комментарии (7)


  1. mikegordan
    05.09.2023 13:18

    Здесь бы хорошо услышать как реализован massstransit скалинг . Т.к. у кафки это партиции , а у рэбита можно подключать N клиентов к одной очереди.

    Самый интересный момент "massstransit" же абстракция, как же эта абстракция работает НАД абсолютно разными типами из перечисленных выше системах (кафка\рэбит) в плане масштабирования?


    1. Severus1992
      05.09.2023 13:18

      Да собственно никак. Когда в последний раз смотрел реализацию для публикации сообщений в Кафку, обнаружил создание отдельного продюсера под каждый топик, закрыл и больше не возвращался к этой либе.


      1. mikegordan
        05.09.2023 13:18

        это печально.

        получается либа для сайтов "Пивнушек" ( для hiload не годится


    1. Mentor
      05.09.2023 13:18

      Для кафки и других аналогичных брокеров используются т.к. называемые Riders, т.е. технология отличается от той, что используется в RabbitMQ, https://masstransit.io/documentation/concepts/riders


  1. Famouse
    05.09.2023 13:18

    Подскажите, стоит ли рассматривать использование MassTransit в случае если второе приложение, скажем, издатель написан не на .NET? Возможно ли будет как-то "удобно" связать их? Лично я пока не вижу очевидных решений, может быть, даже, и не имеет смысла.


    1. s207883
      05.09.2023 13:18

      Почему нет? Можно же читать очередь при помощи mass transit, издатель пусть чем хочет, тем и пишет. Сообщение все равно представляет собой json, который transit превратит в объект c#.


  1. Mentor
    05.09.2023 13:18

    Когда producer посылает сообщение в очередь, то сообщение из нее получит каждый consumer (потребитель) этой очереди.

    Дочитал до этого абзаца, и понял, что дальше читать нет смысла