В этой статье:
Мы откажемся от контроллеров, MediatR‑а и MassTransit‑а, всё вышеперечисленное нам заменит Wolverine. Отольём в граните модульный монолит, имплементируем регистрацию событий используя Marten. Пример всего этого безобразия находится тут.
Начнём с Wolverine
Как утверждают создатели, Wolwerine — это новое поколение реализации паттернов «Медиатор» и «Шина Сообщений», библиотека «с батарейками в комплекте».
Wolverine позволяет:
Обрабатывать HTTP‑запросы с помощью обработчиков Wolverine, находящихся, в том числе, в библиотеках.
Отправлять сообщения (команды и запросы) в обработчики и получать результат выполнения.
Отправлять сообщения (команды и запросы) в обработчики находящиеся в других приложениях, в этом случае нам понадобится транспорт. «Из коробки» Wolverine работает со следующими видами транспорта: RabbitMQ, Azure Service Bus, Amazon SQS, TCP, Sql Server, PostgreSQL, MQTT, Kafka. В примере я использую Kafka в качестве транспорта для некоторых сообщений, Wolverine позволяет выбирать какое сообщение каким транспортом будет отправлено.
Реализовывать паттерн Saga. Сага от Wolverine поддерживает такой тип сообщений, как «сообщения по таймауту», что позволяет в «автоматическом режиме» завершать «забытые» саги.
Отправлять сообщения в режиме «Ping — Pong», в документации это называется «cascading messages».
Wolverine имеет интеграцию с Marten «из коробки», что даёт возможность, например, отправлять все команды и запросы сразу в Кафку.
Marten
Marten ‑.Net библиотека, которая призвана устранить «boilerplate» и позволить Вам сфокусироваться на доставке бизнес ценностей, если верить её создателям. Marten является надстройкой над PostgreSQL, которая позволяет использовать эту БД в качестве аналога MongoDB или EventStoreDB. Также производитель обещает Strong Consistency (но есть нюансы), гибкие стратегии индексирования, улучшенные linq — запросы, встроенную реализацию паттерна Outbox/Inbox «из коробки», поддержку Multi‑tenancy, интеграцию с Asp.Net Core и другие не всем понятные слова на английском:
https://martendb.io/introduction.html.
В примере реализована регистрация событий на Marten, и, если сравнивать связку EventStoreDB плюс MongoDB, (хороший пример находится здесь), то, таки да, в сравнении с ними Marten потребует минимум кода, т.к. read‑часть предоставлена «из коробки».
Mодульный монолит
Модульный монолит — вид архитектуры, подразумевающий разбиение приложения на модули, каждый из которых с увеличением масштаба приложения может быть конвертирован в отдельный микро‑сервис, и, как можно догадаться, модуль является реализацией ограниченного контекста в коде. Модули «знают» лишь контракты друг друга и взаимодействуют так же, как это делали бы микро‑сервисы, плюс взаимодействие с помощью той или иной реализации паттерна Посредник.
Плюсы: мы экономим на написании инфраструктурного кода, необходимого для создания отдельного приложения под каждый модуль, что нам пришлось бы сделать в случае микро‑сервисной архитектуры, в тоже время, есть шанс избежать спагетти‑кода. Мы экономим на написании интеграционного кода, т.к. модули находятся в одном процессе. Сохранять данные из разных модулей в рамках одной транзакции тоже не проблема, т.к. все модули работают с одним и тем же подключением к БД. Создать новый модуль гораздо проще, чем новый микросервис.
Минусы: это монолит, модуль не является единицей развёртывания.
Расово верный подкаст о Modular Monolith для.Net можно найти тут и там же пример.
Ну, или в рамках импортозамещения, беседа и пример от DevBrothers, тоже изрядно.
Предметная область
Она проста и кратка, как тост Булдакова. Есть «Персона», у персоны есть «Счета», на счета приходят «Платежи». Каждое действие: создание персоны, добавление счёта, добавление платежа должно быть подтверждено либо отвергнуто. На выходе мы должны получить список персон и сальдо по каждой из них, с сохранением истории того, как было достигнуто нынешнее состояние.
Добавление новых данных в БД – только по окончанию саги:
В Swagger это выглядит так:
В решении имеется проект API, из которого уделена папка Controllers, она не нужна, так как все HTTP-вызовы должны обрабатываться модулями.
У нас три модуля: Модуль по работе с агрегатом Персона (PersonModule), модуль успешного завершения саг (SagaApprovementModule) и модуль отрицательного завершения саг (SagaRejectionModule).
HTTP EndPoint-ы Wolverine
Начнём с SagaApprovementModule: он содержит библиотеку SagaApprovement.Contracts с контрактами модуля и библиотеку SagaApprovement, в которой находятся эндпоинты Wolverine:
// <summary>
/// EndPoint завершения саги добавления счёта, добавления платежа, создания персоны.
/// Во всех трёх обработчиках один из параметров - впрыск ссылки на объект шины сообщений.
/// </summary>
public static class ApproveEndPoints
{
/// <summary>
/// Отправляем в сагу добавления счёта сообщение разешающее добавление счёта.
/// </summary>
/// <param name="command"></param>
/// <param name="bus"></param>
/// <returns></returns>
[WolverinePost("approve-add-account-saga")]
public static ValueTask Handle(ApproveAccountCommand command, IMessageBus bus) => bus.PublishAsync(new AccountApproved(command.SagaId));
/// <summary>
/// Отправляем в сагу добавления платежа сообщение разешающее добавление платежа.
/// </summary>
/// <param name="command"></param>
/// <param name="bus"></param>
/// <returns></returns>
[WolverinePost("approve-add-payment-saga")]
public static ValueTask Handle(ApprovePaymentCommand command, IMessageBus bus) => bus.PublishAsync(new PaymentApproved(command.SagaId));
/// <summary>
/// Отправляем в сагу создания персоны сообщение разешающее создание персоны.
/// </summary>
/// <param name="command"></param>
/// <param name="bus"></param>
/// <returns></returns>
[WolverinePost("approve-person-creation-saga")]
public static ValueTask Handle(ApprovePersonCreationCommand command, IMessageBus bus) => bus.PublishAsync(new PersonApproved(command.SagaId));
}
В классе находятся три конечные точки с адресами: «approve‑add‑account‑saga», «approve‑add‑payment‑saga», «approve‑person‑creation‑saga». Чтобы всё это работало, необходим класс, имеющий в своём названии «EndPoint» или «EndPoints», и содержащий методы с атрибутами WolverinePost, WolverineGet и т. п. Больше ничего не нужно.
В руководстве Wolverine указано, что предпочтительным является впрыск зависимости в метод. В строке приведённой выше, мы впрыскиваем интерфейс шины сообщений Wolverine. Далее мы можем выбрать один из трёх вариантов:
bus.PublishAsync — публикует сообщение в шине. Даже если нет ни одного обработчика — метод завершится успешно, если обработчики есть — метод не будет ожидать результатов их выполнения.
bus.SendAsync — публикует сообщение в шине, но ожидает, что есть обработчики этого сообщения, если они есть — не дожидается завершения их выполнения.
bus.InvokeAsync — публикует сообщение, ожидает что есть обработчик, возвращает результат выполнения обработчика отправителю.
В примере выше я публикую в шине сообщение AccountApproved. Обработчик этого сообщения находится в саге AddAccountSagа. Чтобы сага принимала сообщение, нужно определить метод Handle, в котором один из параметров является классом сообщения. Обработчик выглядит следующим образом:
/// <summary>
/// Успешное завершение саги, добавляем аккаунт.
/// </summary>
/// <param name="_"></param>
/// <param name="addAccountService">сервис добавления счёта.</param>
public async void Handle(AccountApproved _, IAddAccountService addAccountService)
{
// Обращаемся к сервису добавления аккаунта,
// отправляя туда данные из состояния саги.
await addAccountService.CreateAccount(PersonId, AccountName);
// Завершаем сагу.
MarkCompleted();
}
Saga на Wolverine
/// <summary>
/// Сага добавления аккаунта.
/// </summary>
public class AddAccountSaga : Saga
{
/// <summary>
/// Идентификатор саги.
/// </summary>
public string? Id { get; set; }
/// <summary>
/// Идентификатор персоны.
/// </summary>
public string PersonId { get; set; }
/// <summary>
/// Наименование аккаунта.
/// </summary>
public string AccountName { get; set; }
/// <summary>
/// Обработчик старта саги.
/// Название Start зарезервировано Wolverine.
/// Сообщение принимаемое в этом методе в качетсве первого параметра - будет считаться стартовым
/// сообщением саги.
/// Стартовый обработчик должен вернуть сагу.
/// В нашем случае сагу и сообщение завершающее сагу по таймауту.
/// </summary>
/// <param name="addAccountSagaStarted">Стартовое сообщение саги</param>
/// <returns></returns>
public static (AddAccountSaga, AddAccountTimeoutExpired) Start(AddAccountSagaStarted addAccountSagaStarted) => (new AddAccountSaga
{
//заполняем состояние саги данными.
Id = addAccountSagaStarted.AddAccountSagaId,
PersonId = addAccountSagaStarted.PersonId,
AccountName = addAccountSagaStarted.AccountName
},
new AddAccountTimeoutExpired(addAccountSagaStarted.AddAccountSagaId));
/// <summary>
/// Успешное завершение саги, добавляем аккаунт.
/// </summary>
/// <param name="_"></param>
/// <param name="addAccountService">сервис добавления счёта.</param>
public async void Handle(AccountApproved _, IAddAccountService addAccountService)
{
// Обращаемся к сервису добавления аккаунта,
// отправляя туда данные из состояния саги.
await addAccountService.CreateAccount(PersonId, AccountName);
// Завершаем сагу.
MarkCompleted();
}
/// <summary>
/// Хэндлер отрицательного завершения саги.
/// </summary>
/// <param name="_"></param>
public void Handle(AccountRejected _) => MarkCompleted();
/// <summary>
/// Хэндлер завершения саги по таймауту.
/// MarkCompleted - закрывает сагу.
/// </summary>
/// <param name="_"></param>
public void Handle(AddAccountTimeoutExpired _) => MarkCompleted();
}
Создавать саги с Wolverine просто:
Наследуем класс от класса Saga.
Определяем поля, являющиеся состоянием саги. Состояние будет доступно во всех обработчиках сообщений саги. (стр.6–20).
Определяем обработчик стартового сообщения саги. Он должен иметь название Start, а первый параметр будет считаться сообщением, с которого начинается сага. (стр.31).
Определяем остальные обработчики саги. Первые параметры в обработчиках — это сообщения, которые должна обрабатывать сага.
Метод Start должен вернуть экземпляр саги. В нашем случае он возвращает кортеж из экземпляра саги и сообщения, которое закроет сагу по таймауту, если про неё «забыли». Состояние саги будет сохранено в БД, а затем будет извлекаться из неё при следующих срабатываниях, описанных в саге обработчиков.
Все необходимые сервисы впрыскиваем вторым и следующими параметрами в обработчиках. Как сделано строке № 45.
В настройках, при добавлении Wolverine, можно указать, каким транспортом отправлять сообщения:
//Будем публиковать в кафке ниже приведённые события.
opts.PublishMessage<PersonApproved>().ToKafkaTopic("CreatePersonUseCase.PersonApproved");
opts.PublishMessage<PersonRejected>().ToKafkaTopic("CreatePersonUseCase.PersonRejected");
И указать, откуда могут прийти входящие интеграционные сообщения:
//Будем получать из топиков кафки следующие события.
opts.ListenToKafkaTopic("CreatePersonUseCase.PersonApproved");
opts.ListenToKafkaTopic("CreatePersonUseCase.PersonRejected");
Собственно, это всё, что нужно для создания саги.
Marten и Event Sourcing
Что такое Event Sourcing можно почитать тут, к статье приложен хороший пример, глядя на который, можно оценить на столько Marten облегчает имплементацию регистрации событий.
«Из коробки» в Marten реализован демон и механизм проекций. Проекции обновляются при сохранении новых событий. К проекциям можно обращаться с помощью Linq‑запросов примерно также, как если бы мы работали с реляционными БД. Да‑да, нам не нужен Mongo или другая БД, не нужно имплементировать демон и подписки, чтобы получать снимки. Всё это уже сделал для нас Marten. Нужно создать проекцию и подключить её в Marten, и это не сложно.
Перейдём к матчасти. В примере найдём файл Repository.cs — это реализация репозитория, сохраняющего события в потоки агрегатов в БД и восстанавливающего состояния агрегатов по событиям из БД.
public sealed class Repository(IDocumentStore store) : IRepository
{
//Marten document store
private readonly IDocumentStore store = store;
/// Получаем несохранённые события из агрегата и сохраняем их.
public async Task StoreAsync(Aggregate aggregate, CancellationToken ct = default)
{
// получаем сессию для работы с событиями.
await using var session = await store.LightweightSerializableSessionAsync(token: ct);
// получаем список несохранённых событий из агрегата
var events = aggregate.GetUncommittedEvents().ToArray();
// добавляем события в стрим с идентификатором aggregate.Id
session.Events.Append(aggregate.Id, aggregate.Version, events);
// сохраняем изменения.
await session.SaveChangesAsync(ct);
// очищаем список несохранённых событий.
aggregate.ClearUncommittedEvents();
}
/// Восстанавливаем состояние агрегата по событиям.
public async Task<T> LoadAsync<T>(
string id,
int? version = null,
CancellationToken ct = default
) where T : Aggregate
{
// получаем сессию для работы с событиями.
await using var session = await store.LightweightSerializableSessionAsync(token: ct);
// восстанавливаем состояние агрегата, читая из бд события стрима агрегата.
// при этом Marten вызовет методы Apply для каждого из сохранённых событий.
var stream = await session.Events.FetchForWriting<T>(id, ct);
return stream.Aggregate;
}
}
Это весь код, который необходим для репозитория write-части модуля. В read-части можно обойтись вообще без репозитория.
/// <summary>
/// Endpoint получения данных о персонах.
/// </summary>
public static class GetPersonWithSumEndPoint
{
/// <summary>
/// Получаем персону по её идентификатору.
/// В метод впрыскиваем сессию для получения read-модели.
/// </summary>
/// <param name="getPersonsWithSumCommand"></param>
/// <param name="session"></param>
/// <returns></returns>
/// <exception cref="Exception"></exception>
[WolverineGet("person/person")]
public static async Task<string> Handle(GetPersonWithSumQuery getPersonsWithSumCommand, IQuerySession session)
{
var person = await session
.Query<PersonWithSum>()
.FirstOrDefaultAsync(c => c.Id == getPersonsWithSumCommand.PersonId) ?? throw new Exception($"Person not found.");
return JsonConvert.SerializeObject(person, Formatting.Indented);
}
/// <summary>
/// Получаем список всех персон (IRL это плохо, но для примера можно кмк.)
/// Впрыскиваем в метод сессию для получения списка read-моделей.
/// </summary>
/// <param name="getPersonsWithSumCommand"></param>
/// <param name="session"></param>
/// <returns>Список персон с сальдо.</returns>
/// <exception cref="Exception"></exception>
[WolverineGet("person/persons")]
public static async Task<string> Handle(GetPersonsWithSumQuery getPersonsWithSumCommand, IQuerySession session)
{
var persons = await session
.Query<PersonWithSum>().ToListAsync() ?? throw new Exception($"Persons not found.");
return JsonConvert.SerializeObject(persons, Formatting.Indented);
}
}
Класс GetPersonWithSumEndPoint, как не трудно догадаться, является HTTP-EndPoint-ом. Чтобы получить данные из проекций, нам достаточно выполнить linq-запрос.
await session.Query<PersonWithSum>().ToListAsync()
Получаем все данные из проекции, но можно добавить Where, Take, Skip и пр. для ограничения выборки. Никакой разницы с выборкой из реляционных БД.
Проекции
Проекции в Marten могут быть следующих видов:
Проекции агрегатов (Aggregate Projection): Live — проецируем на лету, нужна только модель, класс проекции создавать не нужно; Multi‑Stream — проекции с возможностью группировки событий, срезов событий, разбивки по tenantId и пр.; Custom — ещё более широкие возможности, чем в предыдущем Multi‑Stream.
Проекции событий (Event Projections): позволяют явно определять операции создания документов из отдельных событий.
Пользовательские (custom) проекции: проекции, наследуемые от IProjection, всё делаете сами с нуля. Остальные перечисленные в этом списке типы проекций наследуются от тех или иных классов и уже имеют какой‑то функционал.
Inline проекции: события проецируются одной транзакции с сохранением событий.
Flat Table Projection: позволяет создать ADO.NET таблицу, добавить в неё столбцы с помощью AddColumn, и проецировать данные прямо в неё.
В примере можно найти проекцию агрегатов на основе SingleStreamProjection.
Модель проекции выглядит таким образом:
/// <summary>
/// Модель персоны, используется в проекции PersonWithSumProjection. В модель добавлено поле Saldo.
/// </summary>
public class PersonWithSum
{
/// <summary>
/// Идентификатор персоны.
/// </summary>
public string Id { get; set; }
/// <summary>
/// ФИО
/// </summary>
public string Name { get; set; }
/// <summary>
/// ИНН
/// </summary>
public string Inn { get; set; }
/// <summary>
/// Сальдо.
/// </summary>
public decimal Saldo { get; set; }
public long Version { get; private set; }
/// <summary>
/// Счета.
/// </summary>
public List<Account> Accounts = new List<Account>();
/// <summary>
/// Методы Apply будут вызваны Marten при построении проекции.
/// </summary>
/// <param name="event"></param>
public void Apply(PersonCreated @event)
{
Id = @event.Id;
Name = @event.Name;
Inn = @event.Inn;
Version++;
}
public void Apply(PersonNameChanged @event)
{
Name = @event.NewName;
Version++;
}
public void Apply(PersonInnChanged @event)
{
Inn = @event.NewInn;
Version++;
}
public void Apply(AccountCreated @event)
{
var account = new Account(@event.AccountId, @event.Name, new List<Payment>());
Accounts.Add(account);
Version++;
}
public void Apply(PaymentCreated @event)
{
var payment = new Payment(@event.Id, @event.Sum, @event.PaymentType);
var account = Accounts.FirstOrDefault(x => x.Id == @event.AccountId) ?? throw new ArgumentNullException($"Счёт не найден с ид {@event.AccountId}");
account.Payments.Add(payment);
Saldo = @event.PaymentType == (int)PaymentTypeEnum.Credit ? Saldo + @event.Sum : Saldo - @event.Sum;
Version++;
}
}
Методы Apply будут вызваны в проекции.
Класс проекции выглядит так:
/// <summary>
/// Проекция событий агрегата PersonAggreate. Проекция вычисляет сальдо по каждому из агрегатов.
/// </summary>
public class PersonWithSumProjection : SingleStreamProjection<PersonWithSum>
{
public PersonWithSumProjection()
{
// Вызываются методы Apply модели PersonWithSum
ProjectEvent<PersonCreated>((item, @event) => item.Apply(@event));
ProjectEvent<PersonInnChanged>((item, @event) => item.Apply(@event));
ProjectEvent<PersonNameChanged>((item, @event) => item.Apply(@event));
ProjectEvent<AccountCreated>((item, @event) => item.Apply(@event));
// В этом Apply вычисляется сальдо.
ProjectEvent<PaymentCreated>((item, @event) => item.Apply(@event));
}
}
В проекции мы указываем, какие именно события она будет обрабатывать и какими методами модели.
Затем проекцию нужно добавить в Мартен.
options.Projections.Add<PersonWithSumProjection>(ProjectionLifecycle.Async);
LifeTime указан асинхронный, то есть проекция строится асинхронно, после сохранения событий. Можно указать Inline, тогда проекция будет строиться в рамках одной транзакции вместе с сохранением событий.
Агрегаты
Marten не предоставляет базовых классов для написания агрегатов. Поэтому пишем сами: наружу выставляем свойства для чтения. Изменение состояния — только через методы агрегата. Если возникла ошибка — записываем соответствующее событие в список подлежащих сохранению событий, вместо генерации исключения.
Marten ожидает, что в Вашем агрегате реализованы методы Apply для каждого из доменных событий, если это так, то Marten сможет восстановить состояние агрегата, даже если Apply являются приватными методами. Например, в агрегате описаны следующие методы Apply:
protected void Apply(PersonNameChanged @event)
{
Name = @event.NewName;
Version++;
}
protected void Apply(PersonInnChanged @event)
{
Inn = @event.NewInn;
Version++;
}
protected void Apply(AccountCreated @event)
{
var account = Account.Create(@event.AccountId, @event.Name);
_accounts.Add(account);
Version++;
}
Когда мы читаем агрегат из базы с помощью метода LoadAsync репозитория,
/// Восстанавливаем состояние агрегата по событиям.
public async Task<T> LoadAsync<T>(
string id,
int? version = null,
CancellationToken ct = default
) where T : Aggregate
{
// получаем сессию для работы с событиями.
await using var session = await store.LightweightSerializableSessionAsync(token: ct);
// восстанавливаем состояние агрегата, читая из бд события стрима агрегата.
// при этом Marten вызовет методы Apply для каждого из сохранённых событий.
var stream = await session.Events.FetchForWriting<T>(id, ct);
return stream.Aggregate;
}
в строке 15 мы обращаемся к свойству stream.Aggregate, при обращении к нему Marten вызовет выше упомянутые методы Apply, и применит события.
Пример агрегата находится в файле: PersonAggregate.cs.
Подписки
Подписки тоже имеются в Marten.
/// <summary>
/// Подписка на события типа PersonApproved.
/// </summary>
public class PersonApprovedToKafkaSubscription : SubscriptionBase
{
private readonly IServiceProvider _serviceProvider;
public PersonApprovedToKafkaSubscription(IServiceProvider serviceProvider)
{
_serviceProvider = serviceProvider;
SubscriptionName = nameof(PersonApprovedToKafkaSubscription);
// Подписываемся только на события типа PersonApproved
IncludeType<PersonApproved>();
// настраиваем сколько событий демон будет извлекать за раз
// и сколько будет держать в памяти.
Options.BatchSize = 1000;
Options.MaximumHopperSize = 10000;
// Позиция с которой читаем события (с текущего события)
Options.SubscribeFromPresent();
}
/// <summary>
/// Обрабатываем события.
/// </summary>
public override async Task<IChangeListener> ProcessEventsAsync(
EventRange page,
ISubscriptionController controller,
IDocumentOperations operations,
CancellationToken cancellationToken)
{
// с помощью Woverine будем отправлять интеграционные события в кафку.
var messageBus = _serviceProvider.GetService<IMessageBus>() ?? throw new ArgumentNullException("Шина событий не зарегистрирована в IoC");
foreach (var @event in page.Events)
{
await messageBus.PublishAsync(
new PersonApprovedIntegrationEvent(@event.Data.GetType().Name, JsonConvert.SerializeObject(@event.Data)));
}
return NullChangeListener.Instance;
}
}
Это подписка на один тип событий. После получения доменного события PersonApproved, на его основе формируется интеграционное событие PersonApprovedIntegrationEvent, которое отправляется в Кафку, как пример.
Replay событий
Иногда нужно заново построить проекцию, в Marten тоже можно это делать.
[HttpGet]
[Route("replay")]
public async Task Replay(
[FromServices] IDocumentStore store, CancellationToken cancellation)
{
using var daemon = await store.BuildProjectionDaemonAsync();
// Fire up everything!
await daemon.StartAllAsync();
// or instead, rebuild a single projection
//await daemon.RebuildProjectionAsync("a projection name", 5.Minutes(), cancellation);
// or a single projection by its type
await daemon.RebuildProjectionAsync<PersonWithSumProjection>(cancellation);
// Be careful with this. Wait until the async daemon has completely
// caught up with the currently known high water mark
await daemon.WaitForNonStaleData(1.Minutes());
// Start a single projection shard
//await daemon.StartAgentAsync("shard name", cancellation);
// Or change your mind and stop the shard you just started
//await daemon.StopAgentAsync("shard name");
// No, shut them all down!
await daemon.StopAllAsync();
}
Итого
В целом, Wolverine+Marten способны значительно уменьшить объём шаблонного кода за счёт широкой гаммы решений «из коробки». Однако, есть нюанс — если вы стремитесь к Strong Consistency — у решений на базе Marten могут возникать проблемы с производительностью, о чём говорят предупреждения на официальном сайте.
Удачи!
granit1986
Делал сагу на MassTransit. Достаточно многословно, но, как по мне - больше возможностей. Как минимум не увидел возможности делать компенсации.
a256fdca
Этот хэндлер по идее сработает при неудачном завершении саги. Теоретически, можно же вызвать отсюда какое-то удаление или откат данных?
granit1986
Как по мне - не очень удобно делать оркестрацию. У меня разбито по шагам и на каждом шаге можно задать данные для компенсации и цепочка просто размотается в обратном порядке и там уже есть нужные данные для возврата