Зачем нужен event sourcing?

Если идет работа с реальными людьми, то нужно сохранять все действия для последующего «разбора полетов». Для многих это основная бизнес‑цель event sourcing.

Если посмотреть в будущее, то, другими словами, примерно все бизнес‑приложения рано или позже полностью или частично перейдут на эту архитектуру. Почему тогда существующие решения массово не используют? Это все‑таки требует дополнительного проектирования и кода, но при постоянном увеличении требований к ПО это уже сейчас по идее окупается.

Есть дополнительные плюсы и минусы, но они вторичны (влияют на выбор реализации, а не концепции).

Стоит отметить, что это достаточно революционное отклонение от обычной архитектуры. Другими словами, потребуется дополнительное обучение программистов и новые лучшие практики. Для примера негативного опыта можно почитать, например, этот тред: https://news.ycombinator.com/item?id=13 339 972

В статье предполагается, что основы Event Sourcing уже известны. Их немного напомню, но не более того.

Основные проблемы

  • Eventual Consistency или старые данные возвращаются при чтении.

  • Как разбираться с ошибками, которые случились где‑то по середине обработки?

  • Медленное построение онлайн проекций (когда событий много в агрегате)

  • При хранении событий в обычной БД c автоинкрементом иногда теряются событий при обработке, т.к. БД под нагрузкой выделяет блоки автоинкрементных id, которые параллельно сохраняются, а обработчик событий обычно ищет по последним id.

Предлагаемое решение

Для решения этих проблем предполагается следующие изменение архитектуры относительно классической:

  • Вводится понятие «основной проекции» — это проекция, расположенная в той же БД, что и хранилище событий.

  • Основная проекция должна содержать все данные, необходимые для работы команд, встроенные проекторы основной БД и реакторов. Соответственно, при необходимости получения данных они берутся из основной проекции, а не из хранилища событий.

  • Хранилище событий используется для получения истории и при необходимости перестроения основной проекции (полное или частичное).

  • Команда, сохранение события и встроенные проекторы основной БД работают в одной транзакции.

  • Реакторы работают в отдельной транзакции.

  • Живые проекции не рекомендуются (только для специфичных случаев с учетом потенциального наличия задержки). Фактически нет такого понятия как класс агрегата или агрегат в памяти. Идет работа на уровне команд и событий, а агрегат как id только для транзакций и параллельности.

  • Если требуется ограниченная асинхронность, то можно прикрутить рассылку событий через NATS (или другую очередь).

Как решаются проблемы?

  • Нет eventual consistency, т.к. идет классическая работа с данными.

  • Обработка ошибок опять же классическая — если все пошло не так, то транзакция откатывается, а клиент получает сообщение об ошибке.

  • Нет медленной обработки событий при получении данных, т.к. для этого, в основном, используется основная проекция.

  • Нет проблемы с диапазонами автоинкрементов, т.к. при обработке событий мы не полагаемся на эти id.

Что теряется?

  • Полностью асинхронный подход (который потенциально облегчает масштабирование) — его можно применить, но уже частично.

  • Распределенность — ее нет из коробки, нужно думать отдельно.

  • Масштабирование — ее нет из коробки, нужно думать отдельно.

В целом, исходя из принципа простоты, они нам обычно не нужны. Если же все же нужны, то, вероятно, стоит использовать классический подход к event sourcing.

Кто‑то даже может сказать, что это совсем не Event sourcing, если у нас нет агрегата в памяти, который вычисляет свое состояние по событиям. Но у нас есть агрегат в базе данных, который создан по событиям (чем не память, только не оперативная). Так что это все‑таки Event sourcing, но я довольно сильно рад, что не классический, т.к. классический имеет слишком много ограничений для обычных приложений. Я бы сказал, что предлагаемый подход не event driven (EVA), но event sourcing.

Как итог, идет отказ от разделения базы событий и основной проекции. За счет этого решаются проблемы распределенности и асинхронности (их нет). При масштабировании решения я бы скорее использовал партиционирование решения, чем асинхронность и отдельные сервисы. Но, это, понятно, зависит от специфики решения: в разных случаях могут быть разные ответы.

С одной стороны, ничего сверхестественного в подходе нет. С другой, какого‑то описания и термина для такого подхода не встречал. Предлагаю использовать Inline event sourcing (встроенный event sourcing), т.к. он основывается на встроенной проекции.

Стартовый шаблон Event Sourcing приложения

Стартовый шаблон приложения Event sourcing: https://github.com/stepin/kotlin-event-sourcing-app

Данный репозиторий представляет из себя пример приложения, а не отдельный движок. Пока что у меня нет уверенности, что этот движок можно использовать "как есть" в других приложениях. При этом есть уверенность, что начав с этого шаблона, вполне реально развивать приложения.

Данный проект является извлечением общей части из одного из моих личных проектов. Это уже где-то 5ая версия движка (первая вообще была на Golang). При этом версия новая -- возможны какие-то шероховатости первое время.

Шаблон основывается на моем базовом шаблоне Котлин-приложений: https://github.com/stepin/kotlin-bootstrap-app

События

Начинаем с выявления событий и сущностей.

Допустим, у нас есть простая бизнес-сущность Пользователь:

data class User(
  displayName: String,
  firstName: String,
  seconfName: String,
  email: String
)

И мы хотим поддержать следующие сценарии (события):

  • регистрация пользователя

  • смена имени

  • удаление пользователя

Для простоты примера не будем обращать внимание на подтверждения и авторизацию.

Пример события регистрации пользователя:

data class UserRegistered(  
  val email: String,  
  val firstName: String?,  
  val secondName: String?,  
  val displayName: String,  
  override val accountGuid: AccountGuid,  
  override val aggregatorGuid: UserGuid = UUID.randomUUID(),  
  override val guid: EventGuid = UUID.randomUUID(),  
) : UserEvent(eventTypeVersion = 3)
  • 4 основных поля: email, firstName, secondName, displayName

  • guid самого событий (рандомный)

  • aggregator guid = user guid — вот это неудобно, что нет синонима, но можно привыкнуть (и указан typealias UserGuid)

  • account guid — движок рассчитан на мультиаккаунтовые приложения

  • data class — удобно. И еще удобнее, что UserEvent — sealed класс, можно такие конструкции делать:

when (val e = event as UserEvent) {  
  is UserMetaUpdated -> "updated $e"  
  is UserRegistered -> "user registered with id $id ${meta.createdAt} $e"  
  is UserRemoved -> "user ${e.email} deleted at ${meta.createdAt}"  
}

Базовый класс для событий агрегата User выглядит так:

sealed class UserEvent(  
  override val eventTypeVersion: Short = 0,  
) : DomainEvent {  
  override val aggregatorType: String  
    get() = "user"  
  
  override val eventType: String  
    get() = this.javaClass.simpleName  
  
  abstract override val aggregatorGuid: UserGuid  
}
  • реализуется интерфейс DomainEvent движка

  • выставляется typealias UserGuid для aggregatorGuid — необязательно, как документация

  • выставляется тип агрегата

  • выставляется тип события — автоматически берется имя класса события (например, UserRegistered)

  • выставляется версия события в 0 по умолчанию, но это значение событие может переопределить

По сути, от события движок требует 2 вещи:

  • реализации интерфейса DomainEvent

  • корректной сериализации и десериализации JSONB

Остальное на усмотрение разработчика. При этом базовый класс для всех событий агрегата считается хорошей практикой.

Про id/guid: в этом примере подразумевается, что команды работают с guid, а при необходимости join в SQL‑запросах используется id (т.к. быстрее).

Команда

У нас команда — это либо отдельный Spring‑сервис, либо метод внутри Spring‑сервиса. По сути единственный критичный момент — должен использоваться интерфейс EventStorePublisher для публикации событий, а остальное движок не ограничивает.

Команда регистрации:

@Service  
class RegisterUser(  
  private val store: EventStorePublisher,  
  private val userRepository: UserRepository,  
) {  
  data class Params(  
    val email: String,  
    val firstName: String?,  
    val secondName: String?,  
    val displayName: String?,  
  )  
  
  sealed class Response {  
    data class Created(val userGuid: UUID) : Response()  
    data class Error(val errorCode: ErrorCode) : Response()  
  }  
  
  suspend fun execute(params: Params): Response = with(params) {  
    val user = userRepository.findByEmail(email)  
    if (user != null) {  
      return Response.Error(ErrorCode.USER_ALREADY_REGISTERED)  
    }  
  
    val accountGuid = UUID.randomUUID()
    val userGuid = UUID.randomUUID()
  
    val userRegistered = UserRegistered(  
      accountGuid = accountGuid,
      aggregatorGuid = userGuid,  
      email = email,  
      firstName = firstName,  
      secondName = secondName,  
      displayName = displayName ?: calcDisplayName(email, firstName, secondName),  
    ) 
    store.publish(userRegistered)  
  
    val accountCreated = AccountCreated(  
      name = "Неизвестная компания",  
      accountGuid = accountGuid,  
      userGuid = userGuid,  
    )  
    store.publish(accountCreated)  
  
    return Response.Created(userGuid)
  }
}

Возвращаемые от команд значения зависят от бизнес-логики: могут ли быть бизнес-ошибки, нужно ли вернуть guid и т.п. В каких-то случаях может ничего не возвращаться.

Проекторы

Пример 2-х проекторов в одном классе:

@Service  
class UserProjector(  
  private val userRepository: UserRepository,  
  private val accountRepository: AccountRepository,  
) {  
  companion object : Logging  
  
  @Projector  
  suspend fun handleUserRegistered(e: UserRegistered, meta: EventMetadata) {  
    val account = accountRepository.findByGuid(e.accountGuid)  
  
    val u = UserEntity()  
    u.accountGuid = e.accountGuid  
    u.accountId = account?.id ?: 0  
    u.guid = e.aggregatorGuid  
    u.email = e.email  
    u.displayName = e.displayName  
    u.firstName = e.firstName  
    u.secondName = e.secondName  
    u.createdAt = meta.createdAt.toInstant(ZoneOffset.UTC)  
  
    val savedUser = userRepository.save(u)  
    logger.debug { "new user id: ${savedUser.id}" }  
  }  
  
  @Projector  
  suspend fun handleUserRemoved(e: UserRemoved) {  
    val user = getUser(e.aggregatorGuid)  
    userRepository.delete(user)  
  }  
  
  private suspend fun getUser(userGuid: UUID) = userRepository.findByGuid(userGuid)  
    ?: throw DomainException(ErrorCode.USER_NOT_FOUND)
}
  • метод проектора должен быть в Spring‑бине

  • должна быть аннотация @Projector

  • в классе может быть несколько методов — ограничений нет

  • первый аргумент — событие

  • второй (опционально) — метаданные события

  • метод должен быть suspend (в принципе, это ограничение можно снять, но сейчас так в движке, и не планирую использовать без suspend)

  • исключение в проекторе отменит сохранение события

Реакторы

@Service  
class UserRegisteredEmailReactor(  
  private val emailService: SendEmailService,  
) {  
  companion object : Logging  
  
  @Reactor  
  suspend fun handle(e: UserRegistered) {  
    emailService.sendEmailConfirmationEmail(e.displayName, e.email, e.aggregatorGuid.toString())  
  }  
}
  • метод проектора должен быть в Spring‑бине

  • должна быть аннотация @Reactor

  • в классе может быть несколько методов — ограничений нет

  • первый аргумент — событие

  • второй (опционально) — метаданные события

  • метод должен быть suspend (в принципе, это ограничение можно снять, но сейчас так в движке, и не планирую использовать без suspend)

  • исключение в реакторе НЕ отменит сохранение события и запуск других реакторов

Чтение данных

Чтение данных основной проекции — никаких ограничений, как обычно.

Так же доступно чтение событий:

interface EventStoreReader {
  fun <T : DomainEvent> findEventsSinceId(  
    eventIdFrom: Long,  
    aggregator: String? = null,  
    aggregatorGuid: UUID? = null,  
    accountGuid: AccountGuid? = null,  
    eventTypes: List<String>? = null,  
    maxBatchSize: Int? = null,  
  ): Flow<DomainEventWithIdAndMeta<T>>  

  fun <T : DomainEvent> findEventsSinceGuid(  
    eventGuidFrom: UUID,  
    aggregator: String? = null,  
    aggregatorGuid: UUID? = null,  
    accountGuid: AccountGuid? = null,  
    eventTypes: List<String>? = null,  
    maxBatchSize: Int? = null,  
  ): Flow<DomainEventWithIdAndMeta<T>>  
 
  fun <T : DomainEvent> findEventsSinceDate(  
    date: LocalDateTime,  
    aggregator: String? = null,  
    aggregatorGuid: UUID? = null,  
    accountGuid: AccountGuid? = null,  
    eventTypes: List<String>? = null,  
    maxBatchSize: Int? = null,  
  ): Flow<DomainEventWithIdAndMeta<T>>  

  fun <T : DomainEvent> findEvents(  
    aggregator: String? = null,  
    aggregatorGuid: UUID? = null,  
    accountGuid: AccountGuid? = null,  
    eventTypes: List<String>? = null,  
    maxBatchSize: Int? = null,  
  ): Flow<DomainEventWithIdAndMeta<T>>
}

Это API можно использовать для получения истории или для создания асинхронных проекций.

Потенциально можно написать и свое API чтения событий, в jOOQ все для этого есть.

Так же можно делать полную или частичную перегенерацию базы (аргументы старта приложения или кастомный код).

Пример получения истории (естественно, можно смешивать чтение из событий и из основной проекции, т.к. это все в даже одной базе):

@Service  
class DebugService(  
  private val eventStoreReader: EventStoreReader,  
) {  
  suspend fun getUserAudit(userGuid: UUID): List<String> {  
    return eventStoreReader.findEvents<UserEvent>("user", userGuid, maxBatchSize = 100)  
      .map { (id, event, meta) ->  
        when (event) {  
          is UserMetaUpdated -> "updated $event"  
          is UserRegistered -> "user registered with id $id ${meta.createdAt} $event"  
          is UserRemoved -> "user deleted at ${meta.createdAt}"  
        }  
    }  
  }
}

Тут в API немного некрасиво — нет связи «user» и UserEvent. Возможно, имеет смысл передавать базовый класс, но он абстрактный. Если у кого‑то есть идеи как лучше сделать API (без строчки «user» и без приведения «as UserEvent») — будут рад прочитать.

Ограничения

  • В данной реализации Event Bus не внедрен (для трансляции событий через какую‑нибудь Кафку или NATS), но ничего не мешает такое прикрутить, если кому‑нибудь будет нужно.

Итог

Кода немного больше за счет выделения отдельной абстракции — Событие. Так же время уходит на саму абстракцию — назвать, выделить поля и т. п.

Для CRUD получается больше кода, но круда не так много как может показаться — нужно приучить себя думать в событиях бизнес‑области, а не создать/удалить запись в таблице базы данных.

В целом, мне нравится, поэтому и решил поделиться с сообществом.

Еще раз (для удобства) ссылка на репо: https://github.com/stepin/kotlin‑event‑sourcing‑app

Комментарии (2)


  1. tsvettsih
    18.05.2023 21:17
    +1

    Вводя понятие основной проекции переизобретается велосипед снапшот - это построенный по первым N ивентам слепок агрегата. Тогда если в команде нужен агрегат, он строится не по всем ивентам, а начиная с N+1. Если снапшот обновлять после каждого нового ивента, то получится ровно то, что в статье называется основной проекцией.

    Кстати снапшот никак не влияет на eventual consistency. Не важно, строится в команде агрегат по всём ивентам с первого или с N+1, всё равно между добавлением ивентов в ивент стор и обновлением проекций для чтения будет лаг.


    1. IgorStepin Автор
      18.05.2023 21:17

      Спасибо за комментарий)

      Вводя понятие основной проекции переизобретается велосипед снапшот - это построенный по первым N ивентам слепок агрегата. Тогда если в команде нужен агрегат, он строится не по всем ивентам, а начиная с N+1. Если снапшот обновлять после каждого нового ивента, то получится ровно то, что в статье называется основной проекцией.

      Все-таки снапшот -- это обычно тоже событие. Соответственно, он сокращает количество проигрываемых событий при построении агрегата. Это же проекция, но с необходимой полнотой данных, чтобы не нужно было отдельно строить агрегат, и с единой транзакцией с событием, которая исключает eventual consistency.

      Кстати снапшот никак не влияет на eventual consistency. Не важно, строится в команде агрегат по всём ивентам с первого или с N+1, всё равно между добавлением ивентов в ивент стор и обновлением проекций для чтения будет лаг.

      Да, классический снапшот никак не влияет, но основная проекция -- не снапшот (в классическом смысле). Идет 1 транзакция на БД с созданием события и обновлением основной ("встроенная" даже более точное определение) проекции, так что это уже никак не отличается по консистентности от обычных (не event sourcing) систем.

      Опять же, я не считаю это каким-то своим изобретением -- нечто подобное наверняка есть и у других. Моя роль скорее подсветить этот вариант и показать пример реализации.