Здравствуйте. Меня зовут Юрий Кехтер, я backend-разработчик на Python.

В этой статье я хотел бы рассказать об архитектурных шаблонах Transactional Outbox и Idempotent Consumer. Кроме того, я хотел бы показать собственную реализацию, содержащую интересное сочетание технологий, выходящее за рамки этих шаблонов, значительно упрощающее реализацию и эксплуатацию:

  • Альтернатива поллингу (polling) при публикации событий.

  • Автоматическое разделение работы по публикации событий.

  • Автоматическое удаление устаревших данных.

  • Возможность запуска в одном процессе с HTTP-сервером.

Я предпочитаю не смешивать разные языки (languages), поэтому постараюсь указывать в скобках название термина на английском, чтобы избежать разночтений.

Библиотека event-outbox написана на Python в порыве энтузиазма, во время "вынужденной паузы" на моей текущей работе. По состоянию на июнь 2024 года, она еще не добралась до версии 1.0.0 и ни разу не использовалась по настоящему. К тому же, ей не хватает полноценной документации.

Код в статье предоставлен исключительно для демонстрации и отличается от исходного кода библиотеки. Я использую синтаксис ... (ellispis), чтобы опустить некоторые несущественные детали, но предоставить достаточно контекста для понимания кода.

Если Вы по какой-то причине решите использовать эту библиотеку, прошу вас связаться со мной, ведь я заинтересован в её развитии. Если Вы чувствуете непреодолимое желание помочь проекту за идею, то приглашаю вас объединить усилия в open-source.

Проблема

Требуется гарантировать итоговую согласованность данных (eventual consistency) при выполнении двух упорядоченных действий на двух разных сервисах в распределенной системе. Между действиями допустима задержка. Общение между сервисами происходит по нестабильной сети. Сервисы могут падать.

Если пренебречь условием "на разных сервисах", то речь идет о монолите (monolith), а решение очевидно. Действия происходят один за другим, изменения данных накапливаются в транзакции. После фиксации транзакции (commit), согласованные изменения записываются в базу данных. Допустимость задержки между действиями игнорируется. Проблема нестабильной сети обходится стороной. Транзакция спасает целостность данных от отказа сервиса в процессе обработки запроса.

Если все же речь идет о двух разных сервисах, то для гарантии целостности (consistency) данных, проблемы нестабильной сети и падения сервисов игнорировать не получится.

Обозначим порядок обработки:

  1. Первый сервис получает запрос (request) от клиента.

  2. Первый сервис выполняет первое действие и публикует событие, инициирующее выполнение второго действия на втором сервисе.

  3. Первый сервис отправляет ответ (response) клиенту.

  4. Второй сервис получает событие и выполняет второе действие.

Коммуникация между клиентом и первым сервисом выходит за рамки решаемой проблемы. Считаем, что запрос (request) все-таки пришел на первый сервис, а ответ (response) все-таки будет доставлен клиенту.

Когда первый сервис выполняет действие и публикует событие, возможны несколько точек отказа:

  • Событие не опубликовано. Если изменения в базе данных зафиксированы (commit) до публикации события, то отсутствует гарантия публикации. Второй сервис или даже система обмена сообщениями могут быть недоступны. Как итог, второе действие может остаться невыполненным.

  • Опубликовано лишнее событие. Если событие опубликовано до фиксирования (commit) изменений в базе данных, то не гарантируется изменение данных. База данных может оказаться недоступной. Как итог, фиксация результатов первого действия может не произойти.

Когда второй сервис получает событие и выполняет второе действие, возможны несколько точек отказа:

  • Событие не доставлено. Второй сервис не получил событие. Если отсутствует гарантия доставки события, то второе действие может остаться невыполненным.

  • Событие обработано неоднократно. Есть система доставки событий с несколькими попытками (retries), требующая от сервиса подтверждения (ack). Если второй сервис выполнил второе действие, но не смог оповестить такую систему об успешной обработке, то может быть предпринята еще одна попытка. Как итог, событие может быть обработано несколько раз.

В любом из этих случаев согласованность данных не гарантируется.

Решение

Примечание:
У проблемы существует несколько решений. Например Two-Phase Commit Protocol. В рамках этой статьи будет рассмотрено альтернативное решение - шаблоны Transactional Outbox и Idempotent Consumer.

В идеальном мире, от механизма доставки и обработки событий требуется:

  • Гарантия публикации события ровно один раз (exactly once).

  • Гарантия доставки события ровно один раз (exactly once).

  • Гарантия обработки события ровно один раз (exactly once).

Мне не известен способ реализации этих гарантий в настоящем мире. Однако, можно использовать другую систему гарантий:

  • Гарантия публикации события хотя бы один раз (at least once)

  • Гарантия доставки события хотя бы один раз (at least once)

  • Гарантия обработки события хотя бы один раз (at least once)

  • Гарантия идемпотентной обработки события (idempotency)

Даже если событие будет опубликовано 100 раз, доставлено 50 раз, а обработано 25 раз, то идемпотентность обработки события (idempotency) позволит гарантировать итоговую согласованность (eventual consistency).

За гарантию итоговой согласованности придется заплатить:

  • В случае отказов сети, сервисов или транзакций, вычислительные ресурсы могут быть потрачены впустую, ведь возможно выполнение работы, вообще не влияющей на состояние системы.

  • Гарантии "хотя бы один раз" (at least once) требуют механизма повторов (retries).

  • Иногда требуется реализовать компенсирующую транзакцию, которая отменит (rollback) первое действие на первом сервисе. Во время допустимой задержки между действиями, второе действие может стать невыполнимым в принципе.

Далее будет рассмотрен механизм доставки, оформленный в виде библиотеки event-outbox.

Технологии

Библиотека имеет ряд конкретных зависимостей и пока не позиционируется как универсальное решение для любого стека. Я попытался взять от используемых технологий максимум, чтобы обеспечить наибольшую полезность при наименьших затратах собственного времени.

MongoDB

MongoDB - это основная СУБД, с которой я работаю последние несколько лет. Некоторые её особенности напрямую повлияли на конечное решение:

  • Transactions - гарантирует атомарную согласованность (transactional consistency) при изменении документов в разных коллекциях.

  • Change Streams - позволяет подписаться на изменения в коллекции и снизить нагрузку на базу данных.

  • Partial TTL indexes - позволяет автоматически удалять опубликованные и обработанные события из базы данных.

Для подключения к базе данных используется асинхронный драйвер motor.

Apache Kafka

Apache Kafka - платформа потоковой передачи событий, с которой я не работал (по состоянию на июнь 2024), но изучал ради интереса. Некоторые её особенности также повлияли на конечное решение:

  • Consumer Groups - позволяет организовать независимую обработку одних и тех же событий в разных группах.

  • Consumer Rebalance Protocol - выдает консюмеру (consumer) эксклюзивные права на обработку событий из партишнов топика (topic partitions) в рамках одной группы консюмеров (consumer group) и автоматическое перераспределение при их подключении / отключении.

  • Manual Offset Management - позволяет гарантировать обработку события хотя бы один раз (at least once) за счет фиксирования смещения (offset commit) непосредственно после обработки события.

  • Custom Partitioner - позволяет использовать собственный алгоритм выбора партишна (partition), в который публикуется событие.

Для публикации и потребления (consume) событий используется асинхронный клиент aiokafka.

Pydantic

Библиотека pydantic используется для описания модели данных (data model) публикуемых событий.

Публикация событий

Transactional Outbox

Transactional Outbox - шаблон, гарантирующий публикацию событий хотя бы один раз (at least once). Суть: отделить намерение (intent) от публикации события.

При выполнении действия, в одной транзакции оказываются:

  • Изменения данных, т.е. результат выполнения действия.

  • Намерения опубликовать события.

После успешной фиксации (commit) такой транзакции, в отдельной коллекции базы данных надежно хранятся намерения опубликовать события. Если произойдет отказ транзакции (abort), то такие намерения просто не будут зафиксированы.

Для публикации событий запускается специальный цикл, читающий документы из коллекции с намерениями и публикующий их в систему обмена сообщениями. После подтверждения публикации, событие помечается опубликованными и больше не публикуется.

В случае отказа системы обмена сообщениями (Kafka), событие останется неопубликованным. Попытки опубликовать событие будут предприниматься до тех пор, пока система обмена сообщениями не станет доступна и не подтвердит публикацию события.

В случае отказа сети, система обмена сообщениями (Kafka) не сможет подтвердить публикацию. При следующей попытке публикации, в системе обмена сообщениями может появиться дубликат события.

При остановке цикла, публикующего события, публикация останавливается до тех пор, пока цикл не будет перезапущен. Так как после публикации, события помечаются в базе данных (MongoDB) опубликованными, цикл может восстановить работу с первого неопубликованного события.

За гарантии доставки и целостность данных придется заплатить увеличением нагрузки на базу данных. Её можно немного уменьшить, задействовав специальные механизмы слежения за изменениями.

Сохранение намерений в базу данных

Начнем с кода:

from contextlib import AbstractAsyncContextManager

from motor.motor_asyncio import AsyncIOMotorClientSession
from pydantic import BaseModel


class Event(BaseModel):
    ...


class EventListener:
    def event_occurred(self, event: Event) -> None:
        ...


class EventOutbox:
    def event_listener(
        self, mongo_session: AsyncIOMotorClientSession
    ) -> AbstractAsyncContextManager[EventListener]:
        ...

Интерфейс EventListener - это синхронный слушатель событий. Он объявляет единственный метод event_occurred, который принимает event - произошедшее событие.

Класс EventOutbox используется как менеджер контекста (context manager) таких слушателей. С помощью метода event_listener можно асинхронно открыть контекст, передав сессию (session) базы данных:

from motor.motor_asyncio import AsyncIOMotorClient

from event_outbox import Event, EventOutbox

async def handler(
    mongo_client: AsyncIOMotorClient,
    outbox: EventOutbox,
) -> None:
    db = mongo_client.get_default_database()
    async with await mongo_client.start_session() as session:
        async with outbox.event_listener(session) as listener:
            await db["collection"].insert_one({}, session=session)
            listener.event_occurred(
                Event(
                    topic="bounded_context",
                    content_schema="EventOccurred",
                )
            )

Сессия базы данных нужна для того, чтобы намерения опубликовать события попали в одну транзакцию с результатом выполнения действия. Таким образом, контекстный менеджер (context manager) EventOutbox.event_listener сам открывает и фиксирует (commit) транзакцию.

Класс Event - это модель данных pydantic. Она используется для представления (serialize/dump) события в json и передачи по сети. Новый класс событий предполагается объявлять наследником класса Event и описывать в нем все данные события:

from typing import Literal

from event_outbox import Event

class EventOccurred(Event):
    topic: Literal["bounded_context"] = "bounded_context"
    content_schema: Literal["EventOccurred"] = "EventOccurred"
    extra_data: int

Изменения вставляются в коллекцию пачкой (insert many) при выходе из контекста:

from contextlib import asynccontextmanager
from typing import Any, AsyncIterator, Mapping

from motor.motor_asyncio import AsyncIOMotorClientSession, AsyncIOMotorCollection

outbox: AsyncIOMotorCollection = ...


class EventListener:
    ...


class EventOutbox:
    mongo_outbox: AsyncIOMotorCollection

    @asynccontextmanager
    async def event_listener(
        self, mongo_session: AsyncIOMotorClientSession
    ) -> AsyncIterator[EventListener]:
        documents: list[Mapping[str, Any]] = ...
        listener: EventListener = ...
        async with self.mongo_session.start_transaction():
          yield listener
          await self.mongo_outbox.insert_many(
              documents,
              session=mongo_session,
          )

Чтение намерений из базы данных

При запуске или перезапуске цикла публикации, все накопившиеся события будут последовательно прочитаны через обычный find и опубликованы.

Когда документов в коллекции не остается, необходимо каким-то образом подождать появления новых. Классическое решение - организовать поллинг (polling) коллекции. Change Streams позволяют реализовать альтернативу поллингу (polling) и снизить нагрузку на базу данных. Можно подписаться на операции insert в коллекции и ждать, когда MongoDB сама оповестит о новом документе.

from typing import Any, AsyncIterator, Mapping

from bson import Timestamp
from motor.motor_asyncio import AsyncIOMotorClientSession, AsyncIOMotorCollection


class EventPublisher:
    mongo_session: AsyncIOMotorClientSession
    mongo_outbox: AsyncIOMotorCollection

    async def subscribe_to_change_stream(
        self, start_at_operation_time: Timestamp
    ) -> AsyncIterator[Mapping[str, Any]]:
        async with self.mongo_outbox.watch(
            [{"$match": {"operationType": {"$in": ["insert"]}}}],
            start_at_operation_time=start_at_operation_time,
            session=self.mongo_session,
        ) as change_stream:
            async for change_event in change_stream:
                yield change_event["fullDocument"]

Эксклюзивные права на публикацию

Consumer Rebalance Protocol позволяет Kafka автоматически назначать (assign) консюмерам (consumer) партишны топиков (topic partition) таким образом, чтобы из партишна одновременно читал только один консюмер группы.

Что именно было назначено консюмеру (consumer), можно узнать во время выполнения:

from aiokafka import AIOKafkaConsumer

kafka_consumer: AIOKafkaConsumer = ...
assignment = kafka_consumer.assignment()

Если консюмер (consumer) и продюсер (producer) запускаются в одном процессе, то способность Kafka назначать партишны консюмерам (consumer) может быть использована для иной цели - предоставления продюсерам (producer) эксклюзивного права на публикацию в партишны (partition):

from aiokafka import AIOKafkaConsumer, AIOKafkaProducer
from pydantic import BaseModel


class Event(BaseModel):
    topic: str
    partition_key: int


class EventPublisher:
    kafka_consumer: AIOKafkaConsumer
    kafka_producer: AIOKafkaProducer

    async def publish_event(self, event: Event) -> None:
        partition = event.partition_key % len(
            self.kafka_consumer.partitions_for_topic(event.topic)
        )
        assignment = self.kafka_consumer.assignment()
        if any(
            topic_partition.partition == partition
            for topic_partition in assignment
            if topic_partition.topic == event.topic
        ):
            await self.kafka_producer.send_and_wait(
                event.topic,
                event.model_dump_json().encode(),
                partition=partition,
            )

Таким образом, существует возможность запустить несколько параллельных процессов публикации событий и использовать Kafka для разделения работы между ними. Каждый из продюсеров (producuer) публикует события только в выделенные ему партишны топика (topic partitions).

Доставка событий

При попадании события в кластер, доставка хотя бы один раз (at least once) гарантируется самой Kafka.

Обработка событий

Idempotent Consumer

Idempotent Consumer - шаблон, гарантирующий идемпотентность (idempotency) обработки событий. Суть: зафиксировать (commit) факт обработки вместе с результатом обработки.

Когда событие поступает из сети, оно сохраняется в базу данных. Выполняется обработка. В одну транзакцию попадают:

  • Изменение флага у входящего события.

  • Изменения данных, т.е. результат обработки.

  • Намерения опубликовать другие события.

После фиксации (commit) такой транзакции, событие считается обработанным и больше не обрабатывается.

Если по какой-то причине произошла одновременная обработка одного и того же события, то будет зафиксирована только одна транзакция, а вторая будет отменена (abort).

Если необходимо выполнить запрос во внешнюю систему, то рекомендуется передать ключ идемпотентности. Тогда повторная обработка событий гарантировано не приведет к повторному выполнению действия во внешней системе.

Обработчик событий

Для начала рассмотрим интерфейс обработчика событий - протокол EventHandler:

from typing import Protocol

from motor.motor_asyncio import AsyncIOMotorClientSession


class Event:
    topic: str
    content_schema: str


class EventOutbox:
    ...


class EventHandler(Protocol):
    async def __call__(
        self,
        event: Event,
        mongo_session: AsyncIOMotorClientSession,
        /,
    ) -> None:
        pass

С ним совместима функция, принимающая 2 аргумента - обрабатываемое событие и сессию базы данных с открытой транзакцией, в которой необходимо выполнять запросы.

Событие приходит в обработчик как экземпляр класса Event. Библиотека не реализует маршрутизацию (routing) по типам событий. Специального механизма внедрения зависимостей в обработчик тоже нет. Для внедрения экземпляра EventOutbox или любых других зависимостей, можно написать небольшой lambda-адаптер:

from motor.motor_asyncio import AsyncIOMotorClientSession

from event_outbox import Event, EventHandler, EventOutbox


async def event_handler(
    event: Event,
    session: AsyncIOMotorClientSession,
    outbox: EventOutbox,
    answer: int,
) -> None:
    match (event.topic, event.content_schema):
        case ("bounded_context", "EventOccurred"):
            ...


def create_adapter() -> EventHandler:
    outbox: EventOutbox = ...
    return lambda event, session: (
        event_hander(
            event,
            session,
            outbox,
            answer=42,
        )
    )

Эксклюзивные права на обработку

Consumer Rebalance Protocol выдает консюмеру (consumer) в группе эксклюзивные права на обработку событий из назначенных ему партишнов топиков (topic partitions).

Сохранение входящего события в базу данных

Пришедшее из Kafka событие сохраняется в специальную коллекцию входящих событий. MongoDB позволяет использовать словарь в качестве идентификатора, чтобы воспользоваться встроенным (default) уникальным индексом на _id для обеспечения уникальности по нескольким полям.

from motor.motor_asyncio import AsyncIOMotorClientSession, AsyncIOMotorCollection
from pydantic import BaseModel


class Event(BaseModel):
    topic: str
    content_schema: str
    idempotency_key: str


class EventConsumer:
    mongo_inbox: AsyncIOMotorCollection
    mongo_session: AsyncIOMotorClientSession

    async def handle_events(self) -> None:
        while True:
            event: Event = ...
            document_id = event.model_dump(
                mode="json",
                include={"topic", "content_schema", "idempotency_key"},
            )
            await self.mongo_inbox.insert_one(
                {"_id": document_id, "handled": False},
                session=self.mongo_session,
            )
            ...

Оптимистическая блокировка

Перед обработкой события открывается транзакция, в которой необработанное событие помечается обработанным. Фактически, это оптимистическая блокировка по полю handled. При наличии нескольких конкурирующих транзакций, изменяющих флаг handled, зафиксирована (commit) будет только одна.

from datetime import UTC, datetime
from typing import Any, Mapping, Protocol

from motor.motor_asyncio import AsyncIOMotorClientSession, AsyncIOMotorCollection


class Event:
    ...


class EventHandler(Protocol):
    async def __call__(
        self,
        event: Event,
        mongo_session: AsyncIOMotorClientSession,
        /,
    ) -> None:
        pass


class EventConsumer:
    mongo_session: AsyncIOMotorClientSession
    mongo_inbox: AsyncIOMotorCollection
    event_handler: EventHandler

    async def handle_event(self, document_id: Mapping[str, Any], event: Event) -> None:
        async with self.mongo_session.start_transaction():
            result = await self.mongo_inbox.update_one(
                {"_id": document_id, "handled": False},  # <-- Here
                {
                    "$set": {
                        "handled": True,
                        "handled_at": datetime.now(tz=UTC),
                    }
                },
                session=self.mongo_session,
            )
            if result.modified_count:
                await self.event_handler(event, self.mongo_session)

Идемпотентный запрос к внешней системе

Событие Event содержит ключ идемпотентности (idempotency key). Ключ генерируется при создании экземпляра Event как hex-представление UUID4. Этот ключ может использоваться для идемпотентных запросов к внешним системам:

from motor.motor_asyncio import AsyncIOMotorClientSession

from event_outbox import Event


async def send_email(idempotency_key: str) -> None:
    ...


async def event_handler(
    event: Event,
    session: AsyncIOMotorClientSession,
) -> None:
    await send_email(event.idempotency_key)

Подтверждение обработки

Manual Offset Management позволяет вручную управлять смещением (commit offset), чтобы зафиксировать в Kafka факт обработки события непосредственно после обработки. Для этого консюмер создается с флагом enable_auto_commit=False:

from aiokafka import AIOKafkaConsumer


class EventConsumer:
    kafka_consumer: AIOKafkaConsumer

    async def handle_events(self) -> None:
        while True:
            kafka_consumer_record = await self.kafka_consumer.getone()
            ...
            await self.kafka_consumer.commit()  # <-- Here


async def initialize() -> None:
    """
    Your service initialization code
    """
    topics: list[str] = ...
    bootstrap_servers: str = ...
    group_id: str = ...
    async with AIOKafkaConsumer(
        *topics,
        bootstrap_servers=bootstrap_servers,
        group_id=group_id,
        auto_offset_reset="earliest",
        enable_auto_commit=False,  # <-- Here
    ) as kafka_consumer:
        ...

В случае перезапуска цикла обработки событий, цикл продолжит работу с первого необработанного события.

Удаление устаревших событий

Чтобы избежать повторной обработки, необходимо достаточно долго держать в базе данных информацию об обработанных событиях. В связи с этим, в базе данных накапливается большое количество устаревших документов. Для удаления устаревших данных используется Partial TTL indexes. Событие автоматически удаляется из базы данных после публикации или обработки, через заранее определенное время. Это позволяет переложить задачу очистки базы данных на MongoDB:

from datetime import timedelta

from motor.motor_asyncio import AsyncIOMotorCollection


class EventOutbox:
    mongo_outbox: AsyncIOMotorCollection
    mongo_inbox: AsyncIOMotorCollection

    async def create_indexes(self) -> None:
        await self.mongo_outbox.create_index(
            "published_at",
            name="expiration",
            partialFilterExpression={"published": True},
            expireAfterSeconds=timedelta(days=1).total_seconds(),
        )
        await self.mongo_inbox.create_index(
            "handled_at",
            name="expiration",
            partialFilterExpression={"handled": True},
            expireAfterSeconds=timedelta(days=1).total_seconds(),
        )

Инициализация и запуск

Асинхронные циклы публикации в Kafka и обработки событий из Kafka запускаются в одном процессе. Например, их можно запустить вместе с HTTP-фреймворком. Тогда приложение будет состоять из трех основных циклов:

  • Цикл обработки HTTP-запросов.

  • Цикл публикации событий.

  • Цикл идемпотентной обработки событий.

Например, при использовании HTTP-фреймворка FastAPI, инициализацию можно выполнить в lifespan:

from contextlib import asynccontextmanager
from typing import AsyncIterator

from aiokafka import AIOKafkaConsumer, AIOKafkaProducer
from fastapi import FastAPI
from motor.motor_asyncio import AsyncIOMotorClient, AsyncIOMotorClientSession

from event_outbox import Event, EventOutbox


async def event_handler(
    event: Event,
    session: AsyncIOMotorClientSession,
    outbox: EventOutbox,
) -> None:
    ...


@asynccontextmanager
async def lifespan(app: FastAPI) -> AsyncIterator[None]:
    mongo_client: AsyncIOMotorClient = ...
    kafka_producer: AIOKafkaProducer = ...
    kafka_consumer: AIOKafkaConsumer = ...

    event_outbox = EventOutbox(
        mongo_client,
        kafka_producer,
        kafka_consumer,
    )
    await event_outbox.create_indexes()
    async with event_outbox.run_event_handler(
        lambda event, session: event_handler(
            event,
            session,
            event_outbox,
        )
    ):
        yield

def create_app() -> FastAPI:
    return FastAPI(lifespan=lifespan)

Заключение

Реализация итоговой согласованности (eventual consistency) за счет гарантий доставки и идемпотентной обработки - это мощный механизм, который часто ускользает из виду. Возможность запускать циклы обработки и публикации в одном процессе с HTTP-сервером позволяет интегрировать решение в проект без запуска дополнительных процессов (worker). Использование механизмов отслеживания изменений в базе для ожидания новых событий является альтернативой поллингу (polling) и снижает нагрузку на базу данных. Эксплуатирование механизма назначения партишнов (partition) между консюмерами (consumer) позволяет также разделить работу по публикации событий между продюсерами (producer). Автоматическое удаление устаревших событий из базы данных позволяет снизить затраты на хранение.

В целом, я доволен реализованным решением. Мне очень хотелось бы услышать мнение сообщества. Я первый раз в open-source. Буду рад, если проект окажется полезным.

Если после прочтения у вас остались вопросы, предлагаю ознакомиться с исходным кодом или обсудить в комментариях.

Спасибо за внимание!

Комментарии (20)


  1. bugy
    11.06.2024 04:57
    +1

    Отличная статья, спасибо! Мы делаем нечто похожее, только с activemq

    Я только одного момента не понял, когда слушатель получает событие, он сохраняет его в монго? А дальше что с ним происходит?

    И как вообще выглядит событие у вас? Если нетрудно, можете дать конкретный пример названия и обработчика.


    1. return_nullptr Автор
      11.06.2024 04:57
      +1

      У меня есть репозиторий с игрушечным демо. Вот, например, модель данных события OrderCreated:

      class OrderCreated(Event):
          topic: Literal["booking"] = "booking"
          content_schema: Literal["OrderCreated"] = "OrderCreated"
          order_id: str
          client_id: str

      Когда слушатель (consumer) получает событие, он не сразу подтверждает (commit offset) обработку в Kafka. Событие кладется в коллекцию MongoDB вот в таком виде:

      {
        "_id": {
          "topic": "booking",
          "content_schema": "OrderCreated",
          "idempotency_key": "acff8c3352d547d68f1c25a172c031d7"
        },
        "handled": false
      }

      Т.е. просто составной ключ и флаг, что событие еще не обрабатывалось.

      Далее идет обработка. Функцию обработчика можете посмотреть в исходном коде демо.

      Когда транзакция зафиксировалась (commit), то обработка события подтверждается (commit offset) в Kafka. Можете посмотреть это в исходном коде event-outbox.


    1. return_nullptr Автор
      11.06.2024 04:57

      Можете поделиться своим опытом? К сожалению, почти ничего не знаю про ActiveMQ. Почему вы выбрали именно его? Есть ли какие-то механизмы восстановления после сбоев в кластере? Буду признателен, если опишете ваш подход в небольшом комментарии или дадите ссылки на статьи.


      1. bugy
        11.06.2024 04:57
        +1

        Мы на single instance. Всё, что добралось до сервера, восстанавливать не нужно. У нас похожий подход при публикации с транзакциями:

        • евент пишется в БД одновременно с основной транзакцией

        • после окончания транзации, евент отправляется в activemq асинхронно

        • если отправка успешная, то евент удаляется из БД

        • если отправка не успешная, то периодически система отправляет все евенты из БД, которые там есть (и удаляет их после отправки)

        Почему activemq:

        • очень простой в обслуживании и настройке

        • retry/DLQ из коробки

        • количество подписчиков более динамическое, чем в Kafka (нет привязки к partition). Можно параллельно получать несколько евентов, нет никаких оффсетов

        • для кафки я находил статьи, где говорится о потере сообщений, это прям no go для нас

        Из его минусов:

        • он медленный, если говорить о десятках и сотнях тысяч сообщений в минуту

        • нельзя смотреть "лог" (то, что было в прошлом)

        • он не заточен под кластеризацию. Можно, но не очень эффективно

        В нашем случае, кстати, нет промежуточного сохранения события после его получения: обработчик получает сообщение и сразу его обрабатывает. Если обработка успешна, то делается message acknowledge и оно считается доставленным. Если обработка неуспешная, то делается ретрай. На этом этапе сообщение потеряться уже не может


  1. nin-jin
    11.06.2024 04:57
    +1

    Мне не известен способ реализации этих гарантий в настоящем мире.

    Теперь известен.


    1. bugy
      11.06.2024 04:57
      +1

      Мне все ещё неизвестен

      • Перед обработкой задачи вставляется небольшая задержка на синхронизацию базы.

      • Падение обработчика приводит к его перезапуску и продолжению обработки задачи.

      • Если задача не обработана за определённый срок, то любой обработчик может её перехватить на себя

      Второй пункт это и есть at least once. Если обработчик повторяет процедуру, это значит, что он взял задачу дважды. Если обработчик умный и умеет продолжать выполнение, а не начинать с нуля, это особенности его реализации.

      Первый и третий пункт это вообще гарантия того, что в случае задержек или быстрого исполнения, одну и ту же задачу могут схватить разные обработчики.


      1. nin-jin
        11.06.2024 04:57
        +1

        Это цитата из High Availability решения, которое допускает дублирование с некоторой регулируемой вероятностью. Я вам дал ссылку на следующий раздел - там вместо дублирования страдает доступность.


        1. bugy
          11.06.2024 04:57

          А, извините, эту "документацию" очень трудно читать.

          В Prevent Doubling:
          > Падение обработчика или отсутствие связи с ним приводит к остановке обработки лишь его задач.

          из описания мне не понятно, что случится, если обработчик начал обрабатывать задачу, а потом упал. Или он начал обрабатывать задачу, и с ним пропала связь. И тут кто-то решил делать ребалансировку.

          ПС я правильно понимаю, что эта чудо-БД написана вами?


          1. nin-jin
            11.06.2024 04:57

            Поднимется и продолжит её выполнять.

            Ребалансировка повлияет лишь на новые задачи. Старые надо будет пересоздать при необходимости.

            Агась, но она ещё в бете.


        1. return_nullptr Автор
          11.06.2024 04:57

          Мне тоже интересен случай, когда обработчик сделал работу, зафиксировал изменения в базе данных, а затем упал. Если другому обработчику придет то же самое событие, то семантика доставки все же "хотя бы раз" (at least once).

          Такая система мне нужна в том числе и для сценария, в котором базы данных разных сервисов изолированны. В том смысле, что я не могу положить событие в базу данных одного сервиса, а затем в другом сервисе прочитать его из той же базы для обработки.

          Скажите пожалуйста, используется ли в приведенном Вами решении какая-то система доставки сообщений? Или передача событий происходит через базу данных?


          1. nin-jin
            11.06.2024 04:57

            В том-то и дело, что тут нет никаких событий. Есть только состояние, которое реплицируется между всеми узлами. Каждый узел напрямую работает со своей локальной копией и ничего не знает про другие. Соответственно, если обработчик упал, то его надо поднять, и он продолжит работу исходя из своего текущего состояния.


            1. return_nullptr Автор
              11.06.2024 04:57

              Вы меня немного запутали. Я пишу про механизм доставки и обработки событий.

              В идеальном мире, от механизма доставки и обработки событий требуется:

              Гарантия публикации события ровно один раз (exactly once).

              Гарантия доставки события ровно один раз (exactly once).

              Гарантия обработки события ровно один раз (exactly once).

              Мне не известен способ реализации этих гарантий в настоящем мире.

              Но при этом, вы пишете, что:

              В том-то и дело, что тут нет никаких событий.

              В связи с этим у меня вопрос: как мы можем говорить о гарантиях механизма доставки и обработки событий, если нет никаких событий?

              Если вас не затруднит, можете описать вот этот процесс обработки запроса в предложенном подходе?

              Обозначим порядок обработки:

              Первый сервис получает запрос (request) от клиента.

              Первый сервис выполняет первое действие и публикует событие, инициирующее выполнение второго действия на втором сервисе.

              Первый сервис отправляет ответ (response) клиенту.

              Второй сервис получает событие и выполняет второе действие.


              1. nin-jin
                11.06.2024 04:57
                +1

                Вот именно, что вы думаете событиями, а задача ваша - не однократное выполнение обработчика события, а однократное выполнение неидемпотентной задачи. Тут нужно немного подняться над возьнёй с событиями, чтобы увидеть их не нужность для этой задачи. Собственно, чем Кафка и занимается - превращает событие в состояние, чтобы потом превратить состояние обратно в событие. Это лишнее звено, которое только всё усложняет, по сравнению с работой с состоянием изначально.

                1. Клиент создаёт задачу.

                2. Увидев новую задачу, первый сервис отмечается в ней.

                3. Первый сервис делает свои дела и обновляет задачу.

                4. Увидев задачу в нужном статусе, второй сервис отмечается в ней.

                5. Второй сервис делает свои дела и обновляет задачу.

                6. Всё это время клиент наблюдает за статусом задачи в реальном времени.


                1. return_nullptr Автор
                  11.06.2024 04:57

                  Спасибо, что поделились!Рассмотренный в статье подход позволяет свести задачу однократного выполнения неидемпотентной операции к задаче неоднократного выполнения идемпотентной операции. Возня с событиями, как мне кажется, это больше вопрос подхода. Как ни крути, нужно сделать вторую операцию. Будет ли это для этого поставлена некоторая задача (job/task) или отправлено событие (event), инициирующее действие, в целом, не так важно.

                  Получается, клиент, который создаёт задачу, записывает ее в общую базу данных. Обработчики тоже смотрят на эту базу данных и выполняют из нее задачи. При этом есть механизм разделения работы, чтобы они не делали одно и то же. Ваше решение предоставляет базу данных + этот механизм.

                  У меня другой стек. MongoDB не занимается оркестрацией задач. Собственно, для этого здесь Kafka, протокол ребалансировки которой мне не нужно изобретать.

                  Писать свою оркестрацию это всё-таки большая работа, мое Вам уважение.

                  Однако, вернёмся к вопросу терминологии. Думаю, ваши доводы меня не убедили. Я всё-таки не согласен с утверждением, что это семантика "ровно один раз" (exactly once). Падения обработчиков могут оставить какой-то сайд-эффект во внешней системе до изменения статуса задачи. Например, отправка письма клиенту. В таких условиях, все равно необходимо делать запрос к внешней системе с ключом идемпотентности, если не хочется случайно отправить письмо дважды. Думаю, семантика обработки здесь тоже "хотя бы один раз" (at least once).


                  1. nin-jin
                    11.06.2024 04:57

                    Ещё как важно. На событие надо вовремя подписаться, иначе ты его пропустишь. Его нельзя повторять, иначе обработка задублируется. Его очень легко потерять, просто упав во время его обработки или передачи. Именно поэтому вокруг событий накручены всякие кафки, да рафты, и всё равно вокруг них нужны доп костыли.

                    А с состояниями всё просто: актуальное значение всегда доступно, его пересылка идемпотентна, а потерять его сложно, ибо реплицируется на несколько узлов. Это на порядок более простая и надёжная архитектура.

                    Я говорю про более сложную проблему - exactly once для неидемпотентной задачи. Для идемпотентной задачи достаточно и at least once - тут нет дилеммы между доступностью и однократностью.


                    1. bugy
                      11.06.2024 04:57
                      +1

                      Можете привести чуть более конкретный пример задачи/проблемы/состояния/обработчика. Т.е. конкретный юз кейс из реального мира.


                      1. nin-jin
                        11.06.2024 04:57

                        Вы не верите в существование неидемпотентных API? Да практически в любом REST/GQL/etc API создание сущностей не идемпотентно.


                      1. bugy
                        11.06.2024 04:57

                        Как вы сделали такой вывод из моего сообщения? Я просто попросил привести реальный пример, чтобы понять, как ваша схема работает. К сожалению на том уровне абстракции, на котором вы описываете, детали совершенно не ясны.

                        ПС не очень понимаю как API связан с идемпотентностью. Вторая это про реализацию, можно любой тип АПИ сделать идемпонетным или любой сделать неидемпотентным.


                      1. nin-jin
                        11.06.2024 04:57

                        Ну так сделайте эти апи идемпотентными, пожалуйста: https://docs.github.com/en/rest/about-the-rest-api/comparing-githubs-rest-api-and-graphql-api


                      1. bugy
                        11.06.2024 04:57

                        Непонятно откуда столько агрессии, при игнорировании основного вопроса. Ну БД и дело ваше, удачи