Какую потенциальную проблему видите в коде?

await _applicationService.Create(application);
await _queue.Publish(new ApplicationCreatedEvent(application));

Сначала создается заявка в БД, после событие о создании отправляется в брокер сообщений(MQ) для оповещения другого сервиса о появлении новой заявки.

Здесь может произойти ошибка в момент отправки события в очередь, когда наша сущность уже создана в БД.
Это приведет к несогласованному состоянию, так как заявка будет хранится только в нашей системе, а другая система не узнает о ней.

Как решить эту проблему, может воспользуемся транзакцией и откатим сохранение, если при отправке события в очередь произойдет ошибка?

await using var transaction = await _unitOfWork.BeginTransaction();

await _applicationRepository.Add(application); // отслеживание заявки для вставки
await _unitOfWork.SaveChanges(); // выполняется запрос на вставку в БД

try
{
    await _queue.Publish(new ApplicationCreatedEvent(application));
}
catch(Exception e)
{
    _logger.LogError(e, "Ошибка при отправке события в очередь");
    await transaction.Rollback();
    return;
}
  
await transaction.Commit();

Тут уже обработали проблему, но не все случаи, тк ошибка может произойти на стороне БД, когда будем коммитить изменения.
Событие в очередь отправили, а у себя не сохранили, тоже не согласованность.

Проблема заключается в том, что эти два действия - сохранение состояния и публикация события, обычно не происходят атомарно.
Поэтому на любом из этих этапов может возникнуть сбой и одно из действий не будет выполнено, что приведет к несогласованному состоянию системы.

Тут нам поможет паттерн Transactional Outbox - как избежать потери сообщений в микросервисной архитектуре.

В качестве решения предлагается атомарно в одной транзакции сохранить изменения сущности и само событие, а вторым шагом выполнять публикацию ранее сохраненного события.
Если сущность и соответствующее ему событие сохраняются в рамках одной транзакции, то это гарантирует, что данные не будут потеряны.

  1. В одной транзакции создаем сущность и событие (в другой таблице), которое должно отправиться в очередь.

await _applicationRepository.Add(application);
await _eventRepository.Add(new ApplicationCreatedEvent(application));
await _unitOfWork.SaveChanges();
  1. Отправкой событий из базы в брокер будет заниматься отдельный процесс - Outbox processor.
    Реализовать можно в том же сервисе в виде фоновой задачи на основе IHostedService (в ASP.NET Core), Cron джобы или развернуть отдельный Worker Service для этого процесса.
    Есть одна проблема - с какой частотой сканировать БД для отправки сообщений в очередь?
    Нужно учитывать такие факторы: как часто сообщения приходят, насколько быстро нужно доставлять сообщения, производительность БД и тп.

Как работает Outbox Processor?

Он берет пачку событий из базы и обрабатывает данные события, отправляя их в очередь, а после удаляет или помечает события как отправленные (ниже пример отправки события).

async Task SendEvent(ApplicationCreatedEvent applicationCreatedEvent)
{
    try
    {
        await _queue.Publish(applicationCreatedEvent);
        await _eventRepository.Delete(applicationCreatedEvent);
    }
    catch(Exception e)
    {
        _logger.LogError(e, "Ошибка при обработке события");
    }
}

Теперь проблема потери сообщений решена, но все ли это или может есть еще какая-то проблема?

Осталась проблема с дублями событий в очереди, тк при удалении события может произойти ошибка, а оно уже было отправлено в очередь.

Для этого нужно реализовать идемпотентную обработку событий - продюсеру необходимо отправлять идентификатор события в сообщении, а подписчику проверять по идентификатору ранее обработанные события.

P.S.:

Если сообщения не настолько критичны, то профукать одну отправку не страшно и можно оставить реализацию как в первом варианте и не пользоваться данным паттерном.

Если нужна более сложная обработка транзакций, то использовать SAGA, чтобы обрабатывать откаты уже выполненных транзакций со сложной цепочкой взаимодействия, хорошо подойдет для распределенных систем.

Комментарии (12)


  1. izibrizi2
    02.02.2025 14:30

    Забыли добавить, что в реальности, при более менее высокой нагрузке, это работать не будет, а при низкой нагрузке, можеть быть и не стоит использовать брокер.


    1. MonkAlex
      02.02.2025 14:30

      Можете добавить деталей про нагрузку? Может есть доклады-статьи какие по теме.

      Потому что я только знаю что крупные компании этим пользуются даже в нагрузочных сценариях, первый раз слышу что "работать не будет".


      1. izibrizi2
        02.02.2025 14:30

        Ну так этим вопросом и нужно было озадачиться и привести в пример кейс, когда у вас база (с вероятностью 99% постгрес) будет повисать в вакуме, для пересбора таблицы с сообщениями, потому что статусы отправки нужно обновлять, а при обновлении статуса постгрес будет создавать новую строку. Проблема с периодичностью отправки: нужно будет делать периодический поск новых событий, что так же будет вносить нагрузку при поиске. Блокировки при повторной отправке сообщения процессором сообщения. Обработка ситуации, когда повторная отправка так же не сработала (брокер сам улегся). И тд итп.

        Чтобы этот механизм отладить, у вас база раз 10 уйдет в глубокий фриз. Может быть на 11й раз получится сделать кучу оптимизаций и переделку стуктуры таблицы журнала.

        Не очень понимаю за что минусы. Статья очевидная, и в очередной раз нет нормальных исследовпний косяков.


        1. MonkAlex
          02.02.2025 14:30

          Я лично не минусил, честно задал вопрос потому что не видел решений кроме аутбокса.

          И то что вы описали - реальность работы с СУБД при любых задачах, не только при аутбоксе. К сожалению (или к счастью) аутбокс сейчас основное(популярное, рабочее) решение по доставке информации во внешнюю систему (не обязательно брокеру) с гарантией "хотя бы раз". Возможно я тут неправ и есть решения лучше - но я не в курсе и поэтому задаю вопросы.

          Статья слишком простая, тут соглашусь полностью, не хватает каких-нибудь хитростей, опыта, оптимизаций.


  1. JolyBell899
    02.02.2025 14:30

    Спасибо за статью, интересный подход к написанию!


  1. NewCtrl01
    02.02.2025 14:30

    Спасибо, даже не задумывался об этом


  1. dryja
    02.02.2025 14:30

    В таком случае, как по мне, лучше добавить триггер на INSERT в таблицу сущностей, в котором писать событие в другую таблицу. А на стороне бэка слушать сервис брокер.
    Ну и при обработке событий не удалять их из таблицы, а помечать статусом "выполнен". Таким образом останутся логи (которые при желании можно чистить периодически).
    Ну и при старте самого сервиса бэка забирать все события в статусе "новый".


  1. readerOfDream
    02.02.2025 14:30

    шёл 2025 год


    1. AlexanderKovalenko
      02.02.2025 14:30

      шел 2025 год. Все родились со всеми знаниями, учиться никому и ничему не надо. Все задачи решаются, как только на них падет взор. Чудесные времена.


      1. readerOfDream
        02.02.2025 14:30

        Очень советовал бы прочитать книги, написанные и годами уже рекомендованные всеми, там очень много ответов на многие задачи. А вот когда вы найдёте задачу, на которую не будет ответа - милости просим написать пост


  1. lair
    02.02.2025 14:30

    Есть одна проблема - с какой частотой сканировать БД для отправки сообщений в очередь?

    Это мелкая проблема. Большая проблема - это как вообще так спроектировать БД, чтобы таблица аутбокса не стала внезапно бутылочным горлышком для всей системы.


    1. cmekoneup Автор
      02.02.2025 14:30

      Это действительно важный момент – если не продумать архитектуру заранее, аутбокс легко может стать бутылочным горлышком. Поэтому стоит учитывать это узкое место и не заостряться на одном подходе.

      В зависимости от нагрузки и требований можно комбинировать разные стратегии: партиционирование, шардинг, оптимизация индексов и асинхронная очистка старых записей.

      Но в некоторых случаях можно вообще отказаться от явного Outbox – например, используя CDC (например, Debezium + Kafka).
      Вместо того чтобы писать события в отдельную таблицу, можно слушать лог изменений БД (WAL в PostgreSQL, binlog в MySQL) и сразу передавать события в Kafka. Это снижает нагрузку на основную базу и устраняет узкое место в виде таблицы аутбокса.

      С вами согласен, поэтому не стоит зацикливаться на одном решении, лучше смотреть на систему в целом и выбирать стратегию в зависимости от контекста.