
Введение
Всем привет! Рад видеть вас снова.
В этой части нашего цикла про Kafka разберём одну из самых хитрых тем — Kafka транзакции. Мы посмотрим, что они вообще гарантируют, когда их реально стоит применять, и почему во многих системах они не просто не нужны, а даже вредны.
Забегая вперёд, скажу: добавлять транзакции в нашу платформу из прошлой статьи мы не будем. Потому что они там не нужны. А почему — обсудим ниже.
Синтаксис и примеры мы рассмотрим на абстрактных сценариях — там, где транзакции действительно нужны.
Цель статьи — не просто научить вас включать транзакции, а помочь понять, когда они действительно спасают, а когда лишь добавляют сложности без всякой пользы.
Когда нужны Kafka транзакции?
Сначала разберёмся с тем, в каких сценариях Kafka транзакции действительно имеют смысл. Таких случаев немного, и все они связаны с требованиями к атомарности.
Первый сценарий — запись сообщений сразу в несколько топиков, когда частичная отправка недопустима. Если одно сообщение будет опубликовано, а другое — нет, система окажется в некорректном состоянии.
Если сервис одновременно является и консьюмером, и продюсером, то при перезапуске он повторит обработку сообщения, так как упал до коммита оффсета. В результате:
ранее отправленные сообщения будут отправлены повторно;
неотправленные сообщения будут отправлены впервые.
Такой сценарий можно частично компенсировать дедупликацией downstream-сервисов (тех сервисов, которые реагируют на отправленные сообщения), но это не всегда возможно.
Если же сервис не является консьюмером (например, реагирует на HTTP-запросы), то ситуация ещё хуже: сообщение, которое не было отправлено до сбоя, будет просто потеряно.
В этом сценарии Kafka транзакции гарантируют, что либо все сообщения станут видимы для консьюмеров, либо ни одно из них. Речь именно о видимости: при откате транзакции сообщения не удаляются физически из брокера, а остаются невидимыми для консьюмеров с read_committed параметром (о нём ниже).
Второй сценарий — сервисы, работающие по схеме read → process → write.
Сервис читает сообщения из одного или нескольких топиков, обрабатывает их и публикует результат в другие топики. При сбое после отправки сообщений, но до коммита оффсета, downstream-сервисы увидят результат обработки, а исходное сообщение будет обработано повторно. Это приводит к дубликатам.
Использование Kafka транзакций в этом сценарии позволяет связать чтение сообщений, запись результатов и коммит оффсетов в одну атомарную операцию.
Конкретные случаи
Ниже указаны конкретные практические случаи, в которых использование Kafka транзакций действительно оправдано. Во всех этих случаях частичный результат либо приводит к логической ошибке, либо не может быть корректно обработан downstream-сервисами.
-
Сервис публикует сообщения в несколько топиков, а downstream-сервисы не поддерживают дедупликацию.
В этом случае частичная публикация приводит к некорректному состоянию системы: один сервис обработает событие, другой — нет. Если повторная отправка недопустима, транзакции становятся единственным способом обеспечить атомарность.
-
Сервис хранит состояние в Kafka и публикует связанные события в разные топики.
Например, сервис сначала отправляет событие обработки, а затем — событие в топик истории с указанием времени отправления события. Если сервис упадёт между этими отправками, то при повторной обработке сообщение будет отправлено заново, а событие в истории — с другим временным значением. В результате история окажется некорректной.
-
Сервис работает по схеме
read -> process -> write, при этом downstream-сервисы не поддерживают дедупликацию.Если сервис упадёт после отправки сообщений, но до коммита оффсета, downstream-сервисы увидят те же самые сообщения повторно. В сценариях, где повторная обработка недопустима, Kafka транзакции позволяют связать отправку сообщений и коммит оффсетов в одну атомарную операцию.
Границы Kafka транзакций
Очень важно отметить, что Kafka транзакции работают только в рамках Kafka. Они не умеют откатывать изменения во внешних системах — будь то запись в базу данных, вызов API или отправка email.
По сути, Kafka транзакции решают две задачи:
Атомарная публикация — связывают несколько отправок сообщений продюсером в одну операцию «всё или ничего»
Атомарность чтения-записи — для сервисов, которые одновременно являются консьюмерами и продюсерами, связывают отправку сообщений и коммит оффсетов в единый шаг
За пределами Kafka их гарантии заканчиваются. Если вам нужно согласованно обновить базу данных и отправить сообщение в Kafka — стандартные Kafka транзакции вам не помогут. Для таких сценариев нужны другие подходы (например, паттерн Outbox, который был рассмотрен в данной статье).
Как работают Kafka транзакции?
Давайте теперь вообще рассмотрим, как работают транзакции.
Я недаром выше делил случаи использования транзакций на 2 сценария. Ведь в 1 сценарии сервис может быть чистым продюсером, а во втором сценарии он обязательно и продюсер, и консьюмер. Транзакции будут работать немного по-разному.
Если имеем дело с 1 сценарием и сервис является только продюсером, то транзакция работает как-то так:
beginTransaction()
send(topicA, msg1)
send(topicB, msg2)
send(topicC, msg3)
commitTransaction()
Это очень похоже на работу транзакций в базах данных.
Если же мы имеем дело со 2 сценарием или 1 сценарием, при этом сервис является и консьюмером, то транзакция будет выглядеть уже сложнее:
beginTransaction()
poll()
process()
send(topicA, msg1)
send(topicB, msg2)
sendOffsetsToTransaction()
commitTransaction()
То есть оффсеты не коммитятся сразу, а становятся частью транзакции. Падение до обработки оффсетов повлечёт отмену транзакции.
Включение Kafka транзакций в Spring Boot (настройка application.properties файлов)
Настройка продюсера
Чтобы включить транзакции, нам необходимо прописать в application.properties продюсера следующие строчки:
spring.kafka.producer.properties.enable.idempotence=true
spring.kafka.producer.properties.transactional.id=some-service-${random.value}-
В данной статье мы говорили подробно о transactional-id параметре. Здесь же мы используем не id целиком, а префикс. Spring Boot сам поставит суффикс. При этом суффикс будет всё время инкрементироваться, что позволяет достичь уникальности. Ещё важно уточнить, что мы подставляем в префикс некоторое случайное значение. Ниже поясню, зачем.
Настройка консьюмера
Чтобы консьюмеры не видели сообщения из откатившихся транзакций, нужно установить соответствующий уровень изоляции:
spring.kafka.consumer.isolation-level=read_committed
Без этой настройки консьюмеры с уровнем read_uncommitted (по умолчанию) будут видеть все сообщения, включая те, что были отправлены в транзакциях, но потом отменены. Это нарушит логику транзакций.
Почему подставляем в префикс случайное значение?
Чтобы понять, зачем нам random.value, нужно разобраться, как Spring Boot управляет продюсерами с транзакциями.
Без транзакций: один продюсер на всех
Когда транзакции выключены, ProducerFactory создаёт единственный KafkaProducer, который безопасно используется всеми потоками через KafkaTemplate. Нет транзакционного состояния — нет проблем.
С транзакциями: пул продюсеров
С включением транзакций один продюсер не подходит — он не может параллельно обрабатывать несколько транзакций. Поэтому ProducerFactory переходит в транзакционный режим и создаёт пул продюсеров.
Каждый из таких продюсеров получает свой transactional.id. В нашем случае мы задаём его в виде префикса c помощью следующей строчки:
spring.kafka.producer.properties.transactional.id=some-service-${random.value}-
Spring Boot использует этот префикс и добавляет к нему суффикс, формируя уникальные transactional.id для каждого продюсера в пуле.
Проблема: несколько инстансов приложения
Что если у вас запущено несколько инстансов одного сервиса? Без случайной части в префиксе их transactional.id начнут совпадать:
Инстанс 1:
some-service-0,some-service-1Инстанс 2:
some-service-0,some-service-1
Kafka отслеживает продюсеров не только по transactional.id, но и по epoch (счётчику поколений). Если появляется второй продюсер с тем же transactional.id, Kafka "заблокирует" продюсера со старым epoch — все его сообщения будут отвергнуты.
В результате часть ваших инстансов перестанет отправлять сообщения.
Именно поэтому в префикс добавляется ${random.value}. Оно гарантирует, что для каждого инстанса приложения transactional.id будет уникальным, а значит, Kafka не будет блокировать продюсеров из-за конфликтов по epoch.
Про epoch мы также говорили в данной статье.
Включение Kafka транзакций в Spring Boot (Java код)
Обязательно надо добавить бин, который будет управлять транзакциями:
@Bean
public KafkaTransactionManager<String, Object> kafkaTransactionManager(
ProducerFactory<String, Object> producerFactory
) {
return new KafkaTransactionManager<>(producerFactory);
}
Теперь можно создать метод, работающий с Kafka транзакциями:
@Transactional
public void publishOrderResult(OrderRequest orderRequest) {
String orderId = UUID.randomUUID().toString();
OrderPlacedEvent orderPlacedEvent = new OrderPlacedEvent(
orderId,
orderRequest.email(),
orderRequest.productName(),
orderRequest.quantity()
);
kafkaTemplate.send("order-placed", orderPlacedEvent.orderId(), orderPlacedEvent);
kafkaTemplate.send("order-audit", orderPlacedEvent.orderId(), orderPlacedEvent);
}
Если у консьюмеров этих топиков не настроена дедупликация, то использовать транзакции необходимо.
Как вы можете видеть, мы включаем транзакции с помощью известной аннотации Transactional.
Как это работает внутри?
Давайте копнём чуть глубже и посмотрим, причём тут бин, который мы ранее создавали.
При входе в метод:
KafkaTransactionManagerзапрашивает продюсера уProducerFactoryПродюсер сохраняется в
TransactionSynchronizationManager(хранилище, привязанное к текущему потоку)KafkaTemplateберёт продюсера именно из этого хранилищаПри успешном выполнении метода транзакция коммитится, при исключении — откатывается
Без транзакций KafkaTemplate работает напрямую с ProducerFactory.
Важный момент: несколько TransactionManager
Если в приложении только один TransactionManager, то проблем нет. Но если их несколько (например, ещё и для базы данных), нужно указать, какой именно использовать:
Вариант 1: Пометить бин как @Primary
@Primary
@Bean
public KafkaTransactionManager<String, Object> kafkaTransactionManager(...) { ... }
Вариант 2: Указать имя в аннотации
@Transactional("kafkaTransactionManager")
public void publishOrderResult(...) { ... }
Почему в нашей платформе не нужны транзакции?
Для начала напомню архитектуру:

Давайте возьмем вышеперечисленные сценарии и применим их к нашей платформе:
-
Запись в несколько топиков
У нас нет сервисов, которые пишут в несколько топиков сразу
Сценарий
read -> process -> writeУ нас есть только один сервис, который читает, обрабатывает и публикует. Однако нам совершенно не обязательно, чтобы эти действия были атомарны. У нас на стороне downstream-services (analytics-service и notification-service) настроена дедупликация. Так что нам не страшен сценарий падения до коммита оффсета
Вообще как вы могли догадаться, транзакции — не бесплатный механизм. Добавляются некоторые накладные расходы.
В нашем случае транзакции не просто избыточны — они были бы вредны, добавляя сложность без какой-либо пользы.
Именно поэтому большинство систем обходятся без Kafka транзакций. Это узкоспециализированный инструмент для конкретных сценариев, а не "маст-хэв" для каждой event-driven системы.
Заключение
Теперь вы знаете, что такое Kafka транзакции, как их настроить в Spring Boot и, что важнее, в каких сценариях они действительно нужны.
Хотя это не самая популярная технология (большинство систем прекрасно обходятся без неё), понимать её принципы работы важно.
В следующей статье подробно рассмотрим обработку ошибок и Dead Letter Queues. Посмотрим, что делать, когда сообщения не могут быть обработаны
До скорых встреч!