Пришел как-то на новый проект, а там Event sourcing+CQRS. Посмотрел как эта архитектура устроена тут и тут и очень вдохновился. На проекте были джава и спрингом, но архитектура настраивалась полностью вручную.

Как мы знаем, аннотации это хорошо. Давайте попробуем собрать handler’ы и applier’ы с помощью кастомных аннотаций.

Задача - модуль подсчета денег за пиццу. Кто-то может купить пиццу, кто-то может съесть кусочек пиццы. Купивший сможет посмотреть сколько и кто ему должен, а должники могут отметить, что вернули ему деньги.

Структура проекта

Проект реализуем в двух модулях. Модуль event-sourcing будет содержать необходимую инфраструктуру для сбора handler’ов и applier’ов. А в модуле api используем эту инфраструктуру. Ну и проверять будем в тесте. Сразу вот ссылка на проект на гите

Wright модель

В рамках архитектуры все данные на запись получаются с помощью событий. За счет событий и команд происходит обработка данных по бизнес-процессам. Команда отличается от события. Команда инициирует какую-либо логику, а событие - это команда, которая записывается и впоследствии по списку событий можно восстановить состояние объекта.

Создадим интерфейс команды в модуле event-sourcing. От него о будут имплементироваться все обрабатываемые команды. А класс события будет располагаться в модуле api. Это нужно поскольку логика сохранения события может быть различной. Класс Event абстрактный и содержит идентификатор связанных событий и id события.

@SuperBuilder
@AllArgsConstructor
@NoArgsConstructor
@Data
public abstract class Event implements Command {
    private UUID id;
    private Long eventId;
    @Builder.Default
    private Instant createAt = Instant.now();
}

Для начала мы должны получить данные о том, какую пиццу купил человек. Для этого будем использовать ивент BuyPizza с информацией о том кто купил, сколько пицца стоила и сколько кусков в ней.

@Data
@SuperBuilder
public class BuyPizza extends Event {
    private Integer countOfPieces;
    private Integer price;
    private String username;
}

Так же напишем обработчик этого события. Пусть функция обработчик принимает команду для обработки, и возвращает лист порожденных команд. Проверим данные на корректность, и уведомим всех о том, что пицца куплена. Для этого вернем из функции событие SendPizzaNotification и для него тоже напишем обработчик.

@Handler
public List<Command> handle(BuyPizza command){
        if (command.getUsername()!=null && command.getUsername().isBlank())
            throw new RuntimeException("имя должно быть заполнено");
        if (command.getCountOfPieces() == null || command.getCountOfPieces() <= 0)
            throw new RuntimeException("кусочков не может не быть");
        if (command.getPrice() == null || command.getPrice() <= 0)
            throw new RuntimeException("пицца обязательно чего-то стоит");
        log.info("Мистер {} купил {} кусков пиццы за {} денег",
                command.getUsername(),
                command.getCountOfPieces(),
                command.getPrice());
        return of(SendPizzaNotification.builder()
                .username(command.getUsername())
                .build());
}

@Handler
public List<Command> handle(SendPizzaNotification command) {
 log.info("Пицца пицца у {} пицца!!!", command.getUsername());
 return Collections.emptyList();
}

Теперь стоит задача, как собрать все handler’ы и при публикации команды вызывать все предназначенные ей обработчики. Для этого потребуется соответствующий сервис. В нем нам потребуются обработчики для успешно обработанных команд и для неуспешно. Также требуется способ добавления новых обработчиков. В функцию addCommandHandler будем передавать класс команды которую будут обрабатывать и обработчик принимающий в себя команду.

public interface Publisher {
    void publish(Command message);
    void publish(List<Command> proceed);
    void fail(Command message);
    void fail(List<Command> proceed);
    void addCommandHandler(Class type, Consumer<Command> consumer);
}

Реализация этого интерфейса будет основана на мапе: класс команды против списка обработчиков. При публикации команды будем получать список всех ее обработчиков и по очереди применим их к команде.

@Component
@RequiredArgsConstructor
public class PublisherImpl implements Publisher {

    private final Map<Class<? extends Command>, Collection<Consumer<Command>>> consumerHandlerMap = new HashMap<>();

    private final CommandProcessor commandProcessor;

    @Override
    public void publish(Command command) {
        if (command == null) return;
        Collection<Consumer<Command>> handlers = consumerHandlerMap.get(command.getClass());
        if (handlers != null && !handlers.isEmpty()){
            Command processedCommand = commandProcessor.processCommand(command);
            handlers.forEach(x -> x.accept(processedCommand));
        } else {
            throw new IllegalStateException(format("No handlers exist for class %s", command.getClass()));
        }
    }
...
    @Override
    public void addCommandHandler(Class type, Consumer<Command> consumer) {
        if (consumerHandlerMap.containsKey(type)) {
            consumerHandlerMap.get(type).add(consumer);
        } else {
            List<Consumer<Command>> handlerList = new LinkedList();
            handlerList.add(consumer);
            consumerHandlerMap.put(type, handlerList);
        }
    }
}

После этого пропишем обработчик команд, как минимум их надо сохранить. Это мы сделаем в листе с помощью EventRepository, потому что лень морочиться с базой данных.

@Component
@RequiredArgsConstructor
@Slf4j
public class CommandProcessorImpl implements CommandProcessor {

    private final EventRepository eventRepository;

    @Override
    public Command processCommand(Command command) {
        if (command instanceof Event) {
            eventRepository.save((Event)command);
        }
        return command;
    }

    @Override
    public Command processFailure(Command command) {
        log.info("обрабатываем ошибку");
        return command;
    }
}

В целом на этом сама архитектура пишущей модели готова, и остается только настроить паблишер и использовать его. Как вариант настраивать его через конструктор, но тогда придется для каждого нового хэндлера добавлять его в паблишер в конструкторе, появляется колбаса кода, да и зачем, если всем этим может заниматься спринг.
Как видите все хэндлеры помечены аннотацией @Handler. Напишем бин пост процессор, соберем все хэндлеры из всех компонентов и настроим ими publisher. Компонент сначала попадает в обработчик до создания прокси.

В функцию postProcessBeforeInitialization передается экземпляр оригинального класса. Соберем все методы отмеченные аннотацией @Handler и запишем их в мапу против имени бина. Теперь перейдем к функции postProcessAfterInitialization. В нее передаются прокси всех компонентов. На этом этапе будем искать функции-хэндлеры в каждом компоненте и добавлять их в Publisher.

Важный вопрос, почему надо добавлять именно таким образом, ведь можно как только мы нашли метод в оригинальном классе сразу же его добавлять в Publisher. Это нужно потому что в прокси объекте добавляется различная функциональность и если мы добавим оригинальный объект а не прокси можно потерять, например, создание транзакции по аннотации @Transactional

@Component
@RequiredArgsConstructor
public class CommandHandlerBeanPostProcessor implements BeanPostProcessor {

    private final Map<String, List<Method>> handlers = new HashMap<>();
    private final Publisher publisher;

    @Override
    public Object postProcessBeforeInitialization(Object bean, String beanName) throws BeansException {
        List<Method> handleMethods = Stream.of(bean.getClass().getMethods())
                .filter(x -> x.isAnnotationPresent(Handler.class))
                .collect(Collectors.toList());
        if (!handleMethods.isEmpty()){
            handlers.put(beanName, handleMethods);
        }
        return bean;
    }

    @Override
    public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
        if (handlers.containsKey(beanName)) {
            handlers.get(beanName).forEach(x -> addWithInvoke(bean, x));
        }
        return bean;
    }

    private void addWithInvoke(@NonNull Object object, @NonNull Method handleMethod) {
        Class[] classes = handleMethod.getParameterTypes();
        if (classes.length != 1) throw new RuntimeException("handler mast contains in params only Command");
        publisher.addCommandHandler(classes[0], a -> invoke(object, handleMethod, a));
    }

    @SneakyThrows
    private void invoke(@NonNull Object object, @NonNull Method handleMethod, Object param)
    {
        handleMethod.invoke(object, param);
    }

}

На этом этапе уже можно использовать Publisher. Инжектим его где необходимо и с помощью него публикуем команды. В таком формате незачем возвращать ничего из хэндлера. Публиковать команды чаще всего требуется публиковать в конце выполнения логики, так как если возникнет ошибка, то ни одна команда не должна была быть опубликована. Пусть все порожденные команды будут возвращаться из хэндлера и автоматически публиковаться.

Для этого надо создать аспект, который будет декорировать все хендлеры. Если при вызове хэндлера возникает ошибка, вызываем обработку ошибки у Publisher’а.

@Aspect
@Component
public class HandlerContextAspect {

    @Autowired
    private Publisher publisher;

    @Pointcut(value = "@annotation(ru.abdyabdya.es_cqrs.annotations.Handler)")
    public void callAtHandlerAnnotation() { }

    @Around(value = "callAtHandlerAnnotation()")
    @SneakyThrows
    public Object aroundCallAt(ProceedingJoinPoint pjp) {
        List<Command> retVal = null;
        try {
            retVal = (List<Command>)pjp.proceed();
            publisher.publish(retVal);
        } catch (CommandException commandException) {
            publisher.fail(commandException.getCommand());
        }
        return retVal;
    }
}

На с пишущей моделью мы заканчиваем. Напишем еще пару событий и хендлеров для возврата денег и съедания кусочка пиццы и перейдем к следующему этапу.

@Data
public class GiveMoneyBack extends Event {
    private Integer amountOfMoney;
    private String username;
    private String targetUser;
}

@Data
@AllArgsConstructor
@NoArgsConstructor
public class TakePizzaPieces extends Event {
   private String name;
}
@Handler
public List<Command> handle(TakePizzaPiece command) {
     if (pizzaService.checkPieces(command.getEventId())) {
        return of(PieceTaken.builder()
.eventId(command.getEventId())
.username(command.getUsername())
.build());
    } else {
        log.info("а кусочек взять нельзя");
        return null;
    }
}

@Handler
public List<Command> handle(GiveMoneyBack command) {
    log.info("{} вернул {}", command.getUsername(), command.getAmountOfMoney());
    return Collections.emptyList();
}

Read модель

Для начала определим, какие данные нам нужны. Как вы видели, до этого мы использовали функцию canTakeOnePiece. Очевидно, она проверяет остались ли кусочки пиццы, но для такой проверки требуется сначала получить количество оставшихся кусочков. Создадим для этого функции, которые будут принимать предыдущее состояние и ивенты которые на него могут повлиять и возвращать новое состояние. Пусть на покупку пиццы количество ее кусочков добавляется к тем, что были, а при поедании кусочка их количество уменьшается на 1. Реализуем это в классе PizzaApplier.

@Applier
public class PizzaApplier extends ApplierContainer<Integer> {
   @Applier
   public Integer apply(Integer countOfPieces, BuyPizza command){
       if (countOfPieces==null) countOfPieces = 0l;
       return countOfPieces + command.getCountOfPieces();
   }
   @Applier
   public Integer apply(Integer countOfPieces, PieceTaken command){
       return countOfPieces - 1;
   }
}

Теперь, чтобы не писать простыню из ifelse надо собрать все апплаеры в мапу каждый против команды, которую он обрабатывает. Пусть для этого будет абстрактный класс ApplierContainer. В него же добавим и логику сборки объекта по командам. Дженериком здесь будет класс, который собирается из событий. В нашем случае это будет Integer. Еще нам потребуется функциональный интерфейс для апплаеров.

public abstract class ApplierContainer<T> {

    private Map <Class, ApplierFunction<T>> classConsumerMap = new HashMap <>();

    public T apply(T object, List <? extends Command> commands){
        for (Command command: commands) {
            if (classConsumerMap.containsKey(command.getClass())) {
                object = classConsumerMap.get(command.getClass()).apply(object, command);
            }
        }
        return object;
    }

    public T apply(List <? extends Command> commands){
        return apply(null, commands);
    }

    public void add(Class cls, ApplierFunction<T> applierFunction){
        classConsumerMap.put(cls, applierFunction);
    }

}

public interface ApplierFunction<T> {
   public T apply(T object, Command event);
}

Теперь необходимо как-то настроить каждый класс. Напишем бин пост процессор. Здесь мы также соберем все подходящие методы, а после добавим эти же методы но уже из прокси объекта. И теперь будем настраивать каждый подходящий бин по-отдельности.

@Component
public class CommandApplierBeanPostProcessor implements BeanPostProcessor {
    private final Map<String, List<Method>> appliersNames = new HashMap<>();

    @Override
    public Object postProcessBeforeInitialization(Object bean, String beanName) throws BeansException {
        if (bean instanceof ApplierContainer) {
            List<Method> applierMethods = Stream.of(bean.getClass().getMethods())
                    .filter(x -> x.isAnnotationPresent(Applier.class))
                    .collect(Collectors.toList());
            appliersNames.put(beanName, applierMethods);
        }
        return bean;
    }

    @Override
    public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
        if (appliersNames.containsKey(beanName)) {
            if (!appliersNames.get(beanName).isEmpty()) {
                for (Method method : appliersNames.get(beanName)) {
                    ((ApplierContainer<?>) bean).add(
                            method.getParameterTypes()[1],
                            getApplierFunction(bean, method));
                }
            }
        }
        return BeanPostProcessor.super.postProcessAfterInitialization(bean, beanName);
    }

    private ApplierFunction getApplierFunction(@NonNull Object object, @NonNull Method applyMethod) {
        return (obj, command) -> invokeApplierFunction(object, applyMethod, obj, command);
    }

    @SneakyThrows
    private Object invokeApplierFunction(
            Object bean,
            Method applyMethod,
            Object processingObject,
            Command command) {
        return applyMethod.invoke(bean, processingObject, command);
    }

}

На этом настройка апплаеров закончена. Реализуем проверку количества кусочков и создадим класс PizzaDto, в который запишем имя купившего пиццу, количество оставшихся кусков и стоимость изначальной пиццы и мапу должников с долгами. Ну и напишем апплаеры для него.

@Service
@RequiredArgsConstructor
public class PizzaService {
    private final PizzaPieceApplier pizzaPieceApplier;
    private final PizzaApplier pizzaApplier;
    private final EventRepository eventRepository;

    public boolean checkPieces(Long id){
        return pizzaPieceApplier.apply(eventRepository.findById(id)) > 0;
    }

    public PizzaDto getById(Long id){
        return pizzaApplier.apply(id);
    }
}
@Applier
public class PizzaPieceApplier extends ApplierContainer<Integer> {

    @Applier
    public Integer apply(Integer countOfPieces, BuyPizza command){
        if (countOfPieces==null) countOfPieces = 0;
        return countOfPieces + command.getCountOfPieces();
    }

    @Applier
    public Integer apply(Integer countOfPieces, PieceTaken command){
        return countOfPieces - 1;
    }
}



@Applier
@RequiredArgsConstructor
public class PizzaApplier extends SnapshotApplierContainer<PizzaDto, Long> {

    private final PizzaRepository pizzaRepository;

    @Applier
    public PizzaDto apply(PizzaDto pizzaDto, BuyPizza command){
        if (pizzaDto==null) pizzaDto = new PizzaDto();
        if (pizzaDto.getUsername()!=null && !pizzaDto.getUsername().equals(command.getUsername())){
            throw new RuntimeException("разные люди не могут покупать пиццу с одинаковыми айди");
        }
        pizzaDto.setId(command.getEventId());
        pizzaDto.setLastDate(command.getCreateAt());
        pizzaDto.setUsername(command.getUsername());
        pizzaDto.setPiecePrice(command.getPrice()/command.getCountOfPieces());
        pizzaDto.setPrice(pizzaDto.getPrice()+command.getPrice());
        pizzaDto.setPieceCount(pizzaDto.getPieceCount()+command.getCountOfPieces());

        return pizzaDto;
    }

    @Applier
    public PizzaDto apply(PizzaDto pizzaDto, PieceTaken command){
        if (pizzaDto.getBorrowers().containsKey(command.getUsername())){
            pizzaDto.getBorrowers().put(
                    command.getUsername(),
                    pizzaDto.getBorrowers().get(command.getUsername())+pizzaDto.getPiecePrice());
        } else {
            pizzaDto.getBorrowers().put(command.getUsername(), pizzaDto.getPiecePrice());
        }
        pizzaDto.setLastDate(command.getCreateAt());
        return pizzaDto;
    }

    @Override
    protected PizzaDto getApplyingObjectById(Long id) {
        return pizzaRepository.findById(id);
    }

    @Override
    protected void saveApplyingObject(PizzaDto applyingObject) {
        pizzaRepository.save(applyingObject);
    }
}

Ну и напишем тест чтобы проверить что что-то работает

@SpringBootTest
class EsCqrsApplicationTests {
   @Autowired
   private Publisher publisher;
   @Autowired
   private PizzaService pizzaService;
   @Test
   void contextLoads() {
               publisher.publish(BuyPizza.builder()
                .eventId(1l)
                .price(100)
                .countOfPieces(2)
                .username("john").build());
        publisher.publish(TakePizzaPiece.builder().eventId(1l).username("jack").build());
        publisher.publish(TakePizzaPiece.builder().eventId(1l).username("jim").build());
        publisher.publish(TakePizzaPiece.builder().eventId(1l).username("jack").build());
   }
} 

Вывод в консоль должен быть таким

2022-09-03 00:08:31.309  INFO 5764 --- [    Test worker] r.a.es_cqrs.handlers.PizzaHandler        : Мистер john купил 2 кусков пиццы за 100 денег
2022-09-03 00:08:31.312  INFO 5764 --- [    Test worker] r.a.es_cqrs.handlers.PizzaHandler        : Пицца пицца у john пицца!!!
2022-09-03 00:08:31.315  INFO 5764 --- [    Test worker] r.a.es_cqrs.handlers.PizzaHandler        : jack взял кусочек
2022-09-03 00:08:31.316  INFO 5764 --- [    Test worker] r.a.es_cqrs.handlers.PizzaHandler        : jim взял кусочек
2022-09-03 00:08:31.316  INFO 5764 --- [    Test worker] r.a.es_cqrs.handlers.PizzaHandler        : а кусочек взять нельзя
PizzaDto(id=1, lastDate=2022-09-02T21:08:31.316115Z, username=john, price=100, piecePrice=50, pieceCount=2, borrowers={jack=50, jim=50})

Снапшоты

Последнее о чем расскажу - это снапшоты. Это промежуточные состояния объекта в рид модели. В какой-то момент может так получиться, что образуется большое количество событий в системе и чтобы собрать по ним объект придется тратить большое количество времени. Или если часто читать этот объект каждый раз он будет пересобираться. Для того чтобы избежать этого можно сохранять промежуточные состояния объекта. Это и называется снапшотом.

У всех снапшотов должны присутствовать идентификатор цепочки событий, по которым он до этого строился и дата и время последнего события которое к нему было применено.

Пусть все рид объекты имплементируют интерфейс ApplyingObject, где I - тип идентификатора.

public interface ApplyingObject<I> {
   I getId();
   Instant getLastDate();
}

Теперь создадим новый абстрактный класс SnapshotApplierContainer наследник класса ApplierContainer. Еще раз перегрузим функцию apply. Этот класс должен уметь собирать объект по его eventId. В функции apply будем находить объект, дособирать его до актуального состояния, сохранять и возвращать его из функции. За получение и сохранение объектов будут отвечать наследники класса, а для получения событий будем использовать CommandService.

@Component
public abstract class SnapshotApplierContainer<T extends ApplyingObject, I> extends ApplierContainer<T> implements ApplicationListener<TakeSnapshotApplicationEvent> {

    @Autowired
    private CommandService<I> commandService;

    protected abstract T getApplyingObjectById(I id);

    protected abstract void saveApplyingObject(T applyingObject);

    public T apply(I id) {
        T applyingObject = getApplyingObjectById(id);
        List<? extends Command> commands = applyingObject == null ?
                commandService.getCommandsById(id) :
                commandService.getCommandsByIdAndDate(id, applyingObject.getLastDate());

        if (!commands.isEmpty()) {
            applyingObject = apply(applyingObject, commands);
            saveApplyingObject(applyingObject);
            return applyingObject;
        } else {
            return applyingObject;
        }
    }
}
public interface CommandService<I> {
    List<? extends Command> getCommandsByIdAndDate(I id, Instant lastAppliedEventCreateDate);
    List<? extends Command> getCommandsById(I id);
}

И вот уже при чтении объекта он будет сохраняться. Теперь чтобы прочитать объект второй раз достаточно будет к нему применить только те события, которые произошли между чтениями. Но это нас не спасет от случая, когда накопилось 10М событий и кто-то решил прочитать этот объект (какой-то богачи накупил кучу пиццы). Для того чтобы вызывать обновления снапшота воспользуемся событиями спринга. Создадим событие TakeSnapshotApplicationEvent, содержащее идентификатор цепочки, которую нужно зафиксировать в снапшоте.

public class TakeSnapshotApplicationEvent extends ApplicationEvent {
    @Getter
    private final Long id;

    public TakeSnapshotApplicationEvent(Object source, Long id) {
        super(source);
        this.id = id;
    }
}

В сервисе SnapshotServiceImpl будем публиковать этот ивент, а в каждом аплаере будем получать его и по id обновлять снэпшоты. Для этого SnapshotApplierContainer должен имплементировать интерфейс ApplicationListener, а за получение ивента будет отвечать функция onApplicationEvent

@Component
@RequiredArgsConstructor
public class SnapshotServiceImpl implements SnapshotService {

    private final ApplicationEventPublisher applicationEventPublisher;

    @Override
    public void takeSnapshot(Long id) {
        applicationEventPublisher.publishEvent(new TakeSnapshotApplicationEvent(this, id));
    }
}
 @Override
    public void onApplicationEvent(TakeSnapshotApplicationEvent event) {
        apply((I) event.getId());
    }

Теперь можно в реализовать какую-то логику обновления снапшотов, например раз в сутки проверять есть ли цепочки с большим количеством элементов чем 20. Но для демонстрации подойдет и просто вызов в тесте.

@SpringBootTest
class EsCqrsApplicationTests {

   @Autowired
   private Publisher publisher;
   @Autowired
   private PizzaService pizzaService;
   @Autowired
   private SnapshotService snapshotService;
   @Autowired
   private PizzaRepository pizzaRepository;
   @Test
   void contextLoads() {
       publisher.publish(BuyPizza.builder()
               .eventId(1l)
               .price(100)
               .countOfPieces(2)
               .username("john").build());
       publisher.publish(TakePizzaPiece.builder().eventId(1l).username("jack").build());

System.out.println(pizzaRepository.findById(1L));
snapshotService.takeSnapshot(1l);
System.out.println(pizzaRepository.findById(1L));

       publisher.publish(TakePizzaPiece.builder().eventId(1l).username("jim").build());
       publisher.publish(TakePizzaPiece.builder().eventId(1l).username("jack").build());
       System.out.println(pizzaService.getById(1l));
   }
}

Заключение

В итоге у нас получился модуль для более легкого использования архитектуры Event sourcing + cqrs. На рабочем проекте он не подошел бы, так как использование рефлексии замедлило бы операции над командами, а сервис высоконагруженный. При этом задача оказалась интересной и модуль можно применить в невысоконагруженной системе.
Если интересно, вот телеграмм

Комментарии (4)


  1. rinat_crone
    21.09.2022 16:36

    Публиковать команды чаще всего требуется публиковать в конце выполнения логики, так как если возникнет ошибка, то ни одна команда не должна была быть опубликована.

    А что будем делать, если не удалось опубликовать сообщение в брокер после завершенной локальной транзакции?


    1. AbrekUS
      21.09.2022 16:58
      +1

      Будет «упс...» и придется прикручивать transactional outbox pattern.


  1. LeshaRB
    21.09.2022 20:29
    +1

    if (command.getUsername()!=null && command.getUsername().isBlank())

    throw new RuntimeException("имя должно быть заполнено")

    Почему не использовать стандартный валидатор?


  1. Drunik
    22.09.2022 09:21
    +1

    Кажется у вас 3 ошибки в слове write - как-то режет глаз.