kafka
kafka

Введение

Всем привет! Рад видеть вас снова.

В этой части нашего цикла про Kafka разберём одну из самых хитрых тем — Kafka транзакции. Мы посмотрим, что они вообще гарантируют, когда их реально стоит применять, и почему во многих системах они не просто не нужны, а даже вредны.

Забегая вперёд, скажу: добавлять транзакции в нашу платформу из прошлой статьи мы не будем. Потому что они там не нужны. А почему — обсудим ниже.

Синтаксис и примеры мы рассмотрим на абстрактных сценариях — там, где транзакции действительно нужны.

Цель статьи — не просто научить вас включать транзакции, а помочь понять, когда они действительно спасают, а когда лишь добавляют сложности без всякой пользы.

Когда нужны Kafka транзакции?

Сначала разберёмся с тем, в каких сценариях Kafka транзакции действительно имеют смысл. Таких случаев немного, и все они связаны с требованиями к атомарности.

Первый сценарий — запись сообщений сразу в несколько топиков, когда частичная отправка недопустима. Если одно сообщение будет опубликовано, а другое — нет, система окажется в некорректном состоянии.

Если сервис одновременно является и консьюмером, и продюсером, то при перезапуске он повторит обработку сообщения, так как упал до коммита оффсета. В результате:

  • ранее отправленные сообщения будут отправлены повторно;

  • неотправленные сообщения будут отправлены впервые.

Такой сценарий можно частично компенсировать дедупликацией downstream-сервисов (тех сервисов, которые реагируют на отправленные сообщения), но это не всегда возможно.

Если же сервис не является консьюмером (например, реагирует на HTTP-запросы), то ситуация ещё хуже: сообщение, которое не было отправлено до сбоя, будет просто потеряно.

В этом сценарии Kafka транзакции гарантируют, что либо все сообщения станут видимы для консьюмеров, либо ни одно из них. Речь именно о видимости: при откате транзакции сообщения не удаляются физически из брокера, а остаются невидимыми для консьюмеров с read_committed параметром (о нём ниже).

Второй сценарий — сервисы, работающие по схеме read → process → write.

Сервис читает сообщения из одного или нескольких топиков, обрабатывает их и публикует результат в другие топики. При сбое после отправки сообщений, но до коммита оффсета, downstream-сервисы увидят результат обработки, а исходное сообщение будет обработано повторно. Это приводит к дубликатам.

Использование Kafka транзакций в этом сценарии позволяет связать чтение сообщений, запись результатов и коммит оффсетов в одну атомарную операцию.

Конкретные случаи

Ниже указаны конкретные практические случаи, в которых использование Kafka транзакций действительно оправдано. Во всех этих случаях частичный результат либо приводит к логической ошибке, либо не может быть корректно обработан downstream-сервисами.

  1. Сервис публикует сообщения в несколько топиков, а downstream-сервисы не поддерживают дедупликацию.

    В этом случае частичная публикация приводит к некорректному состоянию системы: один сервис обработает событие, другой — нет. Если повторная отправка недопустима, транзакции становятся единственным способом обеспечить атомарность.

  2. Сервис хранит состояние в Kafka и публикует связанные события в разные топики.

    Например, сервис сначала отправляет событие обработки, а затем — событие в топик истории с указанием времени отправления события. Если сервис упадёт между этими отправками, то при повторной обработке сообщение будет отправлено заново, а событие в истории — с другим временным значением. В результате история окажется некорректной.

  3. Сервис работает по схеме read -> process -> write, при этом downstream-сервисы не поддерживают дедупликацию.

    Если сервис упадёт после отправки сообщений, но до коммита оффсета, downstream-сервисы увидят те же самые сообщения повторно. В сценариях, где повторная обработка недопустима, Kafka транзакции позволяют связать отправку сообщений и коммит оффсетов в одну атомарную операцию.

Границы Kafka транзакций

Очень важно отметить, что Kafka транзакции работают только в рамках Kafka. Они не умеют откатывать изменения во внешних системах — будь то запись в базу данных, вызов API или отправка email.

По сути, Kafka транзакции решают две задачи:

  1. Атомарная публикация — связывают несколько отправок сообщений продюсером в одну операцию «всё или ничего»

  2. Атомарность чтения-записи — для сервисов, которые одновременно являются консьюмерами и продюсерами, связывают отправку сообщений и коммит оффсетов в единый шаг

За пределами Kafka их гарантии заканчиваются. Если вам нужно согласованно обновить базу данных и отправить сообщение в Kafka — стандартные Kafka транзакции вам не помогут. Для таких сценариев нужны другие подходы (например, паттерн Outbox, который был рассмотрен в данной статье).

Как работают Kafka транзакции?

Давайте теперь вообще рассмотрим, как работают транзакции.

Я недаром выше делил случаи использования транзакций на 2 сценария. Ведь в 1 сценарии сервис может быть чистым продюсером, а во втором сценарии он обязательно и продюсер, и консьюмер. Транзакции будут работать немного по-разному.

Если имеем дело с 1 сценарием и сервис является только продюсером, то транзакция работает как-то так:

beginTransaction()
send(topicA, msg1)
send(topicB, msg2)
send(topicC, msg3)
commitTransaction()

Это очень похоже на работу транзакций в базах данных.

Если же мы имеем дело со 2 сценарием или 1 сценарием, при этом сервис является и консьюмером, то транзакция будет выглядеть уже сложнее:

beginTransaction()
poll()
process()
send(topicA, msg1)
send(topicB, msg2)
sendOffsetsToTransaction()
commitTransaction()

То есть оффсеты не коммитятся сразу, а становятся частью транзакции. Падение до обработки оффсетов повлечёт отмену транзакции.

Включение Kafka транзакций в Spring Boot (настройка application.properties файлов)

Настройка продюсера

Чтобы включить транзакции, нам необходимо прописать в application.properties продюсера следующие строчки:

spring.kafka.producer.properties.enable.idempotence=true
spring.kafka.producer.properties.transactional.id=some-service-${random.value}-

В данной статье мы говорили подробно о transactional-id параметре. Здесь же мы используем не id целиком, а префикс. Spring Boot сам поставит суффикс. При этом суффикс будет всё время инкрементироваться, что позволяет достичь уникальности. Ещё важно уточнить, что мы подставляем в префикс некоторое случайное значение. Ниже поясню, зачем.

Настройка консьюмера

Чтобы консьюмеры не видели сообщения из откатившихся транзакций, нужно установить соответствующий уровень изоляции:

spring.kafka.consumer.isolation-level=read_committed

Без этой настройки консьюмеры с уровнем read_uncommitted (по умолчанию) будут видеть все сообщения, включая те, что были отправлены в транзакциях, но потом отменены. Это нарушит логику транзакций.

Почему подставляем в префикс случайное значение?

Чтобы понять, зачем нам random.value, нужно разобраться, как Spring Boot управляет продюсерами с транзакциями.

Без транзакций: один продюсер на всех

Когда транзакции выключены, ProducerFactory создаёт единственный KafkaProducer, который безопасно используется всеми потоками через KafkaTemplate. Нет транзакционного состояния — нет проблем.

С транзакциями: пул продюсеров

С включением транзакций один продюсер не подходит — он не может параллельно обрабатывать несколько транзакций. Поэтому ProducerFactory переходит в транзакционный режим и создаёт пул продюсеров.

Каждый из таких продюсеров получает свой transactional.id. В нашем случае мы задаём его в виде префикса c помощью следующей строчки:

spring.kafka.producer.properties.transactional.id=some-service-${random.value}-

Spring Boot использует этот префикс и добавляет к нему суффикс, формируя уникальные transactional.id для каждого продюсера в пуле.

Проблема: несколько инстансов приложения

Что если у вас запущено несколько инстансов одного сервиса? Без случайной части в префиксе их transactional.id начнут совпадать:

  • Инстанс 1: some-service-0, some-service-1

  • Инстанс 2: some-service-0, some-service-1

Kafka отслеживает продюсеров не только по transactional.id, но и по epoch (счётчику поколений). Если появляется второй продюсер с тем же transactional.id, Kafka "заблокирует" продюсера со старым epoch — все его сообщения будут отвергнуты.

В результате часть ваших инстансов перестанет отправлять сообщения.

Именно поэтому в префикс добавляется ${random.value}. Оно гарантирует, что для каждого инстанса приложения transactional.id будет уникальным, а значит, Kafka не будет блокировать продюсеров из-за конфликтов по epoch.

Про epoch мы также говорили в данной статье.

Включение Kafka транзакций в Spring Boot (Java код)

Обязательно надо добавить бин, который будет управлять транзакциями:

@Bean
public KafkaTransactionManager<String, Object> kafkaTransactionManager(
        ProducerFactory<String, Object> producerFactory
) {
    return new KafkaTransactionManager<>(producerFactory);
}

Теперь можно создать метод, работающий с Kafka транзакциями:

@Transactional
public void publishOrderResult(OrderRequest orderRequest) {
    String orderId = UUID.randomUUID().toString();
    OrderPlacedEvent orderPlacedEvent = new OrderPlacedEvent(
        orderId,
        orderRequest.email(),
        orderRequest.productName(),
        orderRequest.quantity()
    );
  
    kafkaTemplate.send("order-placed", orderPlacedEvent.orderId(), orderPlacedEvent);
    kafkaTemplate.send("order-audit", orderPlacedEvent.orderId(), orderPlacedEvent);
}

Если у консьюмеров этих топиков не настроена дедупликация, то использовать транзакции необходимо.

Как вы можете видеть, мы включаем транзакции с помощью известной аннотации Transactional.

Как это работает внутри?

Давайте копнём чуть глубже и посмотрим, причём тут бин, который мы ранее создавали.

При входе в метод:

  1. KafkaTransactionManager запрашивает продюсера у ProducerFactory

  2. Продюсер сохраняется в TransactionSynchronizationManager (хранилище, привязанное к текущему потоку)

  3. KafkaTemplate берёт продюсера именно из этого хранилища

  4. При успешном выполнении метода транзакция коммитится, при исключении — откатывается

Без транзакций KafkaTemplate работает напрямую с ProducerFactory.

Важный момент: несколько TransactionManager

Если в приложении только один TransactionManager, то проблем нет. Но если их несколько (например, ещё и для базы данных), нужно указать, какой именно использовать:

Вариант 1: Пометить бин как @Primary

@Primary
@Bean
public KafkaTransactionManager<String, Object> kafkaTransactionManager(...) { ... }

Вариант 2: Указать имя в аннотации

@Transactional("kafkaTransactionManager")
public void publishOrderResult(...) { ... }

Почему в нашей платформе не нужны транзакции?

Для начала напомню архитектуру:

архитектура платформы
архитектура платформы

Давайте возьмем вышеперечисленные сценарии и применим их к нашей платформе:

  1. Запись в несколько топиков

    У нас нет сервисов, которые пишут в несколько топиков сразу

  2. Сценарий read -> process -> write У нас есть только один сервис, который читает, обрабатывает и публикует. Однако нам совершенно не обязательно, чтобы эти действия были атомарны. У нас на стороне downstream-services (analytics-service и notification-service) настроена дедупликация. Так что нам не страшен сценарий падения до коммита оффсета

Вообще как вы могли догадаться, транзакции — не бесплатный механизм. Добавляются некоторые накладные расходы.

В нашем случае транзакции не просто избыточны — они были бы вредны, добавляя сложность без какой-либо пользы.

Именно поэтому большинство систем обходятся без Kafka транзакций. Это узкоспециализированный инструмент для конкретных сценариев, а не "маст-хэв" для каждой event-driven системы.

Заключение

Теперь вы знаете, что такое Kafka транзакции, как их настроить в Spring Boot и, что важнее, в каких сценариях они действительно нужны.

Хотя это не самая популярная технология (большинство систем прекрасно обходятся без неё), понимать её принципы работы важно.

В следующей статье подробно рассмотрим обработку ошибок и Dead Letter Queues. Посмотрим, что делать, когда сообщения не могут быть обработаны

До скорых встреч!

Комментарии (0)