Привет, Хабр! В общем работаю я значит Архитектором Программных Решений. Мы тут монолиты на микросервисы переводим поэтому я решил для наших разработчиков написать пример проекта с сагой и за одно может оно и вам понадобиться поэтому выложил сюда. Статья будет дополняться по мере поступления вопросов от вас и от наших разработчиков.

Saga - последовательность локальных транзакций.

Существует два способа координации саг:

1) Хореография - когда участники обмениваться сообщениями без централизованной точки управления. По сути команда в которой нет лидера и каждый друг с другом договариваются кто что делает и когда. Самый частый способ что я видел на практике. Хорош тем что проще всего в реализации. Плох тем что у вас появляются жирные микросервисы которые многое знают друг про друга. Плох тем что процесс растекается по микросервисам и его иногда очень сложно понять. А вызывает B который вызывает С и D и потом D опять вызывает A а C вызывает F.

Заказчик говорит, что хочет новую фичу Аналитику. Аналитик пишет ТЗ и говорит Программисту написать код. Программист создает реализацию и говорит Тестеру протестировать его. Если Тестер находит баг, то говорит Программисту починить баг.

2) Оркестр - оркестратор говорит участникам, какие операции нужно выполнить. По сути команда у которой есть лидер, который говорит каждому что ему делать и когда. Хорош тем что у вас набор простых микросервисов которые друг про друга не знают и друг с другом не общаются. Плох тем что у вас появляется очень сложный и жирный оркестратор который знает про всех и все. Классический пример по такому принципу работает BPM Camunda и ее фреймворк Zeebe для саг. Бизнес процессы, которые вы описываете в ней превращаются в саги. Тут Camunda выступает в качестве оркестратора вашей системы.

Заказчик говорит, что хочет новую фичу Менеджеру. Менеджер говорит Аналитику написать ТЗ. Когда ТЗ готово Менеджер говорит Программисту сделать реализацию. Когда реализация готова Менеджер говорит Тестеру проверить реализацию. Если тестер нашел баг, то он говори об этом Менеджеру и Менеджер говори Программисту его починить.

Я делал саги и через MassTransit и через NServiceBus. Тут будет пример простого проекта на MassTransit который я считаю более удобным и лаконичным чем NServiceBus.

MassTransit использует Saga c оркестратором.

Саги MassTransit используются для описания процесса изменения состояния системы для исполнения, которого нужно сделать запрос более чем в один сетевой сервис. Они используются:

  1. Для обеспечения порядка исполнения. Чтобы действие Б выполнилось строго после действия А.

  2. Для компенсации действий. Если действие Б провалилось по какой-то причине, то сага может выполнить компенсирующее запросы для действия А. Например, вернуть деньги на счет.

  3. Для обеспечения консистентной системы. Например, когда нужно изменить состояние микросервиса А и микросервиса Б строго в соответствии друг с другом.

Наша сага будет максимально простой. Без компенсирующих действий. Суть ее логики: У нас есть отдельно микросервис предметов из которого можно взять определенное количество предметов или добавить. Есть микросервис денег откуда можно либо взять, либо добавить определенное количество денег. Мы проработаем простой сценарий где сага сначала берет из микросервиса денег определенную сумму (холдирует ее) и потом забирает из микросервиса предметов определенное количество (уменьшает количество на складе) предметов.

Диаграмма размещения:

Диаграмма последовательности:

Схема запросов:

MoneyMicroservice

Для начала необходимые зависимости можно поставить командами:

Install-Package MassTransit.AspNetCore
Install-Package MassTransit.RabbitMQ
Install-Package MassTransit.EntityFramework

Так же необходимо поставить на RabbitMq Delayed Message Exchange

Создаем сообщения запрос и ответ для работы с шиной

//Рекомендуется для контрактов использовать интерфейсы чтобы не было возможности 
//в них прикрутить какую-то логику.
//Хотя уже с новой версией C# где можно реализации и интерфейсам прикреплять 
//подход больше не актуален
public interface IGetMoneyRequest
{
    public Guid OrderId { get; }
}

public interface IGetMoneyResponse
{
    public Guid OrderId { get; }
}

Создаем обработчик этого сообщения

public class GetMoneyConsumer : IConsumer<IGetMoneyRequest>
{
    public Task Consume(ConsumeContext<IGetMoneyRequest> context)
    {
        return context.RespondAsync<IGetMoneyResponse>(new { context.Message.OrderId });
    }
}

Дальше добавляем в стартапе наш обработчик в список обработчиков

builder.Services.AddMassTransit(cfg =>
{
  //Для сообщения AddMoneyRequest будет установлен адрес add-money-request
    cfg.SetKebabCaseEndpointNameFormatter();
  //Чтобы работала обработка запросов надо поставить расширение на RabbitMq rabbitmq_delayed_message_exchange
    cfg.AddDelayedMessageScheduler();
    //Тут регистрируем наши обработчики сообщений
    cfg.AddConsumer<AddMoneyConsumer>();
    cfg.AddConsumer<GetMoneyConsumer>();
  //Настройка подлючения к RabbitMq
    cfg.UsingRabbitMq((brc, rbfc) =>
    {
      //Использовать паттерн OutBox - либо все сообщений одной пачкой сразу отправляются 
      //либо не будет отправлено ни одно из сообщений. 
     //Это нужно, когда вам, например, нужно послать две команды сразу CreateOrder 
      // и SendEmail только при условии, что отправятся оба либо ни одно из них.
        rbfc.UseInMemoryOutbox();
      //Повторные попытки обработать запрос.
        rbfc.UseMessageRetry(r =>
        {
          //Повторять 3 раза каждый раз увеличивая между повторами 
          //интервал на 1 секунду. Начать с интервала в 1 секунду.
            r.Incremental(3, TimeSpan.FromSeconds(1), TimeSpan.FromSeconds(1));
        });
      //Использовать отложенные сообщения в том числе с помощью них можно 
      //делать таймауты
        rbfc.UseDelayedMessageScheduler();
        rbfc.Host("localhost", h =>
        {
            h.Username("guest");
            h.Password("guest");
        });
      //Записываем какие сообщения мы слушаем. Вызывать этот метод обязательно
      //иначе обработчики не будут реагировать на сообщения.
        rbfc.ConfigureEndpoints(brc);
    });
}) 
    .AddMassTransitHostedService();

ItemsMicroservice

По сути точно такой же как и предыдущий

public interface IGetItemsRequest
{
    public Guid OrderId { get; }
}

public interface IGetItemsResponse
{
    public Guid OrderId { get; }
}

public class GetItemsConsumer : IConsumer<IGetItemsRequest>
{
    public Task Consume(ConsumeContext<IGetItemsRequest> context)
    {
        return context.RespondAsync<IGetItemsResponse>(new { OrderId = context.Message.OrderId });
    }
}
    
builder.Services.AddMassTransit(cfg =>
{
    cfg.SetKebabCaseEndpointNameFormatter();
    cfg.AddDelayedMessageScheduler();
    cfg.AddConsumer<AddItemsConsumer>();
    cfg.AddConsumer<GetItemsConsumer>();
    cfg.UsingRabbitMq((brc, rbfc) =>
    {
        rbfc.UseInMemoryOutbox();
        rbfc.UseMessageRetry(r =>
        {
            r.Incremental(3, TimeSpan.FromSeconds(1), TimeSpan.FromSeconds(1));
        });
        rbfc.UseDelayedMessageScheduler();
        rbfc.Host("localhost", h =>
        {
            h.Username("guest");
            h.Password("guest");
        });
        rbfc.ConfigureEndpoints(brc);
    });
})
    .AddMassTransitHostedService();

SagasMicroservice

Тут наши саги находятся, и они оркеструют процесс покупки предметов.

Сообщений запрос и ответ от саги:

public class BuyItemsRequest
{
    public Guid OrderId { get; set; }
}

public class BuyItemsResponse
{
    public Guid OrderId { get; set; }
    public string ErrorMessage { get; set; }
}

Состояние саги:

public sealed class BuyItemsSagaState : SagaStateMachineInstance
{
		//Идентификатор по которому мы отличаем один процесс от другого.
    public Guid CorrelationId { get; set; }
    //Текущее состояние саги ака Failed, GetItemsPending и т.д.
    public string? CurrentState { get; set; }
    //Тут мы сохраняем идентификатор запроса что запустил нашу сагу
    //чтобы ответить на него
    public Guid? RequestId { get; set; }
    //Тут мы сохраняем адрес откуда пришел запрос который запустил нашу сагу
    //чтобы ответить на него
    public Uri? ResponseAddress { get; set; }
}

Сага для процесса покупки предметов:

public sealed class BuyItemsSaga : MassTransitStateMachine<BuyItemsSagaState>
{
    private readonly ILogger<BuyItemsSaga> _logger;

    public BuyItemsSaga(ILogger<BuyItemsSaga> logger)
    {
        _logger = logger;
        //Указываем куда будем записывать текущее состояние саги (Pending,Faulted)
        InstanceState(x => x.CurrentState);
        //Указываем что слушаем событие OrderId у которого равен нашему CorrelationId у саги
        //Либо если нет саги с таким CorrelationId то создаем его с ним.
        Event<BuyItemsRequest>(() => BuyItems, x => x.CorrelateById(y => y.Message.OrderId));
       //Указываем какие запросы будем делать из саги
       Request(
            () => GetMoney
            );
        Request(
         () => GetItems
         );
        //Указываем как будем реагировать на сообщения в стартовом состоянии
        Initially(

            When(BuyItems)
            .Then(x =>
            {
            //Сохраняем идентификатор запроса и его адрес при старте саги чтобы потом на него ответить
                if (!x.TryGetPayload(out SagaConsumeContext<BuyItemsSagaState, BuyItemsRequest> payload))
                    throw new Exception("Unable to retrieve required payload for callback data.");
                x.Saga.RequestId = payload.RequestId;
                x.Saga.ResponseAddress = payload.ResponseAddress;
            })
            //Совершаем запрос к микросевису MoneyMicroservice
            .Request(GetMoney, x => x.Init<IGetMoneyRequest>(new { OrderId = x.Data.OrderId }))
           //Переводим сагу в состояние GetMoney.Pending
           .TransitionTo(GetMoney.Pending)

            );

        //Описываем то как наша сага будет реагировать на сообщения находясь в 
        //состоянии GetMoney.Pending
        During(GetMoney.Pending,
            //Когда приходи сообщение что запрос прошел успешно делаем новый запрос
            //теперь уже в микросервис ItemsMicroservice
            When(GetMoney.Completed)
            .Request(GetItems, x => x.Init<IGetItemsRequest>(new { OrderId = x.Data.OrderId }))
            .TransitionTo(GetItems.Pending),
            //При ошибке отвечаем тому, кто инициировал запрос сообщением с текстом ошибки
            When(GetMoney.Faulted)
              .ThenAsync(async context =>
              { 
                //Тут можно сделать какие-то компенсирующие действия. 
               //Например, вернуть деньги куда-то на счет.
                  await RespondFromSaga(context, "Faulted On Get Money " + string.Join("; ", context.Data.Exceptions.Select(x => x.Message)));
              })
            .TransitionTo(Failed),
            //При таймауте отвечаем с сообщением что произошел таймаут
            When(GetMoney.TimeoutExpired)
               .ThenAsync(async context =>
               {
                   await RespondFromSaga(context, "Timeout Expired On Get Money");
               })
            .TransitionTo(Failed)

             );

        During(GetItems.Pending,
            //При успешном ответе от микросервиса предметов 
            //отвечаем без ошибки и переводим сагу в финальное состояние.
            When(GetItems.Completed)
              .ThenAsync(async context =>
              {
                  await RespondFromSaga(context, null);
              })
            .Finalize(),

            When(GetItems.Faulted)
              .ThenAsync(async context =>
              {
                   //Тут можно сделать какие-то компенсирующие действия. 
                  //Например, вернуть деньги куда-то на счет.
                  await RespondFromSaga(context, "Faulted On Get Items " + string.Join("; ", context.Data.Exceptions.Select(x => x.Message)));
              })
            .TransitionTo(Failed),

            When(GetItems.TimeoutExpired)
               .ThenAsync(async context =>
               {
                   await RespondFromSaga(context, "Timeout Expired On Get Items");
               })
            .TransitionTo(Failed)

            );
    }
    //Запрос на получение денег
    public Request<BuyItemsSagaState, IGetMoneyRequest, IGetMoneyResponse> GetMoney { get; set; }
    //Запрос на получение предметов
    public Request<BuyItemsSagaState, IGetItemsRequest, IGetItemsResponse> GetItems { get; set; }
   //Событие стартующее нашу сагу.
   public Event<BuyItemsRequest> BuyItems { get; set; }
   //Одно из наших кастомных состояний в которое может перейти сага
    public State Failed { get; set; }
    //Метод для ответного сообщения
    //Тут нужно явно использовать ResponseAddress и RequestId 
    //сохраненные ранее чтобы ответить ровно тому, кто сделал запрос
    private static async Task RespondFromSaga<T>(BehaviorContext<BuyItemsSagaState, T> context, string error) where T : class
    {
        var endpoint = await context.GetSendEndpoint(context.Saga.ResponseAddress);
        await endpoint.Send(new BuyItemsResponse
        {
            OrderId = context.Saga.CorrelationId,
            ErrorMessage = error
        }, r => r.RequestId = context.Saga.RequestId);
    }
}

Регистрируем сагу в стартапе

builder.Services.AddMassTransit(cfg =>
{
    cfg.SetKebabCaseEndpointNameFormatter();
    cfg.AddDelayedMessageScheduler();
    //Тут добавляем сагу с указанием что будем сохранять ее в БД 
    //с помощью EF и будем использовать пессимистичный режим конкуренции за ресурсы
    cfg.AddSagaStateMachine<BuyItemsSaga, BuyItemsSagaState>()
    .EntityFrameworkRepository(r =>
    {
        r.ConcurrencyMode = ConcurrencyMode.Pessimistic;
        r.ExistingDbContext<SagasDbContext>();
        r.LockStatementProvider = new PostgresLockStatementProvider();
    });
    cfg.UsingRabbitMq((brc, rbfc) =>
    {
        rbfc.UseInMemoryOutbox();
        rbfc.UseMessageRetry(r =>
        {
            r.Incremental(3, TimeSpan.FromSeconds(1), TimeSpan.FromSeconds(1));
        });
        rbfc.UseDelayedMessageScheduler();
        rbfc.Host("localhost", h =>
        {
            h.Username("guest");
            h.Password("guest");
        });
        rbfc.ConfigureEndpoints(brc);
    });
});

ApiGateway

Его работа — это просто перевести перевести http запрос в ampq запрос в шину. Дождаться ответа и вернуть ответ пользователю (фронтенду)

public class BuyItemsRequstModel
{
    public Guid OrderId { get; set; }
}

[ApiController]
[Route("api/v1/items")]
public class ItemsController : ControllerBase
{
    //Интерфейс MassTransit через который идет работа с сообщениями
    private readonly IBus _bus;
    private readonly ILogger<ItemsController> logger;

    public ItemsController(IBus bus, ILogger<ItemsController> logger)
    {
        _bus = bus;
        this.logger = logger;
    }

    [HttpPost("buy")]
    public async Task<BuyItemsResponse> BuyAsync(BuyItemsRequstModel model)
    {
       //Делаем запрос в шину и ждем ответа от саги. 
       //Ответ придёт из RabbitMq или словим ошибку таймаута запроса
        logger.LogInformation("Start!");
        var response = await _bus.Request<BuyItemsRequest, BuyItemsResponse>(model);
        logger.LogInformation("End!");
        //Возвращаем сообщение что было в ответе
        return response.Message;
    }
}

Так же можно все делать просто на событиях без Request/Response только это потребует больше кода и можно их через SignalR гонять на фронт.

Исходники

Microservices With Sagas

Благодарности

Спасибо @questor за помощь. Всем добра :)

Комментарии (8)


  1. Cobalt747
    10.05.2022 16:16
    +1

    интересно, а как защититься от ситуации когда в процессе работы

    BuyItemsSaga

    произойдет, к примеру, нехватка памяти и процесс упадет?
    кто обеспечит возврат денег или предметов?


    1. VanquisherWinbringer Автор
      10.05.2022 20:28

      Она упадет, потом сделает ретрай. Там пока в БД не будет записано что обработчик выполнился и выполнено действие оно будет повторяться. Либо до достижения лимита ретраев.

      Тут надо в ней поля добавить для количества предметов и суммы денег что она себе забрала.

      Дальше записываем данные в эти поля и потом уже делаем запрос на возвращение денег и предметов обратно. Да, консистентность со временем.

      Для важных процессов ретраев делается много и с большим нарастающим промежутком между ними. Кроме того, в логи пишется если все ретраи выполнены и процесс не удалось завершить (Fault) и уже такие случаи можно индивидуально рассмотреть. Вообще через Jaeger это все трейситься обычно.

      Тут для простоты фронт ждет ответа. Обычно записывают в БД Id таска при постановке задачи и дальше при событии оповещающем о том что сага завершила свою работу выставляют данные этом Таске в БД и еще в SignalR пушим что таск завершился. Фронт может и сам тоже проверить сделав запрос завершился ли Таск и какой у него результат.


          public class DelayedTask
          {
              public Guid Id { get; set; }
              public bool IsCompleted { get; set; }
              public string ResultJson { get; set; }
          }


    1. VanquisherWinbringer Автор
      10.05.2022 20:31

      И в MassTransit и в NServiceBus в реализации Saga есть перзистентность и возможность повторов выполнения т.е. они умеют после падения востанавливаться и продолжать свою работу там где они остановились.


    1. VanquisherWinbringer Автор
      10.05.2022 20:40

      Тут в принцыпе "пример ради примера" и не стал расписывать компенсирующие действия потому что стала бы сага тогда совсем огромной. Можно повесить Таймаут на сагу и если через 30 минут например не удалось провести операцию то начать пытаться до посинения откатить изменения в системе. Вернуть денги и предметы обрабтно или только деньги. Таймауты тоже настраиваются как перзистентные и тоже настраиваются с ретраями


    1. VanquisherWinbringer Автор
      10.05.2022 20:53

      Подробнее о отложенных сообщениях для Таймаутов можно почитать тут MassTransit Schedule
      Вот пример того как сделать Таймаут для саги

      https://masstransit-project.com/usage/sagas/automatonymous.html#schedule
      
      public interface OrderCompletionTimeoutExpired
      {
          Guid OrderId { get; }
      }
      
      public class OrderState :
          SagaStateMachineInstance
      {
          public Guid CorrelationId { get; set; }
          public string CurrentState { get; set; }
      
          public Guid? OrderCompletionTimeoutTokenId { get; set; }
      }
      
      public class OrderStateMachine :
          MassTransitStateMachine<OrderState>
      {
          public OrderStateMachine()
          {
              Schedule(() => OrderCompletionTimeout, instance => instance.OrderCompletionTimeoutTokenId, s =>
              {
                  s.Delay = TimeSpan.FromDays(30);
      
                  s.Received = r => r.CorrelateById(context => context.Message.OrderId);
              });
              
              During(Accepted,
                  When(OrderCompletionTimeout.Received)
                      .PublishAsync(context => context.Init<OrderCompleted>(new { OrderId = context.Saga.CorrelationId }))
                      .Finalize());
          }
      
          public Schedule<OrderState, OrderCompletionTimeoutExpired> OrderCompletionTimeout { get; private set; }
      }


  1. Truba
    11.05.2022 08:24
    +1

    Спасибо за интересную статью и подробные комментарии в коде!


  1. alexs0ff
    11.05.2022 12:44
    +1

    Активно использую в своих проектах саги из MT.

    В теоретическую часть я бы добавил обобщение саги с позиции микросервисов, и что они бывают хореографическими и оркестрируемыми. MT как раз используют оркестрацию.

    От паттернов Requests отказались по соображением производительности, используем синхронный RPC через http/rest.

    Плюс роутинг MTский по большей части руками настраиваем из-за соображений гетерогенных систем, когда разные микросервисы могут иметь разные языки реализации.


    1. VanquisherWinbringer Автор
      11.05.2022 15:53

      Спасибо за полезный коментарий. Вечером дополню статью.