Привет! В этой статье разберём архитектуру MassTransit - одной из самых зрелых .NET-библиотек для построения распределённых систем, работы с брокерами сообщений и реализации паттернов вроде Saga и Consumer Pipeline.

Прежде чем вы начнёте читать статью, хочу сделать небольшую ремарку: это моя первая публикация, поэтому прошу отнестись с пониманием. Материал получился достаточно сложным, а его разбор потребовал большой работы.

Статья, вероятно, будет постепенно дорабатываться и уточняться по мере появления новых правок. В процессе такого глубокого исследования хочется как можно скорее поделиться полученными выводами, но одновременно бывает непросто сразу изложить всё максимально структурировано. Поэтому в отдельных местах возможна некоторая сумбурность, которую я постараюсь со временем исправить.

Быстрые переходы

Если хочется более глубокого разбора с кодом и навигацией по исходникам, у меня есть и видеоразбор на YouTube. Здесь же - краткая, структурированная версия для тех, кто хочет понять ключевые архитектурные идеи без погружения в 50+ минут видео.

Восхождение MassTransit

MassTransit получил широкую популярность в .NET-сообществе не только благодаря удобной интеграции и простоте использования, но и во многом благодаря рекомендациям Microsoft.

В официальных материалах, включая статью о реализации событийного взаимодействия между микросервисами (“Реализация взаимодействия на основе событий между микрослужбами (события интеграции)”), Microsoft подчёркивает, что не всегда стоит “изобретать велосипед”. Разработка, тестирование и поддержка собственных messaging-решений внутри компании требуют значительных ресурсов, тогда как зрелые готовые решения позволяют сократить стоимость и риски внедрения.

Рекомендации Microsoft
Рекомендации Microsoft

Почему MassTransit бывает сложно понять

MassTransit существует уже более 15 лет. Это означает, что библиотека формировалась ещё во времена классического .NET Framework - задолго до появления CoreCLR и современного .NET.

Коммиты с 2008г
Коммиты с 2008г
Chris Patterson
Chris Patterson

Автор библиотеки - Chris Patterson, Microsoft MVP и разработчик, который проектировал MassTransit как универсальный abstraction layer над транспортами (RabbitMQ, Azure Service Bus, Amazon SQS и др.).

Что такое MVP
Что такое MVP

При первом знакомстве кодовая база может показаться перегруженной. Но причина не в возрасте проекта, а в архитектурной насыщенности.

Основные причины сложности :

1) Большое количество паттернов MassTransit активно использует: Pipeline, Observer, Visitor, State, Finite State Machine(FSM) и тд… То есть библиотека - это не просто “обёртка над RabbitMQ”, а полноценный framework с богатой внутренней моделью.

2) Глубокое использование generics Практически вся архитектура строится вокруг <T>:

ConsumeContext<T>
ConsumerConsumeContext<TConsumer, TMessage>

3) Транспортно-специфичная логика MassTransit старается быть универсальным, но особенности конкретного транспорта всё равно проникают внутрь архитектуры.

Например, в RabbitMQ:

  • одно TCP-соединение может содержать множество channels;

  • channels нельзя безопасно использовать полностью параллельно без контроля;

  • клиент требует аккуратного управления concurrency, иначе можно столкнуться с нарушением AMQP framing.

Именно поэтому в Runtime работе RabbitMQ Transport’a есть такие компоненты, как ChannelExecutor, которые не выглядят случайными: они помогают переиспользовать channel и сериализовать операции там, где это необходимо

Мультиплексирование в RabbitMQ
Мультиплексирование в RabbitMQ

Можно в RabbitMq.Client библиотеке на .NET глянуть Connection, а так же Framing и почитать Документацию связанной с Connections

4) SagaStateMachine и визуальные графы
Когда разработчик впервые открывает SagaStateMachine, графы состояний и переходов могут выглядеть как часть runtime-магии.

На практике это скорее декларативный инструмент:

  • состояния

  • события

  • переходы

Графы нужны для визуализации и анализа, а не как основной runtime-механизм исполнения.

Код с примером визуализции графов
public class TestStateMachine : MassTransitStateMachine<TestState>
{
    public State Started { get; private set; }
    public State Updated { get; private set; }
    public State Processing { get; private set; }
    public State Processed { get; private set; }

    public Event<SomethingData> SomethingReceived { get; private set; }

    public TestStateMachine()
    {
        InstanceState(x => x.CurrentState);

        Event(() => SomethingReceived, x =>
        {
            x.CorrelateById(context => context.Message.Id);
        });
       ...
    }
}

[ApiController]
[Route("api/[controller]")]
public class SomethingController : ControllerBase
{
     readonly TestStateMachine _machine; // внедряете вашу машину состояний

     public SomethingController(IPublishEndpoint publishEndpoint, 
                                                 TestStateMachine machine)
     {
        _publishEndpoint = publishEndpoint;
        _machine = machine;
     }

    [HttpGet("graph/edges")] // ребра графа
    public IActionResult GetGraphEdges()
    {
        var graph = _machine.GetGraph();

        var sb = new StringBuilder();
        sb.AppendLine("stateDiagram-v2");

        foreach (var edge in graph.Edges)
        {
            var from = edge.From.Title;
            var to = edge.To.Title;

            sb.AppendLine($"{from} --> {to}");
        }

        return Content(sb.ToString(), "text/plain");
    }
 
    [HttpGet("graph/vertices")] // вершины графа 
    public IActionResult GetGraphVertices()
    {
        var graph = _machine.GetGraph();

        var sb = new StringBuilder();
        sb.AppendLine("stateDiagram-v2");

        foreach (var vertex in graph.Vertices)
        {
            sb.AppendLine($"state {vertex.Title}");
        }

        return Content(sb.ToString(), "text/plain");
    }

Главная архитектурная идея: Pipeline

MassTransit изначально строился поверх библиотеки GreenPipes, которая реализовывала внутреннюю систему Pipeline. В новых версиях MassTransit(свыше 8.0.0) GreenPipes больше не используется как отдельная зависимость - её код был напрямую интегрирован в сам MassTransit(перенесли весь код GreenPipes напрямую в MassTransit т.е CTRL+C/CTRL+V). Это произошло потому, что GreenPipes практически не получила распространения за пределами экосистемы MassTransit, а существующие стандартные решения не соответствовали требованиям проекта.

На первый взгляд может показаться, что в основе MassTransit лежит классический архитектурный паттерн Pipes & Filters, но это не совсем так.

Как выглядит работа Pipes & Filters
Как выглядит работа Pipes & Filters

Классический Pipes & Filters хорошо знаком, например, по Bash-конвейерам, где результат работы одной программы передаётся следующей через символ |:

cat file.txt | grep "error" | sort | uniq

Здесь каждая программа выступает как Filter, а Pipe служит лишь каналом передачи данных между этапами обработки.

В MassTransit модель сложнее. Хотя терминология Pipes и Filters используется, архитектура ближе к Middleware Pipeline, чем к классическому Pipes & Filters. Цепочка обработки выглядит как последовательность Pipe -> Filter -> Pipe -> Filter, однако каждый Filter не просто обрабатывает сообщение и передаёт его дальше, а может:

  1. управлять дальнейшим выполнением цепочки

  2. изменять маршрут обработки

  3. добавлять ветвления

  4. завершать выполнение

  5. вызывать дополнительные Pipes

То есть Filter здесь выступает не только обработчиком, но и элементом управления потоком выполнения (Control Flow), что делает архитектуру MassTransit ближе к middleware-подходу, где каждый компонент контролирует, как именно будет выполняться следующий шаг. Именно поэтому Pipeline в MassTransit - это не просто линейная цепочка фильтров, а более гибкая и управляемая система обработки сообщений.

Как выглядит работа Middleware
Как выглядит работа Middleware

Отсюда это можно резюмировать, как Pipes & Filter с Middleware Control Flow

GreenPipes строится на следующих компонентах

  • Agents - управляют жизненным циклом инфраструктуры В часть их обязанностей входит: старт, завершение, определение готовности, сигнализация об ошибках В пример можно привести RabbitMqBasicConsumer, что является Agent’ом, внутри которого крутится постоянный loop получения сообщений. Здесь переопределен метод HandleBasicDeliver из rabbitmq.client с помощью IAsyncBasicConsumer

как выглядит RabbitMqBasicConsumer
public class RabbitMqBasicConsumer :
        Agent,
        IAsyncBasicConsumer,
        IBasicConsumer,
        RabbitMqDeliveryMetrics
    {
        readonly RabbitMqReceiveEndpointContext _context;
        readonly TaskCompletionSource<bool> _deliveryComplete;
        readonly IReceivePipeDispatcher _dispatcher;
        readonly SemaphoreSlim _limit;
        readonly ModelContext _model;
        readonly ConcurrentDictionary<ulong, RabbitMqReceiveContext> _pending;
        readonly ReceiveSettings _receiveSettings;

        string _consumerTag;

        EventHandler<ConsumerEventArgs> _onConsumerCancelled;

        /// <summary>
        /// The basic consumer receives messages pushed from the broker.
        /// </summary>
        /// <param name="model">The model context for the consumer</param>
        /// <param name="context">The topology</param>
        public RabbitMqBasicConsumer(ModelContext model, RabbitMqReceiveEndpointContext context)
        {
            _model = model;
            _context = context;

            _receiveSettings = model.GetPayload<ReceiveSettings>();

            _pending = new ConcurrentDictionary<ulong, RabbitMqReceiveContext>();

            _dispatcher = context.CreateReceivePipeDispatcher();
            _dispatcher.ZeroActivity += HandleDeliveryComplete;

            _deliveryComplete = TaskUtil.GetTask<bool>();

            if (context.ConcurrentMessageLimit.HasValue)
                _limit = new SemaphoreSlim(context.ConcurrentMessageLimit.Value);

            ConsumerCancelled += OnConsumerCancelled;
        }
...
        Task IAsyncBasicConsumer.HandleBasicDeliver(string consumerTag, ulong deliveryTag, bool redelivered, string exchange, string routingKey,
            IBasicProperties properties, ReadOnlyMemory<byte> body)
        {
            HandleBasicDeliver(consumerTag, deliveryTag, redelivered, exchange, routingKey, properties, body);

            return Task.CompletedTask;
        }

public void HandleBasicDeliver(string consumerTag, ulong deliveryTag, bool redelivered, string exchange, string routingKey,
            IBasicProperties properties, ReadOnlyMemory<byte> body)
        {
            var bodyBytes = body.ToArray();

            Task.Run(async () =>
            {
                LogContext.Current = _context.LogContext;

                var context = new RabbitMqReceiveContext(exchange, routingKey, _consumerTag, deliveryTag, bodyBytes, redelivered, properties,
                    _context, _receiveSettings, _model, _model.ConnectionContext);

                var added = _pending.TryAdd(deliveryTag, context);
                if (!added && deliveryTag != 1) // DIRECT REPLY-TO fixed value
                    LogContext.Warning?.Log("Duplicate BasicDeliver: {DeliveryTag}", deliveryTag);

                var receiveLock = _receiveSettings.NoAck ? default : new RabbitMqReceiveLockContext(_model, deliveryTag);

                if (_limit != null)
                    await _limit.WaitAsync(context.CancellationToken).ConfigureAwait(false);

                try
                {
                    await _dispatcher.Dispatch(context, receiveLock).ConfigureAwait(false);
                }
                catch (Exception exception)
                {
                    context.LogTransportFaulted(exception);
                }
                finally
                {
                    _limit?.Release();

                    if (added)
                        _pending.TryRemove(deliveryTag, out _);

                    context.Dispose();
                }
            });
        }
  • Supervisors - специальный тип Agent’а, который управляет другими Agent’ами. По сути, это “agent of agents” или оркестратор агентов, но это не все, ведь он может управлять другими supervisors. Supervisor хранит список управляемых Agent’ов, стартует их при инициализации и останавливает при завершении.

    Как выглядит например TransportPipeContextSupervisor
    public class TransportPipeContextSupervisor<T> :
        PipeContextSupervisor<T>,
        ITransportSupervisor<T>
        where T : class, PipeContext
    {
        readonly ISupervisor _consumeSupervisor;
        readonly ISupervisor _sendSupervisor;
    
        protected TransportPipeContextSupervisor(IPipeContextFactory<T> factory)
            : base(factory)
        {
            _consumeSupervisor = new Supervisor();
            _sendSupervisor = new Supervisor();
        }
    
        public void Probe(ProbeContext context)
        {
            if (HasContext)
                context.Add("connected", true);
        }
    
        public void AddSendAgent<TAgent>(TAgent agent)
            where TAgent : IAgent
        {
            _sendSupervisor.Add(agent);
        }
    
        public void AddConsumeAgent<TAgent>(TAgent agent)
            where TAgent : IAgent
        {
            _consumeSupervisor.Add(agent);
        }
    
        protected override async Task StopSupervisor(StopSupervisorContext context)
        {
            await _consumeSupervisor.Stop(context).ConfigureAwait(false);
    
            await _sendSupervisor.Stop(context).ConfigureAwait(false);
    
            await base.StopSupervisor(context).ConfigureAwait(false);
        }
    }
    
  • Probe представляет собой механизм диагностики и самоописания pipeline.
    Где используется:

    1. Диагностика
      Позволяет увидеть структуру pipeline и его состав.

    2. Monitoring / Observability
      Даёт информацию о состоянии компонентов (например, circuit breaker, попытки, ошибки).

    3. Отладка
      Помогает понять, почему фильтр не срабатывает или работает не так, как ожидается.

    4. Интеграции
      Может использоваться для экспорта данных наружу - например, в метрики или диагностические endpoints.

    5. Тестирование
      Позволяет писать более точные тесты:
      [1] проверять, что нужные фильтры подключены
      [2] валидировать порядок pipeline
      [3] убеждаться, что конфигурация применена корректно
      [4] тестировать поведение без запуска полного окружения

    Как выглядит Probe на примере PipeContextSupervisor
    public class PipeContextSupervisor<TContext> :
        Supervisor,
        ISupervisor<TContext>
        where TContext : class, PipeContext
    {
        readonly ISupervisor _activeSupervisor;
        readonly IPipeContextFactory<TContext> _contextFactory;
        readonly object _contextLock = new object();
        PipeContextHandle<TContext> _context;
    
        /// <summary>
        /// Create the cache
        /// </summary>
        /// <param name="contextFactory">
        /// Factory used to create the underlying and active contexts
        /// </param>
        public PipeContextSupervisor(IPipeContextFactory<TContext> contextFactory)
        {
            _contextFactory = contextFactory;
    
            _activeSupervisor = new Supervisor();
        }
    
        protected bool HasContext
        {
            get
            {
                lock (_contextLock)
                {
                    return _context is { IsDisposed: false };
                }
            }
        }
    
        void IProbeSite.Probe(ProbeContext context)
        {
            var scope = context.CreateScope("source");
            scope.Set(new
            {
                Type = TypeCache<PipeContextSupervisor<TContext>>.ShortName,
                HasContext,
            });
        }
    
        ...
    }
    
    Как выглядят тесты с помощью Probe
    [Test]
    public void Should_override_bus_setting_if_specified()
    {
        var busControl = MassTransit.Bus.Factory.CreateUsingActiveMq(cfg =>
        {
            cfg.PrefetchCount = 427;
    
            cfg.ReceiveEndpoint("input-queue", e =>
            {
                e.PrefetchCount = 351;
            });
        });
    
        var jsonString = busControl.GetProbeResult().ToJsonString();
        var probe = JObject.Parse(jsonString);
    
        Assert.That(GetPrefetchCount(probe, 0), Is.EqualTo(351));
        Assert.That(GetPrefetchCount(probe, 1), Is.EqualTo(427));
    }
    
  • Pipes - готовые, полностью собранные и исполняемые цепочки обработки. Это execution layer pipeline, который отвечает за реальное выполнение логики во время runtime’а. Он позволяет собрать цепочку обработки в единую рабочую систему. FilterPipe, EmtyPipe это одни из множества by default pipes к которым цепляются уже Filters, формируя итоговый Pipeline выполнения

    Как выглядит FilterPipe
    public class FilterPipe<TContext> :
        IPipe<TContext>
        where TContext : class, PipeContext
    {
        readonly IFilter<TContext> _filter;
        readonly IPipe<TContext> _next;
    
        public FilterPipe(IFilter<TContext> filter, IPipe<TContext> next)
        {
            _filter = filter;
            _next = next;
        }
    
        public void Probe(ProbeContext context)
        {
            _filter.Probe(context);
            _next.Probe(context);
        }
    
        [DebuggerStepThrough]
        public Task Send(TContext context)
        {
            return _filter.Send(context, _next);
        }
    }
    
    Как выглядит EmptyPipe
    public class EmptyPipe<TContext> :
        IPipe<TContext>
        where TContext : class, PipeContext
    {
        [DebuggerNonUserCode]
        Task IPipe<TContext>.Send(TContext context)
        {
            return Task.CompletedTask;
        }
    
        void IProbeSite.Probe(ProbeContext context)
        {
        }
    }
    
  • Filters является реальным обработчиком pipeline (middleware), который выполняется при вызове Send. Фильтры представляют собой конкретные этапы обработки в цепочке (Pipe), где каждый Filter может модифицировать контекст, добавлять логику или выполнять побочные операции. Структура Filter’а следует паттерну middleware и состоит из трех основных фаз:

  1. Фаза 1: До обработки (DoSomething)

  2. Фаза 2: Передача управления (next.Send)

  3. Фаза 3: После обработки (DoSomethingAfter)

    Как выглядит TransportReadyFilter
    public class TransportReadyFilter<T> :
        IFilter<T>
        where T : class, PipeContext
    {
        readonly ReceiveEndpointContext _context;
    
        public TransportReadyFilter(ReceiveEndpointContext context)
        {
            _context = context;
        }
    
        public async Task Send(T context, IPipe<T> next)
        {
            // Фаза 1: до обработки
            await _context.TransportObservers
                .NotifyReady(_context.InputAddress)
                .ConfigureAwait(false);
    
            var agent = new Agent();
            agent.SetReady();
    
            _context.AddConsumeAgent(agent);
    
            // Фаза 2: передача управления дальше по pipeline
            await next.Send(context).ConfigureAwait(false);
    
            // Фаза 3: после обработки
            await agent.Completed.ConfigureAwait(false);
        }
    }
    
  • Context управляет данными и жизненным циклом выполнения операции внутри Pipeline. Pipeline - это конвейер, по которому движется обработка. Context - это “посылка”, которая едет по конвейеру от одной станции к другой. Filters - это станции обработки. Каждый Filter берет Context, выполняет свою работу, может модифицировать Context и передает его следующему Filter’у.

    ConsumeContext (пример контекста в MassTransit)
    public interface ConsumeContext :
        PipeContext,
        MessageContext,
        IPublishEndpoint,
        ISendEndpointProvider
    {
        /// <summary>
        /// Контекст полученного сообщения
        /// </summary>
        ReceiveContext ReceiveContext { get; }
    
        /// <summary>
        /// Контекст сериализации сообщения (после десериализации)
        /// </summary>
        SerializerContext SerializerContext { get; }
    
        /// <summary>
        /// Асинхронная задача, завершающаяся после завершения обработки сообщения
        /// </summary>
        Task ConsumeCompleted { get; }
    
        /// <summary>
        /// Поддерживаемые типы сообщений, извлечённые из payload
        /// </summary>
        IEnumerable<string> SupportedMessageTypes { get; }
    
        /// <summary>
        /// Проверяет, содержится ли указанный тип сообщения в payload
        /// </summary>
        bool HasMessageType(Type messageType);
    
        ...
    }
    
  • Payloads являются способ прикреплять к Context’у дополнительные данные или объекты (то есть “полезную нагрузку”), которые будут путешествовать вместе с ним по всему Pipeline и будут доступны для всех Filter’ов. Лучшая аналогия - это “рюкзак” или “боковой карман” Context’a, что является “посылкой”, а Filters, как говорилось ранее “станции обработки”.

    TransactionFilter (пример работы с payloads)
    public class TransactionFilter<T> :
        IFilter<T>
        where T : class, PipeContext
    {
        readonly TransactionOptions _options;
    
        public TransactionFilter(
            IsolationLevel isolationLevel = IsolationLevel.ReadCommitted,
            TimeSpan timeout = default)
        {
            if (timeout == default)
                timeout = TimeSpan.FromSeconds(30);
    
            _options = new TransactionOptions
            {
                IsolationLevel = isolationLevel,
                Timeout = timeout
            };
        }
    
        void IProbeSite.Probe(ProbeContext context)
        {
            var step = context.CreateFilterScope("transaction");
            step.Add("isolationLevel", _options.IsolationLevel.ToString());
            step.Add("timeout", _options.Timeout);
        }
    
        [DebuggerNonUserCode]
        public async Task Send(T context, IPipe<T> next)
        {
            SystemTransactionContext systemTransactionContext = null;
    
            // Получение или создание payload, который "путешествует" вместе с context
            context.GetOrAddPayload<TransactionContext>(() =>
            {
                systemTransactionContext = new SystemTransactionContext(_options);
                return systemTransactionContext;
            });
    
            try
            {
                // Передача управления дальше по pipeline
                await next.Send(context).ConfigureAwait(false);
    
                // Commit транзакции после успешного выполнения pipeline
                if (systemTransactionContext != null)
                    await systemTransactionContext.Commit().ConfigureAwait(false);
            }
            catch (Exception ex)
            {
                // Rollback при ошибке
                systemTransactionContext?.Rollback(ex);
                throw;
            }
            finally
            {
                systemTransactionContext?.Dispose();
            }
        }
    }
    
  • Configurators представляет собой язык конфигурации для описания поведения pipeline. Это удобный API, через который пользователь описывает, как должна работать система обработки сообщений. Когда конфигурация завершена и система стартует, Configurators исчезают со сцены. Их место занимают Agent’ы, Supervisor’ы, Pipe’ы и Filter’ы, которые выполняют реальную работу.

    RabbitMqBusFactoryConfigurator
    public class RabbitMqBusFactoryConfigurator :
        BusFactoryConfigurator,
        IRabbitMqBusFactoryConfigurator,
        IBusFactory
    {
        readonly IRabbitMqBusConfiguration _busConfiguration;
        readonly IRabbitMqHostConfiguration _hostConfiguration;
        readonly RabbitMqReceiveSettings _settings;
    
        public RabbitMqBusFactoryConfigurator(IRabbitMqBusConfiguration busConfiguration)
            : base(busConfiguration)
        {
            _busConfiguration = busConfiguration;
            _hostConfiguration = busConfiguration.HostConfiguration;
    
            var queueName = busConfiguration.Topology
                .Consume
                .CreateTemporaryQueueName("bus");
    
            var exchangeType = busConfiguration
                .BusEndpointConfiguration
                .Topology
                .Consume
                .ExchangeTypeSelector
                .DefaultExchangeType;
    
            _settings = new RabbitMqReceiveSettings(
                busConfiguration.BusEndpointConfiguration,
                queueName,
                exchangeType,
                durable: false,
                autoDelete: true);
    
            _settings.AutoDeleteAfter(TimeSpan.FromMinutes(1));
        }
    
        public IReceiveEndpointConfiguration CreateBusEndpointConfiguration(
            Action<IReceiveEndpointConfigurator> configure)
        {
            return _busConfiguration.HostConfiguration
                .CreateReceiveEndpointConfiguration(
                    _settings,
                    _busConfiguration.BusEndpointConfiguration,
                    configure);
        }
    
        public override IEnumerable<ValidationResult> Validate()
        {
            foreach (var result in base.Validate())
                yield return result;
    
            if (string.IsNullOrWhiteSpace(_settings.QueueName))
                yield return this.Failure(
                    "Bus",
                    "The bus queue name must not be null or empty");
        }
    
        ...
    }
    
  • Specification служит мостом между этапом конфигурации и runtime’ом системы, по сути являясь “единицой поведения pipeline”, которая описывает и реализует конкретное поведение системы. Каждая Specification отвечает за одну задачу: добавление фильтра в pipeline или конфигурацию инфраструктуры брокера

    ConsumerMessageConfigurator, как пример подключения Specification через configurator
    class ConsumerMessageConfigurator :
        IConsumerMessageConfigurator<Batch<TMessage>>
    {
        readonly IBuildPipeConfigurator<ConsumeContext<Batch<TMessage>>> _batchConfigurator;
    
        public ConsumerMessageConfigurator(
            IBuildPipeConfigurator<ConsumeContext<Batch<TMessage>>> batchConfigurator)
        {
            _batchConfigurator = batchConfigurator;
        }
    
        public void AddPipeSpecification(
            IPipeSpecification<ConsumeContext<Batch<TMessage>>> specification)
        {
            _batchConfigurator.AddPipeSpecification(specification);
        }
    }
    
    PipeConfigurator, что отвечает за хранение и управление набором Specification
    public class PipeConfigurator<TContext> :
        IBuildPipeConfigurator<TContext>
        where TContext : class, PipeContext
    {
        readonly List<IPipeSpecification<TContext>> _specifications;
    
        public PipeConfigurator()
        {
            _specifications = new List<IPipeSpecification<TContext>>(4);
        }
    
        public IEnumerable<ValidationResult> Validate()
        {
            return _specifications.Count == 0
                ? Array.Empty<ValidationResult>()
                : _specifications.SelectMany(x => x.Validate());
        }
    
        void IPipeConfigurator<TContext>.AddPipeSpecification(
            IPipeSpecification<TContext> specification)
        {
            if (specification == null)
                throw new ArgumentNullException(nameof(specification));
    
            _specifications.Add(specification);
        }
    
        public IPipe<TContext> Build()
        {
            if (_specifications.Count == 0)
                return Pipe.Empty<TContext>();
    
            var builder = new PipeBuilder<TContext>(_specifications.Count);
    
            var count = _specifications.Count;
            for (var index = 0; index < count; index++)
                _specifications[index].Apply(builder);
    
            return builder.Build();
        }
    }
    
    ConsumerConsumeContextRescuePipeSpecification, что добавляет в pipeline RescueFilter, который перехватывает исключения и перенаправляет выполнение в rescue pipe
    public class ConsumerConsumeContextRescuePipeSpecification<T> :
        ExceptionSpecification,
        IPipeSpecification<ConsumerConsumeContext<T>>
        where T : class
    {
        readonly IPipe<ExceptionConsumerConsumeContext<T>> _rescuePipe;
    
        public ConsumerConsumeContextRescuePipeSpecification(
            IPipe<ExceptionConsumerConsumeContext<T>> rescuePipe)
        {
            _rescuePipe = rescuePipe;
        }
    
        public void Apply(IPipeBuilder<ConsumerConsumeContext<T>> builder)
        {
            builder.AddFilter(
                new RescueFilter<
                    ConsumerConsumeContext<T>,
                    ExceptionConsumerConsumeContext<T>>(
                        _rescuePipe,
                        Filter,
                        (context, ex) => new RescueExceptionConsumerConsumeContext<T>(context, ex)));
        }
    
        public IEnumerable<ValidationResult> Validate()
        {
            if (_rescuePipe == null)
                yield return this.Failure("RescuePipe", "must not be null");
        }
    }
    
  • Validators представляет слой проверки конфигурации pipeline до его сборки в GreenPipes и MassTransit. Проверяет не runtime, а то, можно ли вообще корректно собрать pipeline.

    Как выглядят Validators на примере ConsumerFilterSpecification
    public class ConsumerFilterSpecification<TConsumer, TMessage> :
        IPipeSpecification<ConsumerConsumeContext<TConsumer, TMessage>>
        where TConsumer : class
        where TMessage : class
    {
        readonly IFilter<ConsumerConsumeContext<TConsumer, TMessage>> _filter;
    
        public ConsumerFilterSpecification(
            IFilter<ConsumerConsumeContext<TConsumer>> filter)
        {
            _filter = new ConsumerSplitFilter<TConsumer, TMessage>(filter);
        }
    
        public void Apply(
            IPipeBuilder<ConsumerConsumeContext<TConsumer, TMessage>> builder)
        {
            builder.AddFilter(_filter);
        }
    
        public IEnumerable<ValidationResult> Validate()
        {
            if (_filter == null)
                yield return this.Failure("Filter", "must not be null");
        }
    }
    
  • Observers - система, которая позволяет отслеживать и реагировать на события, происходящие внутри pipeline во время выполнения. Observer “слушает” события и реагирует на них, не влияя на основной поток выполнения Pipeline не только выполняется, но и генерирует события:

  1. фильтр начал работу

  2. фильтр завершился

  3. произошла ошибка

  4. retry начался / завершился

Connectable - это интерфейс, который обеспечивает подписку Observer’ов на события компонентов. Компонент, реализующий IConnectable, позволяет регистрировать Observer’ов и уведомлять их о событиях.

По сути, Connectable - это механизм, который связывает Observer’ов с компонентами pipeline’а, обеспечивая отправку событий всем заинтересованным наблюдателям.

Connectable<T> для подключения наблюдателей
public class Connectable<T>
    where T : class
{
    readonly Dictionary<long, T> _connections;
    T[] _connected;
    long _nextId;

    public Connectable()
    {
        _connections = new Dictionary<long, T>();
        _connected = Array.Empty<T>();
    }

    public int Count => _connected.Length;

    public ConnectHandle Connect(T connection)
    {
        if (connection == null)
            throw new ArgumentNullException(nameof(connection));

        var id = Interlocked.Increment(ref _nextId);

        lock (_connections)
        {
            _connections.Add(id, connection);
            _connected = _connections.Values.ToArray();
        }

        return new Handle(id, this);
    }

    public Task ForEachAsync(Func<T, Task> callback)
    {
        if (callback == null)
            throw new ArgumentNullException(nameof(callback));

        T[] connected;
        lock (_connections)
            connected = _connected;

        if (connected.Length == 0)
            return Task.CompletedTask;

        if (connected.Length == 1)
            return callback(connected[0]);

        var outputTasks = new Task[connected.Length];

        for (var i = 0; i < connected.Length; i++)
            outputTasks[i] = callback(connected[i]);

        return Task.WhenAll(outputTasks);
    }

    public void ForEach(Action<T> callback)
    {
        T[] connected;
        lock (_connections)
            connected = _connected;

        switch (connected.Length)
        {
            case 0:
                break;
            case 1:
                callback(connected[0]);
                break;
            default:
                for (var i = 0; i < connected.Length; i++)
                    callback(connected[i]);
                break;
        }
    }

    public bool All(Func<T, bool> callback)
    {
        T[] connected;
        lock (_connections)
            connected = _connected;

        if (connected.Length == 0)
            return true;

        if (connected.Length == 1)
            return callback(connected[0]);

        for (var i = 0; i < connected.Length; i++)
        {
            if (!callback(connected[i]))
                return false;
        }

        return true;
    }

    void Disconnect(long id)
    {
        lock (_connections)
        {
            _connections.Remove(id);
            _connected = _connections.Values.ToArray();
        }
    }

    class Handle : ConnectHandle
    {
        readonly Connectable<T> _connectable;
        readonly long _id;

        public Handle(long id, Connectable<T> connectable)
        {
            _id = id;
            _connectable = connectable;
        }

        public void Disconnect()
        {
            _connectable.Disconnect(_id);
        }

        public void Dispose()
        {
            Disconnect();
        }
    }
}
Уведомление наших наблюдателей с помощью ReceiveObservable
public class ReceiveObservable :
    Connectable<IReceiveObserver>,
    IReceiveObserver
{
    public Task PreReceive(ReceiveContext context)
        => ForEachAsync(x => x.PreReceive(context));

    public Task PostReceive(ReceiveContext context)
        => ForEachAsync(x => x.PostReceive(context));

    public Task PostConsume<T>(ConsumeContext<T> context, TimeSpan duration, string consumerType)
        where T : class
        => ForEachAsync(x => x.PostConsume(context, duration, consumerType));

    public Task ConsumeFault<T>(ConsumeContext<T> context, TimeSpan duration, string consumerType, Exception exception)
        where T : class
        => ForEachAsync(x => x.ConsumeFault(context, duration, consumerType, exception));

    public Task ReceiveFault(ReceiveContext context, Exception exception)
        => ForEachAsync(x => x.ReceiveFault(context, exception));
}
PerformanceCounterReceiveObserver - реакция на события наблюдателей в лице сбора метрик
public class PerformanceCounterReceiveObserver :
    IReceiveObserver
{
    readonly ICounterFactory _factory;

    public PerformanceCounterReceiveObserver(ICounterFactory factory)
    {
        _factory = factory;
    }

    Task IReceiveObserver.PreReceive(ReceiveContext context)
        => Task.CompletedTask;

    Task IReceiveObserver.PostReceive(ReceiveContext context)
        => Task.CompletedTask;

    Task IReceiveObserver.PostConsume<T>(ConsumeContext<T> context, TimeSpan duration, string consumerType)
    {
        ConsumerPerformanceCounterCache
            .GetCounter(_factory, consumerType)
            .Consumed(duration);

        MessagePerformanceCounterCache<T>
            .Counter(_factory)
            .Consumed(duration);

        return Task.CompletedTask;
    }

    Task IReceiveObserver.ConsumeFault<T>(ConsumeContext<T> context, TimeSpan duration, string consumerType, Exception exception)
    {
        ConsumerPerformanceCounterCache
            .GetCounter(_factory, consumerType)
            .Faulted();

        MessagePerformanceCounterCache<T>
            .Counter(_factory)
            .ConsumeFaulted(duration);

        return Task.CompletedTask;
    }

    Task IReceiveObserver.ReceiveFault(ReceiveContext context, Exception exception)
        => Task.CompletedTask;
}

Переход к внутренней архитектуре MassTransit

Мы рассмотрели базовые принципы, на которых строится pipeline на примере GreenPipes. Теперь разберём, как эти механизмы используются внутри MassTransit и формируют его поведение на уровне обработки сообщений.

Разбор будет начинаться с момента регистрации компонентов и будет доходить до их фактического выполнения. В качестве основы используется версия библиотеки 8.0.0

Архитектура DI-регистрации в MassTransit

Ключевую роль в регистрации зависимостей играет ServiceCollectionBusConfigurator. От него строится вся дальнейшая конфигурация, включая RegistrationConfigurator и DependencyInjectionContainerRegistrar. Дополнительно используются extension-методы, которые расширяют и структурируют процесс регистрации.

Эти компоненты формируют основу DI-интеграции в MassTransit, и именно на них стоит сосредоточить внимание.

  • DependencyInjectionContainerRegistrar реализует интерфейс IContainerRegistrar - абстракцию, которая раньше позволяла MassTransit интегрироваться с разными DI‑контейнерами (Autofac, Lamar, Ninject и др.). Со временем стало очевидно, что поддерживать множество контейнеров слишком затратно, поэтому MassTransit перешёл на единую модель и оставил полноценную интеграцию только с Microsoft DI. В результате DependencyInjectionContainerRegistrar стал основной реализацией, содержащей всю логику регистрации MassTransit‑специфичных зависимостей и их извлечения из IServiceCollection в соответствии с правилами Microsoft DI. пример с AutoFac

DependencyInjectionContainerRegistrar
public class DependencyInjectionContainerRegistrar : IContainerRegistrar
    {
 
        protected readonly IServiceCollection Collection;

        public DependencyInjectionContainerRegistrar(IServiceCollection collection)
        {
            Collection = collection;
        }

  
        public void RegisterRequestClient<T>(RequestTimeout timeout)
            where T : class
        {
            Collection.AddScoped(provider =>
            {
                var clientFactory = GetClientFactory(provider);
                var consumeContext = provider.GetRequiredService<ScopedConsumeContextProvider>().GetContext();

                if (consumeContext != null)
                    return clientFactory.CreateRequestClient<T>(consumeContext, timeout);

                return new ClientFactory(
                        new ScopedClientFactoryContext<IServiceProvider>(clientFactory, provider))
                    .CreateRequestClient<T>(timeout);
            });
        }

        public void RegisterRequestClient<T>(Uri destinationAddress, RequestTimeout timeout)
            where T : class
        {
            Collection.AddScoped(provider =>
            {
                var clientFactory = GetClientFactory(provider);
                var consumeContext = provider.GetRequiredService<ScopedConsumeContextProvider>().GetContext();

                if (consumeContext != null)
                    return clientFactory.CreateRequestClient<T>(consumeContext, destinationAddress, timeout);

                return new ClientFactory(
                        new ScopedClientFactoryContext<IServiceProvider>(clientFactory, provider))
                    .CreateRequestClient<T>(destinationAddress, timeout);
            });
        }
...
}
  • RegistrationConfigurator как фасад DI-конфигурации

RegistrationConfigurator - это основной класс для регистрации компонентов в MassTransit. Через него добавляются consumers, sagas, saga state machines, activities, endpoints и другие элементы системы, формируя единый декларативный слой конфигурации поверх Microsoft.Extensions.DependencyInjection.

RegistrationConfigurator
public class RegistrationConfigurator :
        IRegistrationConfigurator
    {
        readonly IServiceCollection _collection;
        bool _configured;
        ISagaRepositoryRegistrationProvider _sagaRepositoryRegistrationProvider;

        protected RegistrationConfigurator(IServiceCollection collection, IContainerRegistrar registrar)
        {
            _collection = collection ?? throw new ArgumentNullException(nameof(collection));

            Registrar = registrar ?? new DependencyInjectionContainerRegistrar(collection);

            _sagaRepositoryRegistrationProvider = new SagaRepositoryRegistrationProvider();
        }

        public IContainerRegistrar Registrar { get; }

        protected Func<IServiceProvider, IBus, IClientFactory> ClientFactoryProvider { get; } = BusClientFactoryProvider;

   
        public IConsumerRegistrationConfigurator<T> AddConsumer<T>(Action<IConsumerConfigurator<T>> configure)
            where T : class, IConsumer
        {
            return AddConsumer(null, configure);
        }

        public IConsumerRegistrationConfigurator<T> AddConsumer<T>(Type consumerDefinitionType, Action<IConsumerConfigurator<T>> configure = null)
            where T : class, IConsumer
        {
            var registration = _collection.RegisterConsumer<T>(Registrar, consumerDefinitionType);

            registration.AddConfigureAction(configure);

            return new ConsumerRegistrationConfigurator<T>(this);
        }

        public ISagaRegistrationConfigurator<T> AddSaga<T>(Action<ISagaConfigurator<T>> configure)
            where T : class, ISaga
        {
            return AddSaga(null, configure);
        }
...
}
  • ServiceCollectionBusConfigurator - Composition Root Class MassTransit (Bus + DI интеграция) Является центральным классом, отвечающим за настройку и регистрацию Bus в MassTransit. Он строит весь runtime-слой.

ServiceCollectionBusConfigurator
    public class ServiceCollectionBusConfigurator :
        RegistrationConfigurator,
        IBusRegistrationConfigurator
    {

        public ServiceCollectionBusConfigurator(IServiceCollection collection)
            : this(collection, new DependencyInjectionContainerRegistrar(collection))
        {
            IBusRegistrationContext CreateRegistrationContext(IServiceProvider provider)
            {
                return new BusRegistrationContext(provider, Registrar);
            }

            collection.AddSingleton(provider => ClientFactoryProvider(provider, provider.GetRequiredService<IBus>()));

            collection.AddSingleton(provider => Bind<IBus>.Create(CreateRegistrationContext(provider)));

            collection.AddSingleton(provider => provider.GetRequiredService<Bind<IBus, IBusRegistrationContext>>().Value);

            collection.TryAdd(ServiceDescriptor.Singleton(typeof(IReceiveEndpointDispatcher<>), typeof(ReceiveEndpointDispatcher<>)));

            collection.AddSingleton<IReceiveEndpointDispatcherFactory>(provider =>
            {
                var registrationContext = provider.GetRequiredService<Bind<IBus, IBusRegistrationContext>>().Value;
                var busInstance = provider.GetRequiredService<Bind<IBus, IBusInstance>>().Value;

                return new ReceiveEndpointDispatcherFactory(registrationContext, busInstance);
            });
        }

 
        protected ServiceCollectionBusConfigurator(IServiceCollection collection, IContainerRegistrar registrar)
            : base(collection, registrar)
        {
            AddMassTransitComponents(collection);
        }

        public virtual void AddBus(Func<IBusRegistrationContext, IBusControl> busFactory)
        {
            SetBusFactory(new RegistrationBusFactory(busFactory));
        }

        public virtual void SetBusFactory<T>(T busFactory)
            where T : IRegistrationBusFactory
        {
            if (busFactory == null)
                throw new ArgumentNullException(nameof(busFactory));

            ThrowIfAlreadyConfigured(nameof(SetBusFactory));

            this.AddSingleton(provider => Bind<IBus>.Create(CreateBus(busFactory, provider)));

            this.AddSingleton(provider => provider.GetRequiredService<Bind<IBus, IBusInstance>>().Value);
            this.AddSingleton<IReceiveEndpointConnector>(provider => provider.GetRequiredService<Bind<IBus, IBusInstance>>().Value);
            this.AddSingleton(provider => provider.GetRequiredService<Bind<IBus, IBusInstance>>().Value.BusControl);
            this.AddSingleton(provider => provider.GetRequiredService<Bind<IBus, IBusInstance>>().Value.Bus);

            Registrar.RegisterScopedClientFactory();
        }

Регистрация MassTransit: автоматическая vs ручная конфигурация

Разбирать всю систему регистрации в MassTransit глубоко не имеет большого смысла в рамках этого разбора, так как она включает множество уровней абстракции и внутренних механизмов, а детальное погружение существенно бы затянуло материал.

Например, BusRegistrationContext используется в основном в сценарии ConfigureEndpoints, где MassTransit автоматически генерирует топологию: создаёт очереди, группирует consumers и настраивает их подключение к endpoint’ам.

В то же время при использовании ReceiveEndpoint конфигурация становится полностью ручной - разработчик сам определяет очередь и явно подключает consumers. В этом случае автоматическая логика ConfigureEndpoints не применяется, так как endpoint уже управляется напрямую.

Из-за наличия двух параллельных моделей конфигурации (автоматической и ручной), а также большого количества внутренних правил и исключений, углубляться во все детали в рамках текущего материала нецелесообразно, чтобы не перегружать и не затягивать разбор.

Configuration и Configurators

В MassTransit архитектура разделяет процесс настройки системы на два уровня: Configurator и Configuration.

Configurator относится к уровню построения pipeline обработки сообщений и основан на модели GreenPipes. Он отвечает за формирование цепочки middleware, добавление фильтров и определение порядка выполнения логики обработки. На этом уровне создаётся исполнительная модель в виде IPipe, которая определяет поведение обработки сообщений.

Configuration относится к инфраструктурному уровню и отвечает за создание и настройку транспортной части системы. Он определяет receive endpoint, конфигурацию очередей, exchange, topology и связывает транспорт с обработкой сообщений. На этом уровне формируется сама точка входа для сообщений и её интеграция с брокером.

Таким образом, Configuration отвечает за то, что создаётся в системе (endpoint и транспорт), а Configurator - за то, как выполняется обработка сообщений внутри этого endpoint.

Configurators и configuration-объекты - это build-time сущности, которые используются для построения execution pipeline и state machine. Они, как правило, не участвуют в runtime execution, но могут оставаться в памяти, если удерживаются через GC roots (DI container, замыкания или lifecycle host’a).

Наблюдение за работой управляемой памяти в Visual Studio
Наблюдение за работой управляемой памяти в Visual Studio

Исходя из выше становится понятно, что Configuration и Configurator связаны, но ключевой вопрос - как.

При построении receive endpoint’а в RabbitMqHostConfiguration через CreateRabbitMqReceiveEndpointContext применяются все IReceiveEndpointSpecification. Если подключены Sagas, среди них будет StateMachineSagaConfigurator.

Далее в ApplySpecifications вызывается Configure, который подключает saga к pipeline.

Таким образом, связывание происходит через механизм спецификаций, который соединяет Configuration и Configurator на этапе build-time.

Пример дебага в Visual Studio Code
Пример дебага в Visual Studio Code
ReceiveEndpointConfiguration и StateMachineSagaConfigurator, как пример
 public abstract class ReceiveEndpointConfiguration :
        EndpointConfiguration,
        IReceiveEndpointConfiguration
    {
        readonly Lazy<IConsumePipe> _consumePipe;
        readonly HashSet<IReceiveEndpointDependency> _dependencies;
        readonly IList<string> _lateConfigurationKeys;
        readonly IList<IReceiveEndpointSpecification> _specifications;
        IReceiveEndpoint _receiveEndpoint;

        protected ReceiveEndpointConfiguration(IHostConfiguration hostConfiguration, IEndpointConfiguration endpointConfiguration)
            : base(endpointConfiguration)
        {
            ConfigureConsumeTopology = true;
            PublishFaults = true;

            _consumePipe = new Lazy<IConsumePipe>(() => Consume.Specification.BuildConsumePipe());
            _specifications = new List<IReceiveEndpointSpecification>();
            _lateConfigurationKeys = new List<string>();
            _dependencies = new HashSet<IReceiveEndpointDependency>();

            EndpointObservers = new ReceiveEndpointObservable();
            ReceiveObservers = new ReceiveObservable();
            TransportObservers = new ReceiveTransportObservable();

            ConnectConsumerConfigurationObserver(hostConfiguration.BusConfiguration);
            ConnectSagaConfigurationObserver(hostConfiguration.BusConfiguration);
            ConnectHandlerConfigurationObserver(hostConfiguration.BusConfiguration);
            ConnectActivityConfigurationObserver(hostConfiguration.BusConfiguration);
        }
  ...
RabbitMqReceiveEndpointContext CreateRabbitMqReceiveEndpointContext()
        {
            var builder = new RabbitMqReceiveEndpointBuilder(_hostConfiguration, this);
            ApplySpecifications(builder);
            return builder.CreateReceiveEndpointContext();
        }
...
 protected void ApplySpecifications(IReceiveEndpointBuilder builder)
        {
            for (var i = 0; i < _specifications.Count; i++)
                _specifications[i].Configure(builder);
        }
...
public class StateMachineSagaConfigurator<TInstance> :
        ISagaConfigurator<TInstance>,
        IReceiveEndpointSpecification
        where TInstance : class, SagaStateMachineInstance
    {
...
public void Configure(IReceiveEndpointBuilder builder)
        {
            _connector.ConnectSaga(builder, _repository, _specification);
        }
...
}

Runtime-инфраструктура MassTransit

При запуске приложения MassTransit создает долгоживущую инфраструктуру, которая работает весь lifetime сервиса. Устанавливаются и поддерживаются соединение с RabbitMQ (IConnection) и канал (IModel) для коммуникации с брокером.

Если в приложении зарегистрированы consumer’ы, MassTransit создаёт один или несколько receive endpoints:

  1. Системный для задач шины (например, “DESKTOPDTE2RH7_TestingOnWebAPIWithDI_bus_ib3oyydsm7ymbtaibdxj7iiqyb”) - управляет инфраструктурой MassTransit

  2. Пользовательский (например, “something”) - обрабатывает бизнес-сообщения через привязанный Consumer

Демонстрация дебага регистрации 1ого Consumer'a, рядом создается endpoint для работы с шиной сообщений через VS Code
Демонстрация дебага регистрации 1ого Consumer'a, рядом создается endpoint для работы с шиной сообщений через VS Code

В рамках transport pipeline каждого endpoint выполняется RabbitMqConsumerFilter, который является частью транспортного уровня. Он создаёт RabbitMqBasicConsumer и регистрирует его через BasicConsume в RabbitMQ.Client, подписываясь на очередь. Таким образом формируется связь между RabbitMQ и MassTransit.

RabbitMqConsumerFilter, обрати внимание на var consumer в методе Send
public class RabbitMqConsumerFilter :
        IFilter<ModelContext>
    {
        readonly RabbitMqReceiveEndpointContext _context;
        string _consumerTag;

        public RabbitMqConsumerFilter(RabbitMqReceiveEndpointContext context)
        {
            _context = context;

            _consumerTag = "";
        }

        void IProbeSite.Probe(ProbeContext context)
        {
        }

        async Task IFilter<ModelContext>.Send(ModelContext context, IPipe<ModelContext> next)
        {
            var receiveSettings = context.GetPayload<ReceiveSettings>();

           ** var consumer = new RabbitMqBasicConsumer(context, _context);**

            _consumerTag = await context.BasicConsume(
                receiveSettings.QueueName,
                receiveSettings.NoAck,
                _context.ExclusiveConsumer,
                receiveSettings.ConsumeArguments,
                consumer,
                _consumerTag
            ).ConfigureAwait(false);

            await consumer.Ready.ConfigureAwait(false);

            _context.AddConsumeAgent(consumer);

            await _context.TransportObservers.NotifyReady(_context.InputAddress).ConfigureAwait(false);

            try
            {
                await consumer.Completed.ConfigureAwait(false);
            }
            finally
            {
                RabbitMqDeliveryMetrics metrics = consumer;

                await _context.TransportObservers.NotifyCompleted(_context.InputAddress, metrics).ConfigureAwait(false);

                LogContext.Debug?.Log(
                    "Consumer completed {ConsumerTag}: {DeliveryCount} received, {ConcurrentDeliveryCount} concurrent",
                    metrics.ConsumerTag,
                    metrics.DeliveryCount,
                    metrics.ConcurrentDeliveryCount
                );
            }

            await next.Send(context).ConfigureAwait(false);
        }
    }
RabbitMqModelContext, посмотри на запуск потребления через ChannelExecutor
 public class RabbitMqModelContext :
        ScopePipeContext,
        ModelContext,
        IAsyncDisposable
    {
        readonly CancellationToken _cancellationToken;
        readonly PendingConfirmationCollection _confirmations;
        readonly ConnectionContext _connectionContext;
        readonly ChannelExecutor _executor;
        readonly IModel _model;
        readonly IPublisher _publisher;
...
Task<string> ModelContext.BasicConsume(string queue, bool noAck, bool exclusive, IDictionary<string, object> arguments, IBasicConsumer consumer,
            string consumerTag)
        {
            return _executor.Run(() => _model.BasicConsume(consumer, queue, noAck, consumerTag, false, exclusive, arguments), CancellationToken);
        }
...
}

Каждый endpoint связывает очередь RabbitMQ, транспортный слой и pipeline обработки.

Распределение и обработка сообщений по pipeline’ам происходит уже внутри MassTransit transport layer через ReceivePipeDispatcher и ReceivePipe, которые запускают middleware pipeline и consumer pipeline соответствующего receive endpoint’а.

Ключевые концепции потребления сообщения

Стоит помнить, что обработка сообщения в MassTransit на первый взгляд выглядит единообразной, но фактически делится на два уровня: Transport Layer и Consumer Layer

Transport Layer работает с ReceiveContext, который содержит сырое сообщение и транспортные заголовки (например, routing key). На этом этапе бизнес-модель ещё не сформирована.

Далее сообщение проходит через ReceivePipeLine. В DeserializerFilter происходит ключевой момент: из ReceiveContext формируется ConsumeContext, и обработка переходит в Consumer Layer.

DeserializeFilter, обрати внимание на if(...) в методе Send
public class DeserializeFilter :
        IFilter<ReceiveContext>
    {
        readonly IPipe<ConsumeContext> _output;
        readonly ISerialization _serializers;

        public DeserializeFilter(ISerialization serializers, IPipe<ConsumeContext> output)
        {
            _serializers = serializers;
            _output = output;
        }

        public void Probe(ProbeContext context)
        {
            var scope = context.CreateFilterScope("deserialize");

            _serializers.Probe(scope);
            _output.Probe(scope);
        }

        [DebuggerNonUserCode]
        public async Task Send(ReceiveContext context, IPipe<ReceiveContext> next)
        {
            if (!context.TryGetPayload(out ConsumeContext consumeContext))
                consumeContext = _serializers.GetMessageDeserializer(context.ContentType).Deserialize(context);

            Activity.Current?.AddConsumeContextTags(consumeContext);

            await _output.Send(consumeContext).ConfigureAwait(false);

            await next.Send(context).ConfigureAwait(false);

            await consumeContext.ConsumeCompleted.ConfigureAwait(false);
        }
    }

С этого момента уже работает бизнес-логика - consumers, sagas и ConsumePipeLine.

Pipeline обработки сообщения в MassTransit

Сообщение поступает из RabbitMQ через RabbitMqBasicConsumer в ReceivePipeDispatcher, где создаётся ReceiveContext.

Далее оно проходит transport pipeline (ReceivePipe) с базовыми проверками и обработкой ошибок: Rescue и DeadLetter активируются только при сбоях или невозможности обработки.

Затем DeserializeFilter преобразует сообщение в типизированный ConsumeContext.

Демонстрация работы Pipeline'a потребления сообщений через Visual Studio
Демонстрация работы Pipeline'a потребления сообщений через Visual Studio

ReceivePipeDispatcher - оркестратор входящих сообщений

Сообщение пришло из RabbitMQ. Но просто так его обработать нельзя - нужна строгая последовательность действий. Вот здесь и появляется ReceivePipeDispatcher.

Сначала он получает ReceiveContext - это сообщение в его “сыром” виде.

Затем он уведомляет наблюдателей о событии PreReceive - это точка, где можно подцепить любую глобальную логику мониторинга или аудита, которую предусмотрел Крисс Паттерсон.

Сообщение отправляется в ReceivePipe - дальше в конвейер.

После того как ReceivePipe завершит работу, диспетчер ждёт события ReceiveCompleted, чтобы убедиться, что вся асинхронная обработка закончилась.

Если всё прошло успешно, диспетчер явно говорит транспорту: “Сообщение обработано, можно удалить из очереди”.

И наконец, уведомляет наблюдателей о событии PostReceive - обработка завершена успешно.

ReceivePipeDispatcher
public class ReceivePipeDispatcher :
        IReceivePipeDispatcher
    {
        readonly string _activityName;
        readonly IHostConfiguration _hostConfiguration;
        readonly ReceiveObservable _observers;
        readonly IReceivePipe _receivePipe;

        int _activeDispatchCount;
        long _dispatchCount;
        int _maxConcurrentDispatchCount;
...
public async Task Dispatch(ReceiveContext context, ReceiveLockContext receiveLock = default)
        {
            LogContext.SetCurrentIfNull(_hostConfiguration.ReceiveLogContext);

            var active = StartDispatch();

            StartedActivity? activity = LogContext.IfEnabled(_activityName)?.StartReceiveActivity(context);
            try
            {
                if (_observers.Count > 0)
                    await _observers.PreReceive(context).ConfigureAwait(false);

                if (receiveLock != null)
                    await receiveLock.ValidateLockStatus().ConfigureAwait(false);

                await _receivePipe.Send(context).ConfigureAwait(false);

                await context.ReceiveCompleted.ConfigureAwait(false);

                if (receiveLock != null)
                    await receiveLock.Complete().ConfigureAwait(false);

                if (_observers.Count > 0)
                    await _observers.PostReceive(context).ConfigureAwait(false);
            }
            catch (Exception ex)
            {
                if (_observers.Count > 0)
                    await _observers.ReceiveFault(context, ex).ConfigureAwait(false);

                if (receiveLock != null)
                {
                    try
                    {
                        await receiveLock.Faulted(ex).ConfigureAwait(false);
                    }
                    catch (Exception releaseLockException)
                    {
                        throw new AggregateException("ReceiveLock.Faulted threw an exception", releaseLockException, ex);
                    }
                }

                throw;
            }
            finally
            {
                activity?.Stop();

                await active.Complete().ConfigureAwait(false);
            }
        }
...
}

ChannelExecutor

При работе с транспортным уровнем MassTransit и RabbitMQ становится видно, что ChannelExecutor добавлен не случайно.

В основе модели лежит понятие shared protocol state. В случае RabbitMQ.Client таким состоянием является TCP-соединение, внутри которого создаются каналы (IModel / IChannel).

Все каналы используют один общий поток записи (frame writer), поэтому IChannel нельзя безопасно использовать из нескольких потоков одновременно - параллельная запись приводит к нарушению AMQP framing и неконсистентности протокольного состояния. Отсюда соединение может быть закрыто, потеряны незавершенные операции.

Статья с офф.документацией RabbitMQ
Статья с офф.документацией RabbitMQ

https://www.rabbitmq.com/client-libraries/dotnet-api-guide#concurrency

Pipeline отправки сообщения в MassTransit

Процесс публикации начинается с вызова метода Publish - точки входа в pipeline отправки сообщений. Он делегирует выполнение в PublishInternal, где формируется PublishContext и подключаются middleware.

На уровне логики маршрутизации сообщение проходит через SendEndpointProxy и CachedSendEndpoint, которые кэшируют и переиспользуют endpoint’ы для оптимизации производительности. Затем управление попадает в SendEndpoint, где запускается основной send pipeline.

Транспортный уровень представлен RabbitMqSendTransport - он интегрирует MassTransit с RabbitMQ и подготавливает сообщение к отправке. На этом уровне используется ConnectionContextSupervisor для управления жизненным циклом соединений и оборачивания отправки в механизм retry (HostConfigurationRetryExtensions), обеспечивая устойчивость к временным сбоям.

Для отправки сообщения используется в MassTransit тот же ChannelExecutor чтоб не сломать AMQP Framing.

Через PipeContextSupervisor создаётся и передаётся ModelContext, представляющий RabbitMQ канал (IModel). На этапе SendPipe сообщение окончательно преобразуется: сериализуется в байты, формируются IBasicProperties, заголовки и routing key.

Вызов проходит через несколько обёрток ModelContext (SharedModelContext, ScopeModelContext, RabbitMqModelContext), которые управляют областью жизни канала и ресурсами.

Финальный шаг - ImmediatePublisher.Publish, где вызывается BasicPublish через ChannelExecutor, гарантирующий потокобезопасную работу с каналом. Здесь сообщение физически отправляется в RabbitMQ exchange. При необходимости используется механизм publisher confirms для подтверждения доставки от брокера.

Демонстрация Pipeline'a отправки сообщения в RabbitMQ через Visual Studio
Демонстрация Pipeline'a отправки сообщения в RabbitMQ через Visual Studio

SagaStateMachine и AsyncStateMachine

Главный паттерн StateMachine, по сути говоря любая “машина состояний” - это правила и они держатся на 4ех основных компонентах, которые работают как единое целое:

  1. States - Объекты State

  2. Events - Типизированные сообщения

  3. Transitions - явные правила(During/When)

  4. Activities - явные действия

Яркое сравнение можно получить с AsyncStateMachine, где математическая модель одна и та же - конечный автомат.

Исходный код
Исходный код
Конечный автомат AsyncStateMachine взятый с sharplab и задокументированный мною
Конечный автомат AsyncStateMachine взятый с sharplab и задокументированный мною
AsyncStateMachine генерирует в исходном классе приватный запечатанный класс с названием метода
AsyncStateMachine генерирует в исходном классе приватный запечатанный класс с названием метода

На месте States у нас числа состояний т.е 0,1,2… На месте Events там завершение через await На месте Transitions у нас порядок кода где будет меняться состояние На месте Activities тело метода

SagaStateMachine и FSM

SagaStateMachine и любая StateMachine представляет собой FSM т.е Finite State Machine.

У любой state machine всегда есть и будут основные понятия, как State, Event/Input, Transition, Activity/Action

Статья из википедии на английском языке(они более подробные, чем русскоязычные статьи)
Статья из википедии на английском языке(они более подробные, чем русскоязычные статьи)

https://en.wikipedia.org/wiki/Finite-state_machine

И это по сути математическая модель, описывающая систему, которая находится ровно в одном состоянии из конечного множества в любой момент времени. Система меняет состояние (переход) под воздействием событий или входных сигналов. FSM используется для управления логикой программ, ИИ в играх и проектирования схем

Представлять её можно ввиде Графа состояний т.е

  1. Вершины - состояния(Pending/Completed)

  2. Ребра - переходы(OrderPaid)

Допустим Pending – (OrderPaid) – Completed

Ключевая идея: Текущее состояние + Событие → Переход → Новое состояние + Действие

Когда система получает событие или входной сигнал, она может:

  • остаться в текущем состоянии

  • выполнить действие

  • перейти в другое состояние

Так же AsyncStateMachine являются по сути DFA т.е Deterministic Finite Automaton. В каждой ситуации путь только один.

Статья из википедии на английском языке(они более подробные, чем русскоязычные статьи)
Статья из википедии на английском языке(они более подробные, чем русскоязычные статьи)

https://en.wikipedia.org/wiki/Finite-state_machine

Деление Sagas на два слоя внутри MassTransit

Касательно паттерна Сага в экосистеме MassTransit важно понимать архитектурное разделение на два уровня. Если мы посмотрим на исходный код или проект, то увидим папки Sagas и SagaStateMachine.

Смысл этого разделения фундаментален: Первый слой: Sagas - это «обвязка». Он отвечает за подключение паттерна к основному движку MassTransit (GreenPipes), работу с репозиториями и получение сообщений из очереди. Второй слой: SagaStateMachine - это уже сама логика оркестрации, которая реализует собственный внутренний Pipeline внутри оболочки саги.

MassTransit, открытый в Vs Code и демонстрация двух слоев Sagas внутри
MassTransit, открытый в Vs Code и демонстрация двух слоев Sagas внутри

SagaStateMachine

Если углубиться в устройство StateMachine, то мы увидим знакомые элементы архитектуры GreenPipes, но в специфических обёртках:

  • Accessors отвечают за доступ к данным конкретного экземпляра саги(может получить состояние саги, изменить).

  • Activities наши бизнес-шаги, аналогичные фильтрам (Filters) в стандартном в стандартном конвейере.

  • Behaviours управляют порядком выполнения Activities, работая как контейнеры (Pipes). Вместо привычных нам PipeBuilders, сборку всего процесса здесь выполняет компонент Binders.

  • Binders - это механизм, который превращает декларативный DSL (When/Then/TransitionTo) в исполняемый pipeline (Behavior).

  • Correlation правило сопоставления сообщения с экземпляром саги, которое определяет, как по данным сообщения найти или создать соответствующую сагу в хранилище.

Внутри StateMachine в MassTransit лежит граф состояний - декларативная модель, описывающая жизненный цикл процесса.

Внутренности SagaStateMachine через Vs Code
Внутренности SagaStateMachine через Vs Code

По сути говоря наша машина состояний здесь и основная фишка MassTransit при работе с Sagas вот в этом замечательном и простом DSL

Простой пример SagaStateMachine с помощью DSL MassTransit
Простой пример SagaStateMachine с помощью DSL MassTransit

States в SagaStateMachine

Если заглянуть внутрь State, то можно заметить следующие Property, а это Enter/Leave/BeforeEnter/AfterLeave

Принцип работы следующий: BeforeEnter → SetState → AfterLeave → Enter

По сути говоря Крисс Паттерсон хотел выполнять логику не только на событии, но и на самом переходе состояния и для этого он сделал lifecycle хуки. И они используются для Transitions т.е переходов состояния.

В распределенных системах критично понимать, когда состояние ещё старое, когда оно уже изменено и в какой момент безопасно сделать side-effects(например, отправку сообщения)

Чуть позже мы увидим переходы этих состояний в TransitionActivity.cs файле

public interface State :
        IVisitable,
        IComparable<State>
    {
        string Name { get; }
       
        Event Enter { get; }

        
        Event Leave { get; }

       
        Event<State> BeforeEnter { get; }

        
        Event<State> AfterLeave { get; }
    }

SagaStateMachine и Graphs

Если вы начнёте изучать исходники MassTransit, почти неизбежно вы придёте к теме саг. И довольно неожиданно можете обнаружить там графы.

На первый взгляд это выглядит странно: зачем машине состояний вообще нужен граф? Но дело в том, что он там не случайно.

Автор проекта, Крис Патерсон, добавил это осознанно. Хотя сама SagaStateMachine отлично работает без какой-либо визуальной модели, в реальных сценариях она может становиться достаточно сложной: количество состояний, событий и переходов растёт, и удерживать всю логику в голове становится затруднительно.

В таких случаях граф выступает как инструмент “второго взгляда” на систему. Он не участвует в выполнении логики, но позволяет визуализировать её - будь то текстовое представление или диаграмма в Mermaid.

По сути, это вспомогательный слой, который делает сложную конечную автоматную логику более прозрачной и удобной для понимания, отладки и коммуникации внутри команды.

Extension класс с демонстрацией зависимостей метода(мой контроллер только там из внешних зависимостей)
Extension класс с демонстрацией зависимостей метода(мой контроллер только там из внешних зависимостей)

Обработка Sagas

Как мы уже знаем, что вначале у нас идет ReceivePipe т.е обработка через транспорт и в данном случае нам он мало интересен.

Ключевой момент тут в том, что у Sagas имеется CorrelationIdMessageFilter внутри ConsumePipe, который создает ProxyContext и прокидывает его дальше по пайплайну, который приводит к CorrelatedSagaFilter.

Старт обработки Sagas с CorrelationIdMessageFilter
Старт обработки Sagas с CorrelationIdMessageFilter
Старт обработки Sagas с ключевого фильтра - CorrelatedSagaFilter, где начало обработки это обращение к Repository с внешним хранилищем в лице БД
Старт обработки Sagas с ключевого фильтра - CorrelatedSagaFilter, где начало обработки это обращение к Repository с внешним хранилищем в лице БД

Здесь можно сказать начало обработки нашей Saga. И начинается обработка с SagaRepository, где если провалится чуть глубже т.е через итерацию - мы попадаем EntityFrameworkSagaRepositoryContextFactory, где у нас стартует транзакция с isolationLevel = ReadCommited.

здесь EntityFrameworkSagaRepositoryContextFactory.cs - фабрика runtime-контекстов поверх EF Core и демонстрация главной транзакционной оберткой WithinTransaction
здесь EntityFrameworkSagaRepositoryContextFactory.cs - фабрика runtime-контекстов поверх EF Core и демонстрация главной транзакционной оберткой WithinTransaction
Отсюда постепенно начинается вход в SagaStateMachine и постепенно запускается основной Pipeline обработки SagaStateMachine у EntityFrameworkSagaRepositoryContextFactory.cs
Отсюда постепенно начинается вход в SagaStateMachine и постепенно запускается основной Pipeline обработки SagaStateMachine у EntityFrameworkSagaRepositoryContextFactory.cs

Поскольку это все PipeLine, то мы вконце-концов сюда после обработки вернемся и будет произведен Commit транзакции либо Rollback

После созданной транзакции, у нас начинается обработка отдельного pipeline с DbContextSagaRepositoryContext с SendSagaPipe

В SendSagaPipe мы сталкиваемся с такой вещью как policy. Если вкратце то это сценарий обработки сообщения в контексте Sagas. В первую очередь проверяется имеется ли запись с данной SagaInstance в БД Saga через DbContextSagaRepository, поскольку включен OptimisticLocking(в нашем примере), то делается запрос с SingleOrDefaultAsync от Ef Core, но ничего не находит т.к мы только инициализируем нашу Sagas…

Как выглядит Optimistic Locking загрузка данных о конкретном экземпляре Saga и этапе её работы из БД
Как выглядит Optimistic Locking загрузка данных о конкретном экземпляре Saga и этапе её работы из БД

Поскольку мы регистрируем новое сообщение, которое ранее не фигурировала в хранилище у БД, то применяется MissingPolicy т.е MissingSagaPipe.

Применение конкретной логики обработки Saga через конкретную Policy - MissingPolicy в SendSagaPipe
Применение конкретной логики обработки Saga через конкретную Policy - MissingPolicy в SendSagaPipe

Мы попадаем в фильтр машины состояний, где у нас меняются названия компонентов(вместо Pipes теперь Behaviours), но это все тот же привычный нам Pipeline в другой обертке. Вызываем ивент с behaviourContext и попадаем в ядро машины состояний Saga т.е MassTransitStateMachine.

StateMachineSagaMessageFilter - фильтр, что запускает механизмы смены состояний у нашей Saga
StateMachineSagaMessageFilter - фильтр, что запускает механизмы смены состояний у нашей Saga

Вконце концов нам надо получить Accessor т.е это тот слой, который говорит где хранится state и как его читать/менять. Мы попадаем InitialIfNullStateAccessor(могут быть и другие, но не о них).

Наше состояние равно null, отсюда надо инициализировать state machine instance до нормального состояния runtime

Здесь мы уже встречаемся с концепции State Machine, а это Activity(в данном случае TransitionActivity). Один из самых важных слоев state machine т.к именно здесь живет исполняемая логика. Activities отвечают за шаги выполнения внутри state machine (behaviour pipeline).

TransitionActivity и метод  выполняют полный цикл перехода состояния в MassTransit: применяют логику Saga (During/When/TransitionTo/BeforeEnter/AfterLeave и т.д.) с учётом текущего состояния из БД или создают и инициализируют новую Saga, если её ещё нет.)
TransitionActivity и метод выполняют полный цикл перехода состояния в MassTransit: применяют логику Saga (During/When/TransitionTo/BeforeEnter/AfterLeave и т.д.) с учётом текущего состояния из БД или создают и инициализируют новую Saga, если её ещё нет.)

Каждая Activity делает Execute вызывая next, либо Faulted обрабатывая ошибки (или прокидывая).

Метод Transition работает с состоянием(ранее обсуждали что у State есть переходы внутренние)

  1. проверка текущего состояния

  2. Leave т.е выход из состояния

  3. BeforeEnter т.е логика перед переходом в новое состояние

  4. Установка нового состояния

  5. After Leave логика после перехода из предыдущего состояния.

  6. Enter - для всей иерархии нового состояния

Каждый TransitionTo не выполняется сразу, а превращается в TransitionActivity.

Выполняется вот этот код конструктора с помощью TransitionActivity.cs
Выполняется вот этот код конструктора с помощью TransitionActivity.cs

Он не просто меняет состояние, а делает целый Lifesycle переход т.е

  1. Создание Saga (Initial transition) null → Initially Когда приходит первое сообщение, то создается Saga, устанавливается начальное состояние Initially(создалась Saga), затем переход в Started

  2. Первый переход состояния Initially → Started Происходит выполнение Then(…), запуск TransitionActivity,смена состояния на Started

  3. Дальнейшие переходы Started → Updated → Processing → Processed Каждый переход выполняется через TransitionActivity проходит полный lifecycle (leave - beforeEnter - enter - afterleave) и вконце концов фиксация состояния в Saga Storage

Все крутится в цикле до тех пор пока не будет завершена логика переходов предусмотренная настройкой MassTransitStateMachine.

И закомичена транзакция.

Основные методы обработки

Когда ты пишешь During(Started, When(SomethingReceived)…, ты фактически добавляешь запись в словарь внутри состояния: для состояния Started и события SomethingReceived создаётся behavior (цепочка из Then, TransitionTo и т.д.). Именно этот behavior потом находится через behaviors.TryGetValue(…) и выполняется через Execute

Initially - это то же самое, только для начального состояния (Initial).

DuringAny - это глобальные обработчики, они не лежат внутри конкретного состояния.

BeforeEnter, Enter, AfterLeave - это обычные события, которые MassTransit сам генерирует при переходах между состояниями.

Когда происходит TransitionTo(Started), внутри вызываются события вроде Started.BeforeEnter, и если ты где-то описал When(Started.BeforeEnter), для него тоже будет создан behavior.

В итоге Execute вызывается только если для текущего события найден behavior. Он может быть либо прямо в текущем состоянии (через During/Initially), либо найден через fallback (например, из DuringAny).

Пример
Пример
Пример
Пример

Разница тут в вызовах заключается в наличии payload

К примеру DuringAny(When(Started.AfterLeave... не имеют payload т.е нет крипежа конкретного Event и запускается переход с этим методом
К примеру DuringAny(When(Started.AfterLeave... не имеют payload т.е нет крипежа конкретного Event и запускается переход с этим методом
К примеру During(Started, When(SomethingReceiveed... имеет типизацию т.е public Event<> SomethingReceived {get; private set;} отсюда запустится метод и впослдествии behavior, собранный из DSL (When / Then / During)
К примеру During(Started, When(SomethingReceiveed... имеет типизацию т.е public Event<> SomethingReceived {get; private set;} отсюда запустится метод и впослдествии behavior, собранный из DSL (When / Then / During)

Особенность Saga как паттерна

Поскольку состояние саги хранится в базе данных, а сообщения поступают через брокер, при обработке каждого сообщения MassTransit загружает сагу по CorrelationId и выполняет соответствующий behavior для текущего состояния.

Если сервис падает до подтверждения обработки (ack), сообщение возвращается в очередь и будет доставлено повторно после перезапуска. При этом MassTransit не восстанавливает промежуточное выполнение внутри behavior, а всегда начинает обработку заново, исходя из последнего зафиксированного состояния саги в базе данных.

При последующих сообщениях с тем же CorrelationId обработка всегда начинается с текущего сохранённого состояния. Если дальнейших переходов не определено (например, достигнут финальный state), сообщение будет проигнорировано или обработано как не имеющее действия.

Таким образом, поведение саги полностью определяется значением CurrentState в базе данных, а не частично выполненными шагами внутри обработки сообщения.

Пример
Пример

Лучший простой пример Sagas Orchestration

Мы увидели подробно реализацию Sagas в MassTransit, она ни капли не простая в реализации, а сама идея простая, отсюда для иллюстрации будет уместным взглянуть как это делается в других фреймворках.

В более легкий показательный пример хочется привести пример Sagas от команды devmentors. Проще говоря это небольшая IT-компания и одновременно образовательный проект для разработчиков из Польши, города Краков.

Взято с офф.сайта DevMentors
Взято с офф.сайта DevMentors

Их библиотека Sagas фигурировала в одном из роликов на ютубе в проекте DNC-DShop, который всем доступен в интернете. Внутри, для демонстрации работы Sagas была написана отдельная библиотека под названием “Chronicle”.

Проект DNC-DShop скачаный из гитхаба по кускам(они так решили раздробить его)
Проект DNC-DShop скачаный из гитхаба по кускам(они так решили раздробить его)
Библиотека Chronicle
Библиотека Chronicle

И чтоб лучше понять как работают саги они сделали библиотеку именно по способу manual control flow engine т.е ты сам контролируешь:

  1. Когда создается Saga

  2. Как ищется Saga

  3. как вызываются handlers

  4. как делаются compensations

Пусть вас не сбивает столку что Chronicle принадлежит некоему snatch.dev github аккаунту, ведь это по сути своей дополнительный аккаунт для либ DevMentors, а так же у них одноименный Github Account, где присутствует та же либа Trill.Saga, где и внутри неё Chronicle так же.

Как работать с их библиотекой я не вижу смысла показывать. Все довольно просто - сообщения в брокер, их десериализация, вызов нужного Event’а Saga. Это та же знакомая оркестрация, только выглядит чутка иначе. Она выполняет функцию Sagas, но меньше обвязок и возможностей из коробки. Трудностей возникнуть не должно.

Почему так же их команда заслуживает внимания? Среди них есть ex-Microsoft MVP - Piotor Gankiewicz. Талантливый разработчик и архитектор.

Что такое MVP
Что такое MVP

Пример на NServiceBus

Ранее мы рассмотрели простой пример реализации Sagas на библиотеке DevMentors с использованием Chronicle, где отсутствуют дополнительные инфраструктурные обвязки, и основной акцент сделан на демонстрации принципов работы. Теперь перейдём к решению enterprise-уровня - NServiceBus, который можно сравнить по значимости и зрелости с MassTransit. При рассмотрении этой библиотеки становится заметно, что она менее “гибкая” и декларативная по сравнению с MassTransit и его SagaStateMachine. В NServiceBus модель Sagas более приземлённая и строгая: разработчик работает ближе к уровню сообщений и явного управления состоянием, без высокоуровневой абстракции state machine, характерной для MassTransit.

Пример
Пример
Пример
Пример
Пример
Пример
Пример
Пример
Пример
Пример

Какой вывод касаемо Sagas?

Saga Orchestration из практики = Message broker + state + correlation + workflow orchestration + compensation + eventual consistency.

  1. Saga работает поверх message broker’a, который является основой распределённого взаимодействия между сервисами.

  2. Используется event-driven или command-driven модель, где events - это события, фиксирующие факт того, что что-то произошло, а commands - это команды, инициирующие выполнение действия т.е CQRS и Sagas связаны

  3. Saga является stateful workflow и хранит состояние бизнес-процесса между шагами выполнения.

  4. Корреляция обеспечивает связь каждого сообщения с конкретной сагой, без чего невозможно определить, какой процесс необходимо продолжить.

  5. Оркестрация процесса заключается в том, что Saga выступает координатором и управляет последовательностью выполнения шагов.

  6. В системе отсутствуют распределённые транзакции, и вместо этого каждый шаг выполняется как отдельная локальная транзакция с моделью eventual consistency.

  7. Компенсация применяется в случае ошибки шага, при этом запускаются заранее определённые компенсирующие действия, реализуемые вручную.

  8. Idempotency означает, что обработчики должны корректно обрабатывать повторные сообщения, для чего используются механизмы optimistic concurrency control, pessimistic locking или concurrency tokens.

  9. Retry и fault handling обеспечивают повторные попытки выполнения при временных ошибках, отправку сообщений в DLQ и переход в compensation при фатальных ошибках.

  10. Persisted workflow state означает, что состояние Saga хранится во внешнем хранилище, обычно в базе данных, для возможности восстановления и продолжения процесса.

    Пример с microservices.io касаемо тем Sagas
    Пример с microservices.io касаемо тем Sagas

Подведем итоги

Как мы увидели, MassTransit представляет собой полноценный фреймворк для построения распределённых систем. Он гибкий, расширяемый и предоставляет все необходимые инструменты для enterprise-разработки. Отдельного внимания заслуживает реализация Sagas. Она выполнена на высоком уровне и выгодно отличается от многих альтернатив благодаря встроенному DSL, который значительно упрощает описание бизнес-процессов. Также в MassTransit присутствует поддержка различных механизмов инфраструктурного уровня: pipeline-фильтров, различных брокеров сообщений и разнообразных persistence-хранилищ. Фактически фреймворк берёт на себя большую часть технической сложности, позволяя разработчику сосредоточиться на бизнес-логике и минимальной конфигурации. Несмотря на то, что начиная с версии 8.0.0 часть функциональности стала платной, MassTransit по-прежнему остаётся одним из наиболее сильных решений для построения распределённых и сложных систем. При этом важно понимать, что ключевые концепции, лежащие в его основе, включая Sagas, опираются на достаточно простые принципы, однако доведены до уровня зрелого и инженерно выверенного решения благодаря работе Криса Патерсона и сообщества.

На этом все, спасибо за внимание :-)

:3
:3

Комментарии (2)


  1. fufkasss Автор
    12.05.2026 09:39

    Статья будет постепенно расширяться, дополняться и улучшаться, пока сохраняется мотивация заниматься этим - а она есть. Поэтому, если вам интересно следить за прогрессом, рекомендую добавить страницу в закладки*.

    Пока в приоритете следующие задачи:

    1. добавить сворачиваемые блоки для кода (чтобы их можно было удобно скрывать и раскрывать для лучшей читаемости);

    2. сделать темы статьи более «звучащими» и структурированными;

    3. расширить примеры и сделать их более конкретными и наглядными.


    1. fufkasss Автор
      12.05.2026 09:39

      Текущий статус:

      По пунктам 1-3 уже внесены первые изменения, однако пока они носят базовый и поверхностный характер.

      Следующий этап:
      - Углубить каждое из направлений: доработать реализацию, повысить качество структуры и сделать примеры более содержательными, детальными и практически полезными.