Недавно в подкасте "Цинковый прод" мы с товарищами обсуждали паттерн CQRS/ES и некоторые особенности её реализации в Elixir. Т.к. я в работе использую Laravel, грех было не покопаться в интернетах и не найти как же можно потягать этот подход в экосистеме данного фреймворка.
Всех приглашаю под кат, постарался максимально тезисно описать тему.
Немножко определений
CQRS (Command Query Responsibility Segregation) — выделение в отдельные сущности операции чтения и записи. Например пишем в мастер, читаем из реплики. CQRS. Факты и заблуждения — поможет досконально познать дзен CQRS.
ES (Event Sourcing) — хранение всех изменений состояния какой-либо сущности или набора сущностей.
CQRS/ES — это архитектурный подход при котором мы сохраняем все события изменения состояния какой либо сущности в таблице событий и добавляем к этому агрегат и проектор.
Агрегат — хранит в памяти свойства, необходимые для принятия решений бизнес логики (для ускорения записи), принимает решения (бизнес логика) и публикует события.
Проектор — слушает события и пишет в отдельные таблицы или базы (для ускорения чтения).
В бой
Laravel event projector — библиотека CQRS/ES для Laravel
Larabank — репозиторий с реализованным CQRS/ES подходом. Его и возьмем на пробу.
Конфигурация библиотеки подскажет куда смотреть и расскажет, что это такое. Смотрим файл event-projector.php. Из необходимого для описания работы:
projectors
— регистрируем проекторы;reactors
— регистрируем реакторы. Реактор — в данной библиотеке добавляет сайд-эффекты в обработку событий, например в этом репозитории, если три раза попытаться превысить лимит снятия средств, то пишется событие MoreMoneyNeeded и отправляется письмо пользователю о его финансовых трудностях;replay_chunk_size
— размер чанка повтора. Одна из фич ES — возможность восстановить историю по событиям. Laravel event projector подготовился к утечке памяти во время такой операции с помощью данной настройки.
Обращаем внимание на миграции. Кроме стандартных Laravel таблиц имеем
stored_events
— основная ES таблица с несколькими колонками неструктурированных данных под мета данные событий, строкой храним типы событий. Важная колонкаaggregate_uuid
— хранит uuid агрегата, для получения всех событий относящихся к нему;accounts
— таблица проектора счетов пользователя, необходима для быстрой отдачи актуальных данных о состоянии баланса;transaction_counts
— таблица проектора количества транзакций пользователя, необходима для быстрой отдачи количества совершенных транзакций.
А теперь предлагаю отправиться в путь вместе с запросом на создание нового счета.
Создание счета
Стандартный resource
роутинг описывает AccountsController. Нас интересует метод store
public function store(Request $request)
{
$newUuid = Str::uuid();
// Обращаемся к агрегату, сообщаем ему uuid событий
// которые в него должны входить
AccountAggregateRoot::retrieve($newUuid)
// Добавляем в массив событий на отправку событие создания нового счета
->createAccount($request->name, auth()->user()->id)
// Отправляем массив событий на отправку в очередь на запись
->persist();
return back();
}
AccountAggregateRoot наследует библиотечный AggregateRoot. Посмторим на методы, которые вызывал контроллер.
// Берем uuid и получаем все его события
public static function retrieve(string $uuid): AggregateRoot
{
$aggregateRoot = (new static());
$aggregateRoot->aggregateUuid = $uuid;
return $aggregateRoot->reconstituteFromEvents();
}
public function createAccount(string $name, string $userId)
{
// Добавляем событие в массив событий на отправку
// у событий, отправляемых в recordThat, есть хуки, но о них позже,
// т.к. на создание счета их нет)
$this->recordThat(new AccountCreated($name, $userId));
return $this;
}
Метод persist
вызывает метод storeMany
у модели указанной в конфигурации event-projector.php как stored_event_model
в нашем случае StoredEvent
public static function storeMany(array $events, string $uuid = null): void
{
collect($events)
->map(function (ShouldBeStored $domainEvent) use ($uuid) {
$storedEvent = static::createForEvent($domainEvent, $uuid);
return [$domainEvent, $storedEvent];
})
->eachSpread(function (ShouldBeStored $event, StoredEvent $storedEvent) {
// Вызываем все проекторы, которые не реализуют интерфейс
// QueuedProjector*
Projectionist::handleWithSyncProjectors($storedEvent);
if (method_exists($event, 'tags')) {
$tags = $event->tags();
}
// Отправляем в очередь джобу обработки и записи события
$storedEventJob = call_user_func(
[config('event-projector.stored_event_job'), 'createForEvent'],
$storedEvent,
$tags ?? []
);
dispatch($storedEventJob->onQueue(config('event-projector.queue')));
});
}
Проекторы AccountProjector и TransactionCountProjector реализуют Projector
поэтому реагировать на события будут синхронно вместе с их записью.
Ок, счет создали. Предлагаю рассмотреть как же клиент будет его читать.
Отображение счета
// Идем в таблицу `accounts` и берем счет по id
public function index()
{
$accounts = Account::where('user_id', Auth::user()->id)->get();
return view('accounts.index', compact('accounts'));
}
Если проектор счетов реализует интерфейс QueuedProjector, то пользователь ничего не увидит пока событие не будет обработано по очереди.
Напоследок изучим, как работает пополнение и снятие денег со счета.
Пополнение и снятие
Снова смотрим в контроллер AccountsController:
// Получаем события с uuid агрегата
// в зависимости от запроса вызываем пополнение
// или снятие денег, затем отправляем на запись
public function update(Account $account, UpdateAccountRequest $request)
{
$aggregateRoot = AccountAggregateRoot::retrieve($account->uuid);
$request->adding()
? $aggregateRoot->addMoney($request->amount)
: $aggregateRoot->subtractMoney($request->amount);
$aggregateRoot->persist();
return back();
}
Рассмотрим AccountAggregateRoot
при пополнении счета:
public function addMoney(int $amount)
{
$this->recordThat(new MoneyAdded($amount));
return $this;
}
// Помните говорил о "хуке" в recordThat
// AggregateRoot*?
// В нем вызывается метод apply(ShouldBeStored $event),
// который в свою очередь вызывает метод 'apply' . EventClassName агрегата
// Хук, который срабатывает при обработке `MoneyAdded`
protected function applyMoneyAdded(MoneyAdded $event)
{
$this->accountLimitHitInARow = 0;
$this->balance += $event->amount;
}
при снятии средств:
public function subtractMoney(int $amount)
{
if (!$this->hasSufficientFundsToSubtractAmount($amount)) {
// Пишем событие о попытке снять больше лимита
$this->recordThat(new AccountLimitHit());
// Если слишком много попыток шлем событие, что
// нужно больше золота, на которое реагирует реактор
// и отправляет сообщение пользователю
if ($this->needsMoreMoney()) {
$this->recordThat(new MoreMoneyNeeded());
}
$this->persist();
throw CouldNotSubtractMoney::notEnoughFunds($amount);
}
$this->recordThat(new MoneySubtracted($amount));
}
protected function applyMoneySubtracted(MoneySubtracted $event)
{
$this->balance -= $event->amount;
$this->accountLimitHitInARow = 0;
}
Заключение
Постарался максимально без воды описать процесс "онбординга" в CQRS/ES на Laravel. Концепция очень интересная, но не без особенностей. Прежде, чем внедрять помните о:
- eventual consistency;
- желательно использовать в отдельных доменах DDD, не стоит делать большую систему полностью на этом паттерне;
- изменения в схеме таблицы событий могут быть очень болезненны;
- ответственно стоит подойти к выбору гранулярности событий, чем больше будет конкретных событий, тем больше их будет в таблице и большее количество ресурсов будет необходимо на работу с ними.
Буду рад замеченным ошибкам.
Комментарии (8)
Jedi_PHP
17.04.2019 09:49> eventual consistency;
> банк
Вы уверены что подобрали нужный инструмент?xenmayer Автор
17.04.2019 10:06+1Пример из статьи носит исключительно ознакомительный характер, чтобы описать быструю пробу концепции в экосистеме Laravel. Данная реализация, по моему мнению, достаточна наглядна. Что касается CQRS/ES для операций с деньгами, то она вполне оправдана в случаях, когда функциональные требования не будут нарушены особенностями реализации. С eventual consistency можно жить, если клиентам не придется мучительно долго ждать обновления проекций. Надеюсь правильно понял Ваш вопрос.
Интересная статья на эту тему от команды, которая на Elixir разрабатывает банкинг
varanio
17.04.2019 10:11+1Отставание будет только в проекциях, что вполне допустимо.
Агрегаты же обеспечивают достаточную consistency, чтобы нельзя было снять со счета, если там не хватает денег — там всё четко
guyfawkes
17.04.2019 23:18А что вы бы выбрали?
xenmayer Автор
18.04.2019 09:48К сожалению, однозначно сказать не могу, т.к. выбор подхода очень ответственная задача, без досконального выяснения всех требований её решать опасно. Если бы в функциональных требованиях была историчность, то CQRS/ES, многопоточность и реализация блокировок на уровне приложения.
guyfawkes
А как решается проблема с состоянием гонки, конкурентными запросами на снятие/пополнение и прочим?
xenmayer Автор
В случае данной реализации, когда мы на HTTP запросы синхронно выполняем эти операции, — никак. И когда время расчета текущего состояния по событиям в агрегате будет ощутимым — могут начаться проблемы. Очередь для команд в агрегаты — решит проблему. Не пойму почему создатели примера использования этой библиотеки не учли это.