Уверен, что многие, кто интересовался подходами к разработке микросервисной архитектуры, знакомы с трудами Криса Ричардсона на эту тему и уже встречали transactional outbox pattern. А для тех, кто не знаком, кратко расскажу основную идею:

В микросервисной архитектуре не редка ситуация коммуникации микросервисов посредством публикации событий. Происходит изменение состояния какой-либо сущности в одном из микросервисов и для информирования других сервисов публикуется событие об изменение данной сущности. Проблема кроется в том, что эти два действия - сохранение состояния и публикация события - обычно не происходят атомарно. А это означает, что на любом из указанных этапов что-то может пойти не так, и одно из действий не будет выполнено, что оставит систему в несогласованном состоянии. В качестве решения предлагается в начале в рамках одной транзакции атомарно сохранять и изменения и само событие, а вторым шагом выполнять публикацию ранее сохраненного события.

Перед тем, как изобретать свой велосипед, я, разумеется, ознакомился с чужими. И все они мне показались решениями "в лоб". Везде над методом навешивается аннотация Transactional и в самом методе сначала сохраняется сущность, а следом происходит сохранение события, связанного с ней.

@Transactional
public SaleId createWidget(Sale sale) {
  var saleId = saleRepository.save(sale);
  messageQueue.saveMessage(StockReductionEvent.of(sale.item(), sale.amount()));
  return saleId;
}

Вариант вполне себе рабочий, но можно и лучше.

Используемый стек

Представленные примеры реализованы на языке Kotlin с использованием Spring, базы данных MongoDB. Но все тоже самое может быть легко реализовано и с использованием реляционной модели.

Сам Крис Ричардсон, рассказывая о Transactional outbox pattern, оперирует понятиями из DDD, такими как агрегат и доменное событие. Я же решил пойти еще немного дальше и реализовать данную интеграцию с использованием DDD в более явной форме.

Пару слов о DDD. А точнее о тактических шаблонах DDD.

Объект-значение (Value Object) - объект, для которого не важна своя идентичность, и один объект может быть легко заменен на другой такой же объект с такими же свойствами. В качестве примера обычно приводят деньги. Money - характеризуется валютой и своим достоинством. Купюра 1000 рублей абсолютно эквивалентна для кассира любой другой тысяча рублевой купюре. В Kotlin объект-значение может быть представлен классом данных.

Сущность (Entity) - в качестве противопоставления объекту-значению идет сущность - то что мы хотим идентифицировать и отличать от других подобных объектов. Опять же пример с деньгами: если мы уже не кассиры, для которых важен только наминал купюры, а допустим криминалисты, имеющие дело с фальшивыми купюрами. Тогда нас может заинтересовать не только номинал, но еще и серийный номер конкретной купюры. Для сравнения объектов сущности используется не набор ее свойств, а выполняется сравнение только индентификаторов, а свойства могут изменяться в процессе жизненного цикла объекта.

Entity
abstract class Entity<ID : Serializable>(val id: ID) {

    override fun equals(other: Any?): Boolean {
        if (this === other) return true
        if (other == null || this::class != other::class) return false

        val casted = other as Entity<*>

        if (id != casted.id) return false

        return true
    }

    override fun hashCode() = id.hashCode()
}

Сущность характеризуется только своей идентичностью (id)

Доменное событие (Domain Event) - важное для предметной области и для бизнеса событие об изменение состояния сущности.

DomainEvent
abstract class DomainEvent<A : RootAggregate<A, ID>, ID : Serializable, BODY : Any>(
    val id: Id,
    val type: String,
    val aggregate: String,
    val aggregateId: ID,
    val occurredOn: ZonedDateTime,
    val body: BODY?
) {
    constructor(aggregate: A, type: String, body: BODY? = null) : this(
        Id(),
        "${aggregate.aggregateName}.$type",
        aggregate.aggregateName,
        aggregate.id,
        ZonedDateTime.now(),
        body
    )

    data class Id(val id: UUID? = UUID.randomUUID())


    override fun equals(other: Any?): Boolean {
        if (this === other) return true
        if (other == null || this::class != other::class)
            return false

        val casted = this::class.cast(other)

        if (id != casted.id) return false

        return true
    }

    override fun hashCode() = id.hashCode()

    override fun toString(): String =
        "${this::class.simpleName}(id=$id, type='$type', aggregate='$aggregate', aggregateId=$aggregateId, occurredOn=$occurredOn, body=$body)"

}

Агрегат (Aggregate) - рассматривается как совокупность сущностей и объектов значений. В качестве примера можно взять пост и комментарии к нему: и то и то могут быть сущностями, однако комментарий не может жить отдельно от поста. Поэтому можно сказать, что пост является корнем агрегата. В первом же приближении можно считать, что сущность и агрегат - это одно и тоже. Особенно данное утверждение может быть верно, если мы более склонны к проектированию мелкокластреных агрегатов только с одной сущностью. Технически же предполагается, что репозиторий существует только для агрегата, но никак не для сущности, т.е. любые операции по персистентности могут производиться только через агрегат.

Как было сказано выше, доменное событие непосредственно связано с изменениями в какой-либо сущности, например, "был добавлен новый комментарий". Так как доступ ко всем сущностям агрегата осуществляется через сам агрегат, то вполне логично, что владельцем доменного события также является агрегат.

RootAggregate
abstract class RootAggregate<A : RootAggregate<A, ID>, ID : Serializable>(id: ID) : Entity<ID>(id) {
    companion object {
        private val camelRegex = "(?<=[a-zA-Z])[A-Z]".toRegex()
    }

    private fun String.camelToSnakeCase(): String {
        return camelRegex.replace(this) { "_${it.value}" }.lowercase(Locale.getDefault())
    }

    @Transient
    val aggregateName = this::class.simpleName!!.camelToSnakeCase()

    @Transient
    private val _events = mutableListOf<DomainEvent<A, ID, *>>()
    val events: List<DomainEvent<A, ID, *>>
        get() = _events.toList()

    protected fun addEvent(event: DomainEvent<A, ID, *>) {
        _events.add(event)
    }

    fun clearEvents() {
        _events.clear()
    }
}

Таким образом одновременно при сохранении самого агрегата из него могут извлекаться и сохраняться так же и доменные события.

DomainRepository
interface DomainRepository<A : RootAggregate<A, ID>, ID : Serializable> {
    fun save(aggregate: A): A

    fun saveAll(aggregates: Iterable<A>): List<A>

    fun findById(id: ID): A?
  
    fun deleteById(id: ID)
    
    fun delete(aggregate: A)
}

Наличие специально выделенного DomainRepository позволяет отслеживать сохранение агрегатов и применять к этому процессу дополнительную логику по организации сохранения объектов в рамках одной транзакции. Тут и приходит на помощь BeanPostProcessor, который заменит исходный объект на прокси с новой сквозной функциональностью.

TransactionalOutboxBeanPostProcessor
import org.aopalliance.intercept.MethodInterceptor
import org.springframework.aop.framework.ProxyFactory
import org.springframework.beans.factory.BeanFactory
import org.springframework.beans.factory.BeanFactoryAware
import org.springframework.beans.factory.config.BeanPostProcessor
import org.springframework.core.Ordered
import org.springframework.data.mongodb.core.MongoTemplate
import org.springframework.transaction.support.TransactionTemplate
import pro.korobovn.domaintypes.DomainRepository
import pro.korobovn.domaintypes.RootAggregate


class TransactionalOutboxBeanPostProcessor : BeanPostProcessor, Ordered, BeanFactoryAware {

    private lateinit var transactionTemplate: TransactionTemplate

    private lateinit var mongoTemplate: MongoTemplate

    private lateinit var domainEventRepository: DomainEventRepository

    override fun postProcessBeforeInitialization(bean: Any, beanName: String): Any = bean

    override fun postProcessAfterInitialization(bean: Any, beanName: String): Any =
        if (bean is DomainRepository<*, *>) createTransactionalProxy(bean) else bean

    private fun createTransactionalProxy(bean: Any) =
        ProxyFactory(bean)
            .apply {
                addAdvice(saveInterceptor)
                addAdvice(saveAllInterceptor)
            }.proxy

    private val saveInterceptor = MethodInterceptor { invocation ->
        if (
            invocation.method.name == "save" &&
            RootAggregate::class.java.isAssignableFrom(invocation.method.returnType) &&
            invocation.arguments.firstOrNull() is RootAggregate<*, *> &&
            (invocation.arguments.firstOrNull() as RootAggregate<*, *>).events.isNotEmpty()
        ) {
            val aggregate = invocation.arguments.firstOrNull() as RootAggregate<*, *>
            transactionTemplate.execute {
                domainEventRepository.insertAll(aggregate.events)
                invocation.proceed()
            }
        } else invocation.proceed()
    }

    private val saveAllInterceptor = MethodInterceptor { invocation ->
        if (
            invocation.method.name == "saveAll" &&
            List::class.java.isAssignableFrom(invocation.method.returnType) &&
            invocation.arguments.firstOrNull() is Iterable<*>
        ) {
            @Suppress("UNCHECKED_CAST")
            val aggregates = invocation.arguments.firstOrNull() as Iterable<RootAggregate<*, *>>
            val events = aggregates.flatMap(RootAggregate<*, *>::events).toList()
            if (events.isNotEmpty())
                transactionTemplate.execute {
                    domainEventRepository.insertAll(events)
                    invocation.proceed()
                }
            else invocation.proceed()
        } else invocation.proceed()
    }

    override fun getOrder(): Int = Ordered.LOWEST_PRECEDENCE

    override fun setBeanFactory(beanFactory: BeanFactory) {
        this.transactionTemplate = beanFactory.getBean(TransactionTemplate::class.java)
        this.mongoTemplate = beanFactory.getBean(DomainEventRepository::class.java)
    }

}

TransactionalOutboxBeanPostProcessor отслеживает бины, которые реализуют DomainRepository, создает для таких бинов прокси, которые в свою очередь добавляют запуск транзакции при сохранении самого агрегата.

Публикация

За публикацию событий из базы данных отвечает отдельный процесс, который отслеживает изменения и выполняет публикацию. Данная задача может быть реализована с применением различных технологий, например, для монго можно использовать Mongo Change Streams - это аналог WAL (Write-Ahead Logging) у реляционных баз данных. Или же, в более простом варианте, изменения из базы могут запрашиваться через определенный интервал времени. В качестве брокера у нас используется RabbitMQ, но опять же ничего не мешает использовать любую другую технологию. При публикации события ожидается подтверждение успешной доставки сообщения до RabbitMQ и только после этого удаляется из очереди отправленное событие. Только потом уже выполняется публикация следующих сообщений. Все это обеспечивает строгий порядок публикации событий.

Итоги

В итоге наша программная модель превращается в нечто подобное:

Агрегат
@Document
class Cart(id: Id = Id()) : RootAggregate<Cart, Cart.Id>(id) {

    data class Id(val id: UUID = UUID.randomUUID()) : Serializable

    data class Item(val name: String, val quantity: Quantity = Quantity.ONE)

    private var _items: MutableList<Item> = mutableListOf()

    val items: List<Item>
        get() = _items.toList()

    fun add(item: Item) {
        _items.add(item)
        addEvent(CartWasChangedEvent.itemWasAdded(this, item))
    }


    override fun toString(): String {
        return "Cart(id=$id, items=$items)"
    }

}

Тут мы видим, что при изменении корзины сам агрегат фактически размещает addEvent событие об изменение своего состояния CartWasChangedEvent. Формально выполнили простой перенос размещения события со слоя сервисов на слой доменной логики. Эти изменения делают доменную модель менее анемичной, агрегат самостоятельно отвечает за соблюдение своих инвариантов о необходимости размещение события при изменении, не полагаясь на уровень сервисов.

CartWasChangedEvent
class CartWasChangedEvent : DomainEvent<Cart, Cart.Id, CartWasChangedEvent.Body> {
    enum class ChangeType {
        ADDED, REMOVED
    }

    data class Body(val type: ChangeType, val item: Cart.Item)

    @PersistenceCreator
    private constructor(
        id: Id,
        type: String,
        aggregate: String,
        aggregateId: Cart.Id,
        occurredOn: ZonedDateTime,
        body: Body
    ) : super(id, type, aggregate, aggregateId, occurredOn, body)

    constructor(aggregate: Cart, body: Body) : super(aggregate, "changed.added", body)

    companion object {
        fun itemWasAdded(cart: Cart, item: Cart.Item) = CartWasChangedEvent(cart, Body(ChangeType.ADDED, item))
    }
}

У события имеется два констуктора. Первый конструктор для восстановления объекта из базы данных - приватный, вызывается только фреймворком. Второй конструктор - пользовательский.

На уровне сервиса все происходит крайне незаметно для разработчика: выполняется только сохранение агрегата, все остальное происходит без дополнительного шаблонного кода. Логическая сложность того, о чем должен помнить разработчик, снижается.

Дислайк

На мой взгляд, основным недостатком предложенного подхода, как и его основным преимуществом, является тесная интеграция с концепцией и программной моделью DDD. Но, к сожалению, DDD не является серебряной пулей и подходит не к любому кейсу. Например, я сталкивался с ситуацией, когда происходило изменение состояния (маппинг) и требовалось опубликовать сообщение. По сути это было не доменное событие, а команда для другого сервиса на запуск дальнейшей обработки. При желании можно было бы "натянуть" концепцию DDD и на описанный кейс: ввести новый агрегат, которого нет фактически в домене, создать новое событие связанное с этим агрегатом, и затем в другом сервисе отслеживать это событие и должным образом реагировать на него. Но все это выглядело крайне притянутым за уши. По этой причине при разработке данного кейса было решено отказаться от DDD и пойти более традиционным путем, который оказался даже проще и выглядел более эстетично.

Комментарии (9)


  1. aleksandy
    20.06.2023 04:16

    Насколько я понял, получается, что мы сначала отправляем событие, получаем OK, что оно опубликовано, а уже потом сохраняем в БД запись, так?

    А если сохранение в БД отвалится, а событие уже улетело? По-моему, более правильным подходом является отправка событий через @TransactionalEventListener(BEFORE_COMMIT) и если отправка не удалась, то и транзакцию ещё не поздно зафейлить.


    1. korobovn Автор
      20.06.2023 04:16

      Не так, агрегат содержит в себе событие и при сохранении агрегата сохраняется и он сам и событие. И все это происходит в рамках одной транзакции, атомарно. А затем уже выполняется публикация.

      Что касается варианта с TransactionalEventListener(BEFORE_COMMIT) то может быть ситуация, когда уже событие опубликовалось, а транзакция по какой-либо причине завершилась с ошибкой, что привело к ее откату и опять же может возникнуть несогласованное состояние. Плюс сам брокер может быть какое-то время недоступен, получается из-за его недоступности будет блокироваться и выполнение самой транзакции. А если публикация будет асинхронной опять же нет достоверной уверенности а прошла ли она корректно.
      Ваш вариант с TransactionalEventListener можно задействовать, чтобы не публиковать событие, а выполнять их сохранение с Transactional(propagation = Propagation.SUPPORTS) тогда и сущность и событие сохранятся в рамках одной транзакции.

      К слову, есть другая интересная аннотация DomainEvents, которая фактически автоматом публикует доменные события, она может быть использована вместе с Transactional для метода save у репозитория, при сохранение самой сущности и с EventListener и Propagation.SUPPORTS для сохранения в рамках одной транзакции самого события. Такое решение будет максимально близким к тому, о чем говорилось в статье.


      1. AlexViolin
        20.06.2023 04:16

        Если бизнес-данные и события хранятся в разных базах данных и их нельзя поместить в одну транзакцию - как в этом случае построить алгоритм публикации события?


        1. korobovn Автор
          20.06.2023 04:16

          При каких обстоятельствах может возникнуть такая ситуация? Агрегатом владеет какой-то микросервис, у этого микросервиса имеется база данных. И весь этот алгоритм применяется в рамках этой базы данных. Сохраняется агрегат и событие в одну и ту же базу данных.


          1. AlexViolin
            20.06.2023 04:16

            Абсолютно стандартная ситуация - продуктовая бд на оракле, а бд на которой можно реализовать Transactional outbox pattern на постгре.


            1. korobovn Автор
              20.06.2023 04:16

              а что мешает все в одной делать?


              1. AlexViolin
                20.06.2023 04:16

                Оракл - продуктовая бд заказчика и к ней имеем доступ в рамках бизнес-логики системы. Добавление новых схем и таблиц в неё невозможно. Постгре - внутренняя бд нашей системы и с ней можно делать всё что угодно.


        1. aleksandy
          20.06.2023 04:16

          Гарантированно атомарный - никак.

          С костылями и приседаниями

          Обмазывать сущности состоянием и, сохраняя бизнес-сущность, проставлять им статус "сохранено без события". При сохранении события ставить ей статус "сохранено", а у сущности менять статус на "сохранено с событием". После успешной публикации, менять статус события на "опубликовано".

          Плюс должен вертеться фоновый процесс, следящий за соответствием статусов сущностей и событий, и как-то (тут зависит от конкретного случая) реагировать на такие ситуации.


          1. korobovn Автор
            20.06.2023 04:16

            А в качестве гарантий атомарности, чем не устроили транзакции?)
            Ваша схема напоминает шаблон Сага для распределенных систем.
            А в данном примере вполне может быть использована локальная транзация и для сохранения сущности и для для сохранения события