На данный момент на рынке мало Event sourcing + CQRS фреймворков. А те, что есть, непопулярны и имеют слабую поддержку, поэтому многие создают свои in-house решения. В этой статье расскажу о выполненном командой «Programming Store» проекте Prostore, который может послужить примером при создании вашего собственного решения.
Event Sourcing
Event Sourcing — это архитектурный шаблон, подход для хранения данных в виде событий. В традиционном подходе мы храним конечное состояние. Event Sourcing — это кардинально иной подход. Вместо хранения конечного состояния, мы храним все промежуточные состояния в виде событий. Конечное состояние получаем последовательным применением всех промежуточных состояний. Хранилище данных реализуется как неизменяемый журнал только для добавления, и как правило, именуется отдельно как Event Store.
CQRS
CQRS расшифровывается как Command and Query Responsibility Segregation. Это шаблон, который разделяет операции чтения (Query) и обновления (Command) для хранилища данных. Разделяя операции на Command и Query мы можем максимизировать производительность, масштабируемость и безопасность наших сервисов. Также это увеличивает гибкость всей системы. Для Command и Query сервисов мы можем выбирать разные фреймворки, языки программирования, базы данных. Например, мы могли бы хранить/обновлять данные в реляционной БД, а делать запросы в графовой БД.
Event Sourcing + CQRS
Как было описано выше, Event Sourcing в качестве хранилища данных использует структуру типа «журнал». Мы можем только добавлять события в конец, а изменять и удалять события не можем. Тут важно подчеркнуть, мы можем хранить в Event Store только события, другие структуры шаблоном не предусмотрены. События Event Sourcing — это события предметной области, которые уже произошли в какой-то момент в прошлом. Например:
class OrderItemAddedEvent {
String orderId;
String userId;
String itemId;
long timestamp;
}
Таких событий в сложной предметной области может быть много. Это накладывает ограничения на структуру хранения и производительность. Для решения данной проблемы Event Sourcing применяется вместе с CQRS. Конечное решение может выглядеть так:
Тут:
Client — клиент наших сервисов (пользовательский интерфейс, внешний/внутренний сервис).
Commands — команды в определении CQRS, операции, результат которых приводит к созданию событий в Event Store.
Command Service A — сервис или группа сервисов, которые обслуживают команды определенных типов. Например, сервис заказов.
Command Service B — другой сервис команд.
Event Store — хранилище событий. Например, MongoDB, PostgreSQL, Apache Cassandra, Redis. Главная характеристика при выборе Event Store — это производительность добавления записи.
Events — события.
Event Bus — шина событий. Например, Apache Kafka, Redis, RabbitMQ.
Query — команда - запрос на получение агрегированных данных.
Query Service A — сервис или группа сервисов, которые обслуживают запросы определенных типов. Например, история заказов.
Query Service B — другой сервис запросов.
Query Storage — хранилище агрегированных данных, оптимальных для запросов. Тут для двух сервисов использован один Query Storage, но вы можете для каждого сервиса запросов использовать свой storage. Например, для одного Mongodb базу, для другого PostgreSQL.
Это только один из вариантов реализации, и на мой взгляд, наиболее близкий к «каноническому». Для простоты схемы я не стал добавлять сюда Service Discovery, хотя он на практике обязателен (клиент должен знать какие command/query сервисы ему доступны).
Преимущества и применения
Простая структура типа «журнал» дает нам некоторые преимущества:
Низкие требования к транзакционной логике. Операции, проводимые с базой, атомарные и неизменяемые (добавление в конец).
Высокая вертикальная производительность.
Простое масштабирование (шардирование).
Большой выбор баз данных (key-value, NoSQL, SQL).
Возможность восстановить состояние из любой точки в истории.
Быстрые изменения логики, связанной с базой (нет схемы и не нужно делать миграции).
Так как мы используем события, мы также получаем преимущества событийно-ориентированной архитектуры:
Слабая связность.
Простота в проектировании доменной области.
Возможные применения:
Аудит, журналирование.
Логистика.
Денежные и иные транзакции с высоким требованием к согласованности данных.
Способ хранения для «state machine».
Системы, проектируемые с ориентиром на слабую связность.
Для данных случаев подход Event Sourcing отлично показывает себя на практике (Netflix, Walmart), в особенности, когда мы имеем дело с большими данными.
Недостатки
Так как нам нужно хранить все события доменной области, Event Sourcing требует большего объема дискового пространства в сравнении с традиционными подходами.
Как было сказано выше, другие недостатки Event Sourcing решаются применением CQRS. CQRS архитектурно решает проблему масштабируемости и производительности сложных запросов. К сожалению, требования к объему хранимых данных нельзя решить только лишь архитектурным подходом. Это сужает применимость данного шаблона. При выборе Event Sourcing для вашего проекта, следует изучить этот недостаток, сделать расчеты. Так же я бы не советовал применять данный подход как прямую замену традиционного подхода, поскольку он им не является. Я видел проект, где данный подход был применен для всего проекта, для CRUD логики, и это было ужасно. Так как Event Sourcing требует хранения всех событий доменной области, были события вроде UserEmailChanged. И так практически для каждого поля.
К самым главным недостаткам я бы отнес нишевость, и как следствие непопулярность. На рынке не так много готовых решений. Платные решения могут стоить дорого, а решения с открытым исходным кодом имеют небольшое сообщество. И как следствие популярны in-house решения. Вам придется сделать выбор: платить дорого, выбрав платное решение, или заплатить временем, выбрав решение с открытым исходным, либо все же сделать свое решение. Конечно, не зная вашего случая, мне трудно давать какие-то практические советы. Далее я хотел бы показать пример того, как можно сделать собственное решение.
Решение
Ссылка на код решения https://github.com/ProgrammingStore/prostore. Проект реализован на Spring Boot 2.7.5. Архитектурно решение выглядит как на ранее предложенной схеме.
Требования:
Java >= 17
Apache maven >= 3.8
Mongodb >= 6.0.1
Kafka >= 2.13-3.3.1
Состав проекта:
prostore-core — ядро, основные абстракции;
prostore-eureka — подключаемый модуль, имплементация CommandBus/QueryBus на основе Spring Cloud Eureka;
prostore-mongo — подключаемый модуль, имплементация EventStore на Mongodb;
prostore-kafka — подключаемый модуль, имплементация EventBus на основе Apache Kafka;
prostore-test-common — общая библиотека для тестовых проектов;
prostore-test-service — тестовый проект «сервис». Это комбинированный сервис для команд и запросов;
prostore-test-client — тестовый проект «клиент»;
spring-boot-prostore-starter — основной стартер;
spring-boot-prostore-eureka-starter — стартер для eureka;
spring-boot-prostore-mongo-starter — стартер для mongo;
spring-boot-prostore-kafka-starter — стартер для kafka;
prostore-eureka-server — eureka server.
При создании собственного решения Event sourcing важно обратить внимание на гибкость. Не стоит внедрять жесткие зависимости. Возможно, для каких-то случаев вам понадобится заменить базу, например на SQL.
В данном проекте был выбран Mongodb, как наиболее оптимальный для хранения event-ов. Другие возможные варианты:
Apache Cassandra;
Redis;
PostgreSQL/MySQL.
Главная характеристика при выборе Event Store — это производительность добавления записи, а также простота шардинга.
Apache Kafka был выбран в качестве Event Bus. Требования к Event Bus зависят от выбранной архитектуры и реализации. Также в некоторых реализациях Event Store может служить как Event Bus. В текущей реализации задача Event Bus — это гарантированная доставка событий.
Spring Cloud Eureka используется как имплементация CommandBus/QueryBus.
Определения основных абстракций:
Aggregate — агрегат состояний. Это то самое конечное состояние, которое получаем, применяя последовательно все состояния. Имеет уникальный id. Например:
@Data
class Order implements Aggregate {
String id;
String userId;
List<OrderItem> orderItems;
}
AggregateEvent — событие, произошедшее в контексте агрегата. Например, событие OrderItemAddedEvent происходит в контексте агрегата Order.
Cache — для кэширования агрегатов. Постоянно строить агрегаты, применяя все события — дорого, поэтому мы используем кэш.
Command — команда, маркерный интерфейс. Пример:
public class CreateOrderCommand implements Command {
private String userId;
private String storeId;
}
CommandBus — шина для отправки команд.
CommandHandler — обработчик команд.
Event — событие, маркерный интерфейс.
EventBus — шина для отправки событий.
EventHandler — обработчик событий.
EventStore — хранилище событий.
Query — запрос, маркерный интерфейс. Пример:
public class GetOrderByIdQuery implements Query {
private String aggregateId;
}
QueryBus — шина для отправки запросов.
QueryHandler — обработчик запросов.
Важно понимать разницу между CommandHandler и EventHandler. CommandHandler — это обработчик команд. CommandBus отправляет команду только одному инстансу. При неудачной попытке отправки клиент должен повторно отправить команду. В обработчике, получив команду, мы должны сгенерировать соответствующее команде событие или события, сохранить их в EventStore и отправить в EventBus. Пример:
@Component
class CreateShipmentCommandHandler implements CommandHandler<CreateShipmentCommand> {
private final Logger logger = LoggerFactory.getLogger(CreateShipmentCommandHandler.class);
private final EventStore eventStore;
private final EventBus eventBus;
private CreateShipmentCommandHandler(EventStore eventStore, EventBus eventBus) {
this.eventStore = eventStore;
this.eventBus = eventBus;
}
@Override
public String handle(CreateShipmentCommand command) {
logger.debug("command = {}", command);
ShipmentCreatedEvent event = new ShipmentCreatedEvent(
UUID.randomUUID().toString(), command.getDestination(), command.getLocation()
);
eventStore.save(event);
eventBus.publish(event);
return String.format("created: %s", event.getAggregateId());
}
}
EventHandler — это обработчик событий. EventBus отправляет события всем инстансам. Переотправку событий после неудачной попытки должен взять на себя EventBus. В обработчике EventHandler мы можем обновить состояние агрегата (если это AggregateEvent), отправить новую команду. Пример:
@Component
class ShipmentMovedEventHandler implements EventHandler<ShipmentMovedEvent> {
private final Logger logger = LoggerFactory.getLogger(ShipmentMovedEventHandler.class);
private final EventStore eventStore;
private ShipmentMovedEventHandler(EventStore eventStore) {
this.eventStore = eventStore;
}
@Override
public void handle(ShipmentMovedEvent event) {
logger.debug("Got event: {}", event);
Shipment shipment = eventStore.get(event.getAggregateId());
shipment.setLocation(event.getLocation());
}
}
Запуск тестового проекта:
Запустите локально mongodb;
Запустите локально kafka;
mvn clean install;
mvn spring-boot:run -f prostore-eureka-server;
PORT=9000 mvn spring-boot:run -f prostore-test-client;
PORT=9001 mvn spring-boot:run -f prostore-test-service;
PORT=9002 mvn spring-boot:run -f prostore-test-service.
Тут мы запустили сервер Eureka, тестовый клиент на порту 9000, два инстанса тестовых сервисов на портах 9001 и 9002.
Теперь мы можем приступить к тестированию. Тестируем через отправку запросов на тестовый сервис:
Создание shipment (в ответ получите aggregateId):
curl -v -H "Content-Type: application/json" \
-d '{"destination":"Moscow", "location": "Almaty"}' \
http://localhost:9000/shipment
Получение shipment (вместо AGGREGATE_ID вставьте aggregateId полученный командой создания):
curl -v -X GET -H "Content-Type: application/json" \
-d '{"aggregateId": "AGGREGATE_ID"}' \
http://localhost:9000/shipment
Изменение shipment (вместо AGGREGATE_ID вставьте aggregateId полученный командой создания):
curl -v -H "Content-Type: application/json" \
-d '{"aggregateId":"AGGREGATE_ID", "location": "Sydney"}' \
http://localhost:9000/shipment/move
Если запущены несколько инстансов `prostore-test-service`, запросы будут балансироваться по round-robin.
Попробуйте перезапустить сервисы. При старте сервиса будет запущен replay event-ов для восстановления агрегатов.
Все события будут добавляться в коллекцию MongoEvent (база test по умолчанию):
{
"_id" : ObjectId("63a3f2d70ac51a617297cda5"),
"aggregateId" : "7a97ca5a-3375-4be1-bbe2-83ae7b40614d",
"eventType" : "ru.programstore.prostore.test.common.event.ShipmentCreatedEvent",
"eventJson" : "{\"aggregateId\":\"7a97ca5a-3375-4be1-bbe2-83ae7b40614d\",\"destination\":\"Moscow\",\"location\":\"Almaty\"}",
"timestamp" : NumberLong(2075100266365871),
"_class" : "ru.programstore.prostore.mongo.MongoEvent"
}
MongoEvent представлен модулем mongodb и не требует каких-то действий со стороны разработчика.
Тут если сравнивать с традиционным подходом, нет отдельных коллекций/таблиц для сущностей доменной области. Структурно это можно представить как поток данных, которые делятся по aggregateId. При традиционном подходе мы бы сохранили сущность Shipment в таблице/коллекции и затем меняли бы состояние Shipment по id. В Event Sourcing мы сохраняем событие ShipmentCreatedEvent с каким-то начальным состоянием, задаем/получаем aggregateId. И далее можем генерировать любые события по этому aggregateId. Например, ShipmentMovedEvent. При добавлении/изменении полей нам не нужно менять схему. Мы меняем поля событий, либо добавляем новые типы событий.
Комментарии (16)
gybson_63
00.00.0000 00:00Решений действительно мало, всё сожрала 1С.
bae_prosto Автор
00.00.0000 00:00Можете привести аналогию Event sourcing + CQRS в 1С, которая "сожрала" другие решения?
gybson_63
00.00.0000 00:00+2Так она сама по себе архитектурно так устроена, что все действия оформляются документами, которые при "проведении" меняют состояние регистров (остатки запасов, состояния и прочее). Сама платформа так устроена. Журнал документов это "event store", регистры это "query storage". Документы легко переносятся из системы управленческого учета, в систему бух. учета и при проведении там, формируют другую базу для запросов. Собственно она и используется и для логистики и для финансов и вообще для любого бизнеса.
В 7.7 версии был единый журнал для всех событий, что оказалось плохой идеей, так как транзакция записи одного документа, блокировала запись любых остальных.
mishamota
00.00.0000 00:00+2Первая половина статьи очень общая, во второй внезапный переход к технической реализации и примерам. Хотелось бы узнать про функциональность и ограничения описанного фреймворка:
1) Устойчива ли система к дублям входных событий или от клиента требуется гарантия, что он события exactly-once'ит?
2) Есть ли возможность запросить историческое состояние агрегата на фиксированный момент времени? Гарантируется ли в этом случае неизменность агрегата на фиксированный момент времени?
3) Требуется ли от клиента соблюдать упорядоченный поток событий? Если событие с таймстемпом 2 придет раньше события с таймстемпом 1, есть ли гарантии, что агрегат изолирует (не покажет при запросе) изменение 2 до применения изменения 1?
bae_prosto Автор
00.00.0000 00:00+2Согласны с замечаниями. Вопросы очень актуальные. Изначально хотели раскрыть эти темы, но всему свое время ????
Если статья получит хороший отклик, будем дорабатывать фреймворк и раскрывать тему подробнее.
AlexViolin
00.00.0000 00:00+1На структурной схеме правая часть, которая описывает Query часть системы, явно нуждается в более подробной детализации:
Event bus -> Events -> Event handling service -> Query Storage
Client -> Query -> Query service -> Query Storagebae_prosto Автор
00.00.0000 00:00Спасибо за конструктивное замечание!
AlexViolin
00.00.0000 00:00Предполагаю, что в коде так всё и сделано. Надо только поправить на рисунке в этой статье.
vag23
00.00.0000 00:00+1Интересный подход с кэшом, правильно ли я понял, что в нем висит конечное состояние агрегата с учетом полученных ранее событий? Если да, то означает ли это вычитку всех событий при рестарте приложения или на попадание агрегата в кэш есть триггеры?
bae_prosto Автор
00.00.0000 00:00Да, поняли правильно. Кэш не персистентный. При рестарте данные кэша теряются, события заново генерируются и кэш собирается с нуля.
Это затратно, но для каких-то кейсов это может быть вполне рабочее решение.
Для оптимизации можно:
1) Реализовать персистентный кэш
2) Периодически делать снепшоты агрегатов в кэше (хранить в базе с timestamp). При рестарте загрузить агрегат из снепшота и сгенерировать события заново с момента > timestamp
1-вариант несет доп. нагрузку на запись, 2-вариант оптимальнее (учитывая настройку периодичности)
vag23
00.00.0000 00:00+1Аxon framework не рассматривали в качестве прототипа? (я не от них)), просто решаем на проекте похожие задачи)
bae_prosto Автор
00.00.0000 00:00Да, Axon framework рассматривали. Его платный вариант для нашего кейса выходил очень дорого (они считают по кол-ву event-ов). Кроме цены, в целом он неплохой. У Axon framework еще есть open-source вариант, но его я бы не стал рассматривать.
vag23
00.00.0000 00:00Не стали бы рассматривать по причинам, указанным в статье (комьюнити, время на погружение), или обнаружили более конкрентые недостатки?
Kisva
Я бы добавил к преимуществам возможность аналитики по любым событиям и по времени практически "из коробки". Это очень важная возможность Event sourcing + CQRS архитектуры.
bae_prosto Автор
Соглашусь с вами