Ревизия в Додо не бумажная: у ревизора есть планшет, где ревизор отмечает все продукты и создает отчеты. Но до 2020 года в пиццериях ревизия проводилась именно на бумажках — просто потому что так было проще и легче. Это, конечно, приводило к недостоверным данным, ошибкам и потерям, — люди ошибаются, бумажки теряются, а ещё их много. Мы решили исправить эту проблему и улучшить планшетный способ. В реализации решили использовать DDD. Как у нас это получилось, расскажем дальше.
Сначала кратко о бизнес-процессах, чтобы понять контекст. Рассмотрим схему движения продуктов, и где в ней ревизии, а затем перейдем к техническим деталям, которых будет много.
Схема движения продуктов и зачем нужна ревизия
В нашей сети больше 600 пиццерий (и это количество будет расти). В каждой из них ежедневно происходит движение сырья: от приготовления и продажи продуктов, списаний ингредиентов по сроку годности, до перемещений сырья в другие пиццерии сети. На балансе пиццерии постоянно находятся около 120 позиций, необходимых для производства продуктов, и дополнительно множество расходных, хозяйственных материалов и химии для поддержания чистоты пиццерии. Для всего этого нужен «учёт», чтобы знать, какого сырья с избытком, а какого не хватает.
«Учёт» описывает любое движение сырья в пиццериях. Поставка — это «плюс» на балансе, а списание — «минус». Например, когда мы заказываем пиццу, кассир принимает заказ и передаёт его в обработку. Дальше раскатывают тесто и начиняют ингредиентами, например, сыром, томатным соусом и пепперони. Все эти продукты идут в производство — списываются. Также списание может произойти, когда заканчивается срок годности.
В результате поставок и списаний формируются «складские остатки». Это отчет, в котором отражается сколько сырья на балансе исходя из операций в информационной системе. Всё это «расчетный остаток». Но есть «фактическое значение» — сколько сырья на самом деле сейчас на складе.
Ревизии
Для подсчета фактического значения как раз используются «ревизии» (ещё их называют «инвентаризациями»).
Ревизии помогают точно рассчитывать количество сырья для закупок. Слишком много закупок заморозит оборотные средства, и возрастёт риск списания излишка продуктов, что тоже приводит к потерям. Опасен не только излишек сырья, но и его недостаток — это может привести к остановке производства некоторых продуктов, что приведет к снижению выручки. Ревизии помогают видеть, сколько прибыли недополучает бизнес из-за учтённых и неучтённых потерь сырья, и работать над сокращением издержек.
Ревизии делятся своими данными с учётом для дальнейшей обработки, например, построения отчётов.
Проблемы в процессе проведения ревизий, или Как работали старые ревизии
Ревизии — трудоемкий процесс. Он занимает много времени и состоит из нескольких этапов: подсчет и фиксация остатков сырья, суммирование результатов сырья по зонам хранения, внесение результатов в информационную систему Dodo IS.
Раньше ревизии проводились с помощью ручки и бумажного бланка, на котором был перечень сырья. При ручном суммировании, сверке и перенесении результатов в Dodo IS есть вероятность совершить ошибку. В полной ревизии подсчитывается больше 100 наименований сырья, а сам подсчёт зачастую проводится поздним вечером или ранним утром, от чего концентрация может страдать.
Как решить проблему
Наша команда «Game of Threads» занимается развитием учета в пиццериях. Мы решили запустить проект «планшет ревизора», который упростит проведение ревизий в пиццериях. Всё решили делать в собственной информационной системе Dodo IS, в которой реализованы основные компоненты для ведения учета, поэтому нам не нужны интеграции со сторонними системами. К тому же инструментом смогут пользоваться все страны нашего присутствия, не прибегая к дополнительным интеграциям.
Ещё до начала работы над проектом мы в команде обсуждали желание применить DDD на практике. К счастью, на одном из проектов уже успешно применили этот подход, поэтому у нас был пример, на который можно посмотреть — это проект «касса доставки».
В статье я расскажу о тактических паттернах DDD, которые мы применяли в разработке: агрегатах, командах, доменных эвентах, прикладной службе и интеграции ограниченных контекстов. Стратегические паттерны и основы DDD не будем описывать, иначе статья будет очень длинной. Об этом мы уже рассказывали в материале «Что можно узнать о Domain Driven Design за 10 минут?»
Новая версия ревизий
Перед началом ревизии нужно знать, что конкретно считать. Для этого нам понадобятся шаблоны ревизии. Их настраивает роль «менеджер офиса». Шаблон ревизии — это сущность InventoryTemplate. Она содержит следующие поля:
- идентификатор шаблона;
- идентификатор пиццерии;
- название шаблона;
- категория ревизии: месячная, недельная, дневная;
- единицы измерения;
- зоны хранения и сырьё в этой зоне хранения
Для этой сущности реализован CRUD-функционал и подробно на ней останавливаться не будем.
Как только у ревизора есть список шаблонов, он может начать ревизию. Обычно это происходит, когда пиццерия закрыта. В этот момент заказов нет и сырьё не движется — можно достоверно получить данные по остаткам.
Начиная ревизию ревизор выбирает зону, например холодильник, и идёт считать сырьё там. В холодильнике он видит 5 пачек сыра по 10 кг, вводит в калькулятор 10 кг * 5, нажимает «Ввести ещё». Затем замечает на верхней полке ещё 2 пачки, и нажимает «Добавить». В результате у него есть 2 замера — по 50 и 20 кг.
Замером мы называем введенное количество сырья ревизором в определённой зоне, но не обязательно суммарное. Ревизор может ввести два замера по одному килограмму или просто два килограмма в одном замере — сочетания могут быть любыми. Главное, чтобы самому ревизору было понятно.
Интерфейс калькулятора.
Так, по шагам, ревизор за 1-2 часа считает всё сырьё, а потом завершает ревизию.
Алгоритм действий довольно простой:
- ревизор может начать ревизию;
- ревизор может добавлять замеры в начатой ревизии;
- ревизор может завершить ревизию.
Из этого алгоритма формируются бизнес-требования к системе.
Реализация первой версии агрегата, команды и события предметной области
Сначала определимся с терминами, которые входят в набор тактических шаблонов DDD. К ним мы будем обращаться в этой статье.
Тактические шаблоны DDD
Агрегат — кластер объектов сущностей и значений. Объекты в кластере — единое целое с точки зрения изменения данных. Каждый агрегат имеет корневой элемент, через который происходит обращение к сущностям и значениям. Агрегаты не стоит проектировать слишком большими. Они будут потреблять много памяти, а вероятность успешного завершения транзакции уменьшается.
Граница агрегата — набор объектов, которые должны быть согласованы в рамках одной транзакции: должны быть соблюдены все инварианты в рамках этого кластера.
Инварианты — бизнес-правила, которые не могут быть противоречивыми.
Команда — это какое-то действие над агрегатом. В результате этого действия может быть изменено состояние агрегата, и может быть сгенерировано одно или несколько событий предметной области.
Событие предметной области — это уведомление об изменении состояния агрегата, нужно для обеспечения согласованности. Агрегат обеспечивает транзакционную согласованность: все данные должны быть изменены здесь и сейчас. Итоговая согласованность гарантирует согласованность в конечном счете — данные изменятся, но не здесь и сейчас, а через неопределенный промежуток времени. Этот промежуток зависит от многих факторов: загруженность очередей сообщений, готовность внешних сервисов к обработке этих сообщений, сеть.
Корневой элемент — это сущность с уникальным глобальным идентификатором. Дочерние элементы могут иметь только локальную идентичность в рамках целого агрегата. Они могут ссылаться друг на друга и могут иметь ссылку только на свой корневой элемент.
Команды и события
Опишем бизнес-требование командой. Команды — это просто DTO с описательными полями.
В команде «добавление замера» есть следующие поля:
- значение замера — количество сырья в определенной единице измерения, может быть null, если замер удалили;
- версия — замер можно редактировать, поэтому нужна версия;
- идентификатор сырья;
- единица измерения: кг/г, л/мл, штуки;
- идентификатор зоны хранения.
public sealed class AddMeasurementCommand
{
// ctor
public double? Value { get; }
public int Version { get; }
public UUId MaterialTypeId { get; }
public UUId MeasurementId { get; }
public UnitOfMeasure UnitOfMeasure { get; }
public UUId InventoryZoneId { get; }
}
Также нам понадобится событие, которое получится в результате отработки этих команд. Мы помечаем событие интерфейсом
IPublicInventoryEvent
— понадобится нам для интеграции с внешними потребителями в дальнейшем.В событии «замер» поля такие же, как и в команде «Добавление замера», кроме того, что событие хранит ещё идентификатор агрегата на котором оно произошло и его версию.
public class MeasurementEvent : IPublicInventoryEvent
{
public UUId MaterialTypeId { get; set; }
public double? Value { get; set; }
public UUId MeasurementId { get; set; }
public int MeasurementVersion { get; set; }
public UUId AggregateId { get; set; }
public int Version { get; set; }
public UnitOfMeasure UnitOfMeasure { get; set; }
public UUId InventoryZoneId { get; set; }
}
Когда мы описали команды и события, можем реализовать агрегат
Inventory
.Реализация агрегата Inventory
UML диаграмма агрегата Inventory.
Подход такой: начало ревизии инициирует создание агрегата
Inventory
, для этого мы используем фабричный метод Create
и начинаем ревизию командой StartInventoryCommand
.Каждая команда мутирует состояние агрегата и сохраняет события в списке
changes
, которые как раз и отправятся в хранилище на запись. Также на основе этих изменений будут сгенерированы события для внешнего мира.Когда агрегат
Inventory
был создан, мы можем его восстанавливать на каждый последующий запрос на изменение его состояния.- Изменения (
changes
) хранятся с момента последнего восстановления агрегата.
- Состояние восстанавливается методом
Restore
, который проигрывает все предыдущие события, отсортированные по версии, на текущем экземпляре агрегатаInventory
.
Это реализация идеи
Event Sourcing
в рамках агрегата. О том, как реализовать идею Event Sourcing
в рамках хранилища поговорим немного позже. Есть хорошая иллюстрация из книги Вон Вернона:Состояние агрегата восстанавливается путем применения событий в порядке их появления.
Дальше происходит несколько замеров командой
AddMeasurementCommand
. Ревизия завершается командой FinishInventoryCommand
. Агрегат валидирует своё состояние в мутирующих методах для соблюдения своих инвариантов.Важно отметить, что агрегат
Inventory
версионируется целиком, а также каждый его замер. С замерами сложнее — приходится решать конфликты в методе обработки события When(MeasurementEvent e)
. В коде я приведу только обработку команды AddMeasurementCommand
.public sealed class Inventory : IEquatable<Inventory>
{
private readonly List<IInventoryEvent> _changes = new List<IInventoryEvent>();
private readonly List<InventoryMeasurement> _inventoryMeasurements = new List<InventoryMeasurement>();
internal Inventory(UUId id, int version, UUId unitId, UUId inventoryTemplateId,
UUId startedBy, InventoryState state, DateTime startedAtUtc, DateTime? finishedAtUtc)
: this(id)
{
Version = version;
UnitId = unitId;
InventoryTemplateId = inventoryTemplateId;
StartedBy = startedBy;
State = state;
StartedAtUtc = startedAtUtc;
FinishedAtUtc = finishedAtUtc;
}
private Inventory(UUId id)
{
Id = id;
Version = 0;
State = InventoryState.Unknown;
}
public UUId Id { get; private set; }
public int Version { get; private set; }
public UUId UnitId { get; private set; }
public UUId InventoryTemplateId { get; private set; }
public UUId StartedBy { get; private set; }
public InventoryState State { get; private set; }
public DateTime StartedAtUtc { get; private set; }
public DateTime? FinishedAtUtc { get; private set; }
public ReadOnlyCollection<IInventoryEvent> Changes => _changes.AsReadOnly();
public ReadOnlyCollection<InventoryMeasurement> Measurements => _inventoryMeasurements.AsReadOnly();
public static Inventory Restore(UUId inventoryId, IInventoryEvent[] events)
{
var inventory = new Inventory(inventoryId);
inventory.ReplayEvents(events);
return inventory;
}
public static Inventory Restore(UUId id, int version, UUId unitId, UUId inventoryTemplateId,
UUId startedBy, InventoryState state, DateTime startedAtUtc, DateTime? finishedAtUtc,
InventoryMeasurement[] measurements)
{
var inventory = new Inventory(id, version, unitId, inventoryTemplateId,
startedBy, state, startedAtUtc, finishedAtUtc);
inventory._inventoryMeasurements.AddRange(measurements);
return inventory;
}
public static Inventory Create(UUId inventoryId)
{
if (inventoryId == null)
{
throw new ArgumentNullException(nameof(inventoryId));
}
return new Inventory(inventoryId);
}
public void ReplayEvents(params IInventoryEvent[] events)
{
if (events == null)
{
throw new ArgumentNullException(nameof(events));
}
foreach (var @event in events.OrderBy(e => e.Version))
{
Mutate(@event);
}
}
public void AddMeasurement(AddMeasurementCommand command)
{
if (command == null)
{
throw new ArgumentNullException(nameof(command));
}
Apply(new MeasurementEvent
{
AggregateId = Id,
Version = Version + 1,
UnitId = UnitId,
Value = command.Value,
MeasurementVersion = command.Version,
MaterialTypeId = command.MaterialTypeId,
MeasurementId = command.MeasurementId,
UnitOfMeasure = command.UnitOfMeasure,
InventoryZoneId = command.InventoryZoneId
});
}
private void Apply(IInventoryEvent @event)
{
Mutate(@event);
_changes.Add(@event);
}
private void Mutate(IInventoryEvent @event)
{
When((dynamic) @event);
Version = @event.Version;
}
private void When(MeasurementEvent e)
{
var existMeasurement = _inventoryMeasurements.SingleOrDefault(x => x.MeasurementId == e.MeasurementId);
if (existMeasurement is null)
{
_inventoryMeasurements.Add(new InventoryMeasurement
{
Value = e.Value,
MeasurementId = e.MeasurementId,
MeasurementVersion = e.MeasurementVersion,
PreviousValue = e.PreviousValue,
MaterialTypeId = e.MaterialTypeId,
UserId = e.By,
UnitOfMeasure = e.UnitOfMeasure,
InventoryZoneId = e.InventoryZoneId
});
}
else
{
if (!existMeasurement.Value.HasValue)
{
throw new InventoryInvalidStateException("Change removed measurement");
}
if (existMeasurement.MeasurementVersion == e.MeasurementVersion - 1)
{
existMeasurement.Value = e.Value;
existMeasurement.MeasurementVersion = e.MeasurementVersion;
existMeasurement.UnitOfMeasure = e.UnitOfMeasure;
existMeasurement.InventoryZoneId = e.InventoryZoneId;
}
else if (existMeasurement.MeasurementVersion < e.MeasurementVersion)
{
throw new MeasurementConcurrencyException(Id, e.MeasurementId, e.Value);
}
else if (existMeasurement.MeasurementVersion == e.MeasurementVersion &&
existMeasurement.Value != e.Value)
{
throw new MeasurementConcurrencyException(Id, e.MeasurementId, e.Value);
}
else
{
throw new NotChangeException();
}
}
}
// Equals
// GetHashCode
}
При возникновении события «Произведен замер» проверяется наличие существующего замера с таким идентификатором. Если такого нет — добавляется новый замер.
Если есть — нужны дополнительные проверки:
- нельзя редактировать удаленный замер;
- входящая версия должна быть больше предыдущей.
Если условия выполняются — можем установить новое значение и новую версию для существующего замера. Если версия меньше, то это конфликт. Для этого мы генерируем исключение
MeasurementConcurrencyException
. Если версия совпадает и значения отличаются, то это тоже конфликтная ситуация. Ну и если совпадает как версия, так и значение, то изменений не произошло. Таких ситуаций обычно не возникает.Сущность «замер» содержит точно такие же поля, что и команда «Добавление замера».
public class InventoryMeasurement
{
public UUId MeasurementId { get; set; }
public UUId MaterialTypeId { get; set; }
public UUId UserId { get; set; }
public double? Value { get; set; }
public int MeasurementVersion { get; set; }
public UnitOfMeasure UnitOfMeasure { get; set; }
public UUId InventoryZoneId { get; set; }
}
Использование публичных методов агрегата хорошо демонстрируют Unit-тесты.
[Fact]
public void WhenAddMeasurementAfterStartInventory_ThenInventoryHaveOneMeasurement()
{
var inventoryId = UUId.NewUUId();
var inventory = Domain.Inventories.Entities.Inventory.Create(inventoryId);
var unitId = UUId.NewUUId();
inventory.StartInventory(Create.StartInventoryCommand()
.WithUnitId(unitId)
.Please());
var materialTypeId = UUId.NewUUId();
var measurementId = UUId.NewUUId();
var measurementVersion = 1;
var value = 500;
var cmd = Create.AddMeasurementCommand()
.WithMaterialTypeId(materialTypeId)
.WithMeasurement(measurementId, measurementVersion)
.WithValue(value)
.Please();
inventory.AddMeasurement(cmd);
inventory.Measurements.Should().BeEquivalentTo(new InventoryMeasurement
{
MaterialTypeId = materialTypeId,
MeasurementId = measurementId,
MeasurementVersion = measurementVersion,
Value = value,
UnitOfMeasure = UnitOfMeasure.Quantity
});
}
Собираем всё вместе: команды, события, агрегат Inventory
Жизненный цикл агрегата Inventory при выполнении команды Finish Inventory.
На схеме изображен процесс обработки команды
FinishInventoryCommand
. Перед обработкой необходимо восстановить состояние агрегата Inventory
на момент выполнения команды. Для этого мы загружаем все события, которые были произведены над данным агрегатом, в память и проигрываем их (п. 1). На момент завершения ревизии у нас уже есть следующие события — начало ревизии и добавление трех замеров. Эти события появились в результате обработки команд
StartInventoryCommand
и AddMeasurementCommand
, соответственно. В базе данных каждая строка в таблице содержит идентификатор ревизии, версию и тело самого события.На этом этапе мы выполняем команду
FinishInventoryCommand
(п. 2). Эта команда сначала проверит валидность текущего состояния агрегата — то, что ревизия находится в состоянии InProgress
, а затем породит новое изменение состояния, добавив событие FinishInventoryEvent
в список changes
(п. 3).Когда команда завершится, все изменения сохранятся в базу данных. В результате в базе появится новая строка с событием
FinishInventoryEvent
и последней версией агрегата (п. 4).Тип
Inventory
(ревизия) — агрегат и корневой элемент по отношению к своим вложенным сущностям. Таким образом, тип Inventory
определяет границы агрегата. В границы агрегата входит список сущностей типа Measurement
(замер), и список всех событий, произведенных над агрегатом (changes
).Реализация всей фичи
Под фичей мы понимаем реализацию конкретного бизнес-требования. В нашем примере мы рассмотрим фичу «Добавление замера». Для реализации фичи нам понадобится разобраться с понятием «прикладная служба» (
ApplicationService
).Прикладная служба — непосредственный клиент модели предметной области. Прикладные службы гарантируют транзакции при использовании базы данных ACID, гарантируя атомарное сохранение переходов между состояниями. Кроме того, прикладные службы также решают задачи безопасности.
У нас уже есть агрегат
Inventory
. Для реализации всей фичи целиком воспользуемся прикладной службой. В ней необходимо проверить наличие всех связных сущностей, а также права доступа у пользователя. Только после соблюдения всех условий можно выполнять сохранение текущего состояния агрегата и отправлять события во внешний мир. Для реализации прикладной службы мы используем MediatR
.public class AddMeasurementChangeHandler
: IRequestHandler<AddMeasurementChangeRequest, AddMeasurementChangeResponse>
{
// dependencies
// ctor
public async Task<AddMeasurementChangeResponse> Handle(
AddMeasurementChangeRequest request,
CancellationToken ct)
{
var inventory =
await _inventoryRepository.GetAsync(request.AddMeasurementChange.InventoryId, ct);
if (inventory == null)
{
throw new NotFoundException($"Inventory {request.AddMeasurementChange.InventoryId} is not found");
}
var user = await _usersRepository.GetAsync(request.UserId, ct);
if (user == null)
{
throw new SecurityException();
}
var hasPermissions =
await _authPermissionService.HasPermissionsAsync(request.CountryId, request.Token, inventory.UnitId, ct);
if (!hasPermissions)
{
throw new SecurityException();
}
var unit = await _unitRepository.GetAsync(inventory.UnitId, ct);
if (unit == null)
{
throw new InvalidRequestDataException($"Unit {inventory.UnitId} is not found");
}
var unitOfMeasure =
Enum.Parse<UnitOfMeasure>(request.AddMeasurementChange.MaterialTypeUnitOfMeasure);
var addMeasurementCommand = new AddMeasurementCommand(
request.AddMeasurementChange.Value,
request.AddMeasurementChange.Version,
request.AddMeasurementChange.MaterialTypeId,
request.AddMeasurementChange.Id,
unitOfMeasure,
request.AddMeasurementChange.InventoryZoneId);
inventory.AddMeasurement(addMeasurementCommand);
await HandleAsync(inventory, ct);
return new AddMeasurementChangeResponse(request.AddMeasurementChange.Id, user.Id, user.GetName());
}
private async Task HandleAsync(Domain.Inventories.Entities.Inventory inventory, CancellationToken ct)
{
await _inventoryRepository.AppendEventsAsync(inventory.Changes, ct);
try
{
await _localQueueDataService.Publish(inventory.Changes, ct);
}
catch (Exception ex)
{
_logger.LogError(ex, "error occured while handling action");
}
}
}
Event sourcing
Во время реализации мы решили выбрать подход ES по нескольким причинам:
- В Dodo есть примеры успешного использования этого подхода.
- С помощью ES проще разобраться в проблеме во время инцидента — хранятся все действия пользователя.
- Если выбрать традиционный подход, то перейти к ES будет невозможно.
Идея реализации достаточно проста — все новые события, которые появились в результате команд мы складываем в базу данных. Для восстановления агрегата мы получаем все события и проигрываем их на экземпляре. Чтобы не доставать каждый раз большую пачку событий, мы снимаем состояния каждые N событий и проигрываем оставшуюся часть с этого снимка.
internal sealed class InventoryRepository : IInventoryRepository
{
// dependencies
// ctor
static InventoryRepository()
{
EventTypes = typeof(IEvent)
.Assembly.GetTypes().Where(x => typeof(IEvent).IsAssignableFrom(x))
.ToDictionary(t => t.FullName, x => x);
}
public async Task AppendAsync(IReadOnlyCollection<IEvent> events, CancellationToken ct)
{
using (var session = await _dbSessionFactory.OpenAsync())
{
if (events.Count == 0) return;
try
{
foreach (var @event in events)
{
await session.ExecuteAsync(Sql.AppendEvent,
new
{
@event.AggregateId,
@event.Version,
@event.UnitId,
Type = @event.GetType().FullName,
Data = JsonConvert.SerializeObject(@event),
CreatedDateTimeUtc = DateTime.UtcNow
}, cancellationToken: ct);
}
}
catch (MySqlException e)
when (e.Number == (int) MySqlErrorCode.DuplicateKeyEntry)
{
throw new OptimisticConcurrencyException(events.First().AggregateId, "");
}
}
}
public async Task<Domain.Models.Inventory> GetInventoryAsync(
UUId inventoryId,
CancellationToken ct)
{
var events = await GetEventsAsync(inventoryId, 0, ct);
if (events.Any()) return Domain.Models.Inventory.Restore(inventoryId, events);
return null;
}
private async Task<IEvent[]> GetEventsAsync(
UUId id,
int snapshotVersion,
CancellationToken ct)
{
using (var session = await _dbSessionFactory.OpenAsync())
{
var snapshot = await GetInventorySnapshotAsync(session, inventoryId, ct);
var version = snapshot?.Version ?? 0;
var events = await GetEventsAsync(session, inventoryId, version, ct);
if (snapshot != null)
{
snapshot.ReplayEvents(events);
return snapshot;
}
if (events.Any())
{
return Domain.Inventories.Entities.Inventory.Restore(inventoryId, events);
}
return null;
}
}
private async Task<Inventory> GetInventorySnapshotAsync(
IDbSession session,
UUId id,
CancellationToken ct)
{
var record =
await session.QueryFirstOrDefaultAsync<InventoryRecord>(Sql.GetSnapshot, new {AggregateId = id},
cancellationToken: ct);
return record == null ? null : Map(record);
}
private async Task<IInventoryEvent[]> GetEventsAsync(
IDbSession session,
UUId id,
int snapshotVersion,
CancellationToken ct)
{
var rows = await session.QueryAsync<EventRecord>(Sql.GetEvents,
new
{
AggregateId = id,
Version = snapshotVersion
}, cancellationToken: ct);
return rows.Select(Map).ToArray();
}
private static IEvent Map(EventRecord e)
{
var type = EventTypes[e.Type];
return (IEvent) JsonConvert.DeserializeObject(e.Data, type);
}
}
internal class EventRecord
{
public string Type { get; set; }
public string Data { get; set; }
}
После нескольких месяцев эксплуатации мы поняли, что у нас нет большой необходимости хранить все действия пользователя над экземпляром агрегата. Бизнес никак не использует эту информацию. При этом есть накладные расходы на поддержание такого подхода. Оценив все плюсы и минусы мы планируем уйти от ES к традиционному подходу — заменить табличку
Events
на Inventories
и Measurements
.Интеграция с внешними ограниченными контекстами
Так выглядит схема взаимодействия ограниченного контекста
Inventory
с внешним миром.Взаимодействие контекста ревизий с остальными контекстами. На схеме изображены контексты, сервисы и их принадлежность друг другу.
В случае с
Auth
, Inventory
и Datacatalog
на каждый сервис приходится один ограниченный контекст. Монолит выполняет несколько функций, но сейчас нас интересуют только функционал учета в пиццериях. Помимо ревизий, к учету также относится движения сырья в пиццериях: поступления, перемещения, списания.HTTP
Сервис
Inventory
взаимодействует с Auth
по HTTP. Первым делом пользователь сталкивается с Auth
, который предлагает пользователю выбрать одну из доступных ему ролей.- В системе есть роль «ревизор», которую как раз и выбирает пользователь при проведении ревизии.
- Пользователь выбирает пиццерию.
- Перенаправляется на сервис ревизий.
На последнем этапе у пользователя есть токен от
Auth
. Сервис ревизий должен проверить этот токен, поэтому он обращается к Auth
за проверкой. Auth
проверит не истекло ли время жизни токена, его принадлежность владельцу, а также наличие необходимых прав доступа. Если всё хорошо, то Inventory
сохраняет в куки клеймы — идентификатор пользователя, логин, идентификатор пиццерии и устанавливает время жизни кук.Примечание. Как работает сервис
Auth
мы подробнее рассказали в статье «Тонкости авторизации: обзор технологии OAuth 2.0».С остальными сервисами
Inventory
взаимодействует посредством очередей сообщений. В качестве брокера сообщений в компании используется RabbitMQ, а также обвязка над ним — MassTransit.RMQ: потребление событий
Сервис справочников —
Datacatalog
— обеспечит Inventory
всеми необходимыми сущностями: сырьем для учета, странами, подразделениями и пиццериями.Не вдаваясь в подробности инфраструктуры, опишу основную идею потребления событий. На стороне сервиса справочников уже всё готово для публикации событий, рассмотрим на примере сущности сырья.
namespace Dodo.DataCatalog.Contracts.Products.v1
{
public class MaterialType
{
public UUId Id { get; set; }
public int Version { get; set; }
public int CountryId { get; set; }
public UUId DepartmentId { get; set; }
public string Name { get; set; }
public MaterialCategory Category { get; set; }
public UnitOfMeasure BasicUnitOfMeasure { get; set; }
public bool IsRemoved { get; set; }
}
public enum UnitOfMeasure
{
Quantity = 1,
Gram = 5,
Milliliter = 7,
Meter = 8,
}
public enum MaterialCategory
{
Ingredient = 1,
SemiFinishedProduct = 2,
FinishedProduct = 3,
Inventory = 4,
Packaging = 5,
Consumables = 6
}
}
Это сообщение публикуется в
exchange
. Каждый сервис может создать свою связку exchange-queue
для потребления событий.Схема публикации события и его потребление через примитивы RMQ.
В конечном итоге для каждой сущности есть очередь, на которую может подписаться сервис. Остается только сохранить новую версию в базу данных.
public class MaterialTypeConsumer : IConsumer<Dodo.DataCatalog.Contracts.Products.v1.MaterialType>
{
private readonly IMaterialTypeRepository _materialTypeRepository;
public MaterialTypeConsumer(IMaterialTypeRepository materialTypeRepository)
{
_materialTypeRepository = materialTypeRepository;
}
public async Task Consume(ConsumeContext<Dodo.DataCatalog.Contracts.Products.v1.MaterialType> context)
{
var materialType = new AddMaterialType(context.Message.Id,
context.Message.Name,
(int)context.Message.Category,
(int)context.Message.BasicUnitOfMeasure,
context.Message.CountryId,
context.Message.DepartmentId,
context.Message.IsRemoved,
context.Message.Version);
await _materialTypeRepository.SaveAsync(materialType, context.CancellationToken);
}
}
RMQ: публикация событий
Часть монолита, которая отвечает за учёт, потребляет данные
Inventory
для поддержки остального функционала, где требуются данные ревизий. Все события, о которых мы хотим уведомить другие сервисы, мы помечали интерфейсом IPublicInventoryEvent
. Когда происходит событие подобного рода, мы их вычленяем из списка изменений (changes
) и отправляем в очередь на отправку. Для этого используются две таблицы publicqueue
и publicqueue_archive
.Для гарантии доставки сообщений мы используем паттерн, который у нас обычно называют «локальная очередь», подразумевая
Transactional outbox pattern
. Сохранение состояния агрегата Inventory
и отправка событий в локальную очередь происходят в одной транзакции. Как только произошла фиксация транзакции, мы сразу же пытаемся отправить сообщения брокеру.Если сообщение получилось отправить, то оно удаляется из очереди
publicqueue
. Если нет, то будет предпринята попытка отправить сообщение позднее. Далее подписчики монолита и пайплайны данных потребляют сообщения. Таблица publicqueue_archive
вечно хранит данные для удобной переотправки событий, если это в какой-то момент потребуется.internal sealed class BusDataService : IBusDataService
{
private readonly IPublisherControl _publisherControl;
private readonly IPublicQueueRepository _repository;
private readonly EventMapper _eventMapper;
public BusDataService(
IPublicQueueRepository repository,
IPublisherControl publisherControl,
EventMapper eventMapper)
{
_repository = repository;
_publisherControl = publisherControl;
_eventMapper = eventMapper;
}
public async Task ConsumePublicQueueAsync(int batchEventSize, CancellationToken cancellationToken)
{
var events = await _repository.GetAsync(batchEventSize, cancellationToken);
await Publish(events, cancellationToken);
}
public async Task Publish(IEnumerable<IPublicInventoryEvent> events, CancellationToken ct)
{
foreach (var @event in events)
{
var publicQueueEvent = _eventMapper.Map((dynamic) @event);
await _publisherControl.Publish(publicQueueEvent, ct);
await _repository.DeleteAsync(@event, ct);
}
}
}
События монолиту мы отправляем для отчетов. Отчет «потери и избыток» позволяет сравнивать две любые ревизии между собой. Помимо этого, есть важный отчет «складские остатки», о котором уже было сказано ранее.
Зачем отправлять события пайплайну данных? Все также — для отчетов, но только на новых рельсах. Раньше все отчеты жили в монолите, но теперь их выносят. Это разделяет две ответственности — хранение и обработку производственных и аналитических данных: OLTP и OLAP. Это важно как с точки зрения инфраструктуры, так и разработки.
Заключение
Следуя принципам и практикам Domain-Driven Design, нам удалось построить надежную и гибкую систему, которая удовлетворяет бизнес-потребностям пользователей. У нас получился не только достойный продукт, но и хороший код, который легко модифицировать. Надеемся, что и в ваших проектах найдётся место для использования Domain-Driven Design.
Больше информации о DDD вы можете найти в нашем сообществе DDDevotion и на Youtube-канале DDDevotion. Обсудить статью можно в Телеграм в Dodo Engineering chat.
Ordos
Если не секрет, как transactional outbox реализуете чуть более подробно? Я так понял, что сразу после коммита достаёте сообщение и пересылаете в очередь, но что происходит если какие-то сообщения не были отправлены? Доотправляете фоном? Как в этом случае несколько экземпляров приложения синхронизируются между собой, чтобы не обрабатывать одни и те же сообщения несколько раз и при этом сохранять порядок отправки сообщений?
as94 Автор
Все правильно поняли: если сообщение не дошло до брокера, то оно не удаляется из очереди. Обычно в фоне крутится джоба (для упрощения 1 инстанс, иначе их надо синхронизировать на уровне базы данных), которая переотправит такие сообщения. Код потребителей должен быть готов к дубликатам и быть идемпотентным. Помимо этого, вместе с сообщением всегда идет версия объекта. Если потребителю пришло что-то отличное от current version + 1, то он не принимает такие сообщения, возвращая их обратно в очередь. Так мы реализуем коммутативность потребителей.