Apache Kafka — король асинхронного взаимодействия в микросервисных архитектурах. Но что если нужно получить ответ сразу? Перевод от команды Spring АйО шаг за шагом покажет, как превратить Kafka в инструмент синхронной коммуникации — с настройкой ReplyingKafkaTemplate
, топиками для ответа и тайм-аутами.
1. Обзор
Apache Kafka зарекомендовал себя как одна из самых популярных и широко используемых систем обмена сообщениями для event-driven архитектур. В таких архитектурах один микросервис публикует сообщение в топик, а другой микросервис асинхронно потребляет и обрабатывает его.
Тем не менее, в некоторых сценариях требуется немедленный ответ от микросервиса-publisher`а для продолжения дальнейшей обработки. Хотя Kafka изначально предназначена для асинхронного взаимодействия, её можно настроить для поддержки синхронной коммуникации по принципу запрос-ответ с использованием отдельных топиков.
В этой статье мы рассмотрим, как реализовать синхронную коммуникацию типа запрос-ответ в приложении Spring Boot с использованием Apache Kafka.
2. Настройка проекта
Для демонстрации мы смоделируем систему отправки уведомлений. Мы создадим одно приложение на Spring Boot, которое будет одновременно выступать в роли продюсера и консюмера.
2.1. Зависимости
Начнём с добавления зависимости Spring Kafka в файл pom.xml нашего проекта:
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>3.3.4</version>
</dependency>
Эта зависимость предоставляет необходимые классы для установления соединения и взаимодействия с настроенным экземпляром Kafka.
2.2. Определение сообщений запроса и ответа
Далее определим два record
-класса, которые будут представлять сообщения запроса и ответа:
record NotificationDispatchRequest(String emailId, String content) {
}
public record NotificationDispatchResponse(UUID notificationId) {
}
Здесь record NotificationDispatchRequest
содержит emailId
и содержимое уведомления, а record NotificationDispatchResponse
включает уникальный notificationId
, который генерируется после обработки запроса.
2.3. Определение топиков Kafka и конфигурационных свойств
Теперь определим топики Kafka для запроса и ответа. Кроме того, настроим длительность тайм-аута для получения ответа от компонента-консюмера.
Эти свойства мы сохраним в файле application.yaml
нашего проекта и воспользуемся аннотацией @ConfigurationProperties для сопоставления значений с Java record`ом
, к которому смогут обращаться конфигурационный и сервисный уровни приложения:
@Validated
@ConfigurationProperties(prefix = "com.baeldung.kafka.synchronous")
record SynchronousKafkaProperties(
@NotBlank
String requestTopic,
@NotBlank
String replyTopic,
@NotNull @DurationMin(seconds = 10) @DurationMax(minutes = 2)
Duration replyTimeout
) {
}
Мы также добавили аннотации валидации, чтобы гарантировать корректную настройку всех необходимых свойств. Если какая-либо из проверок не будет пройдена, контекст приложения Spring ApplicationContext
не сможет подняться. Это позволяет нам придерживаться принципа fail-fast.
Ниже приведён фрагмент файла application.yaml
, в котором определены необходимые свойства. Эти свойства будут автоматически сопоставлены с record`ом
SynchronousKafkaProperties
:
com:
baeldung:
kafka:
synchronous:
request-topic: notification-dispatch-request
reply-topic: notification-dispatch-response
reply-timeout: 30s
Здесь мы настраиваем имена топиков Kafka для запроса и ответа, а также тайм-аут ожидания ответа, равный тридцати секундам.
Помимо наших пользовательских свойств, добавим также несколько основных конфигурационных параметров Kafka в файл application.yaml
:
spring:
kafka:
bootstrap-servers: ${KAFKA_BOOTSTRAP_SERVERS}
producer:
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
consumer:
group-id: synchronous-kafka-group
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
properties:
spring:
json:
trusted:
packages: com.baeldung.kafka.synchronous
properties:
allow:
auto:
create:
topics: true
Прежде всего, чтобы наше приложение могло подключиться к настроенному экземпляру Kafka, мы указываем URL bootstrap-сервера, используя переменную окружения.
Далее мы настраиваем свойства сериализации и десериализации ключей и значений как для продюсера, так и для consumer`a. Кроме того, для consumer`a мы указываем group-id и задаём доверенный пакет, содержащий наши record`ы запроса и ответа, для корректной JSON-десериализации.
После настройки вышеуказанных свойств Spring Kafka автоматически создаёт для нас бины типов ConsumerFactory
и ProducerFactory
. Мы будем использовать их для определения дополнительных конфигурационных бинов Kafka в следующем разделе.
Наконец, мы включаем автоматическое создание топиков, чтобы Kafka могла создавать их при необходимости. Важно отметить, что данное свойство включено исключительно для целей демонстрации — в production приложениях так делать не следует.
2.4. Определение конфигурационных бинов Kafka
После настройки конфигурационных свойств давайте определим необходимые конфигурационные бины Kafka:
@Bean
KafkaMessageListenerContainer<String, NotificationDispatchResponse> kafkaMessageListenerContainer(
ConsumerFactory<String, NotificationDispatchResponse> consumerFactory
) {
String replyTopic = synchronousKafkaProperties.replyTopic();
ContainerProperties containerProperties = new ContainerProperties(replyTopic);
return new KafkaMessageListenerContainer<>(consumerFactory, containerProperties);
}
Сначала мы внедряем экземпляр ConsumerFactory
и используем его вместе с настроенным replyTopic
для создания бина KafkaMessageListenerContainer
. Этот бин отвечает за создание контейнера, который слушает сообщения из нашего replyTopic
-а, то есть топика ответов.
Далее мы определим основной бин, который будет использоваться на уровне сервиса для реализации синхронного взаимодействия:
@Bean
ReplyingKafkaTemplate<String, NotificationDispatchRequest, NotificationDispatchResponse> replyingKafkaTemplate(
ProducerFactory<String, NotificationDispatchRequest> producerFactory,
KafkaMessageListenerContainer<String, NotificationDispatchResponse> kafkaMessageListenerContainer
) {
Duration replyTimeout = synchronousKafkaProperties.replyTimeout();
var replyingKafkaTemplate = new ReplyingKafkaTemplate<>(producerFactory, kafkaMessageListenerContainer);
replyingKafkaTemplate.setDefaultReplyTimeout(replyTimeout);
return replyingKafkaTemplate;
}
С использованием ProducerFactory
и ранее определённого бина KafkaMessageListenerContainer
мы создаём бин ReplyingKafkaTemplate
. Кроме того, с помощью внедрённого synchronousKafkaProperties
настраиваем тайм-аут ожидания ответа, заданный в файле application.yaml
. Этот параметр определяет, как долго наш сервис будет ждать ответное сообщение в replyTopic
перед срабатыванием тайм-аута.
Бин ReplyingKafkaTemplate
управляет взаимодействием между топиками запроса и ответа, обеспечивая возможность синхронной коммуникации через Kafka.
Наконец, определим бины, которые позволят нашему компоненту-consumer`у отправлять ответы обратно в топик ответов:
@Bean
KafkaTemplate<String, NotificationDispatchResponse> kafkaTemplate(ProducerFactory<String, NotificationDispatchResponse> producerFactory) {
return new KafkaTemplate<>(producerFactory);
}
@Bean
KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, NotificationDispatchRequest>> kafkaListenerContainerFactory(
ConsumerFactory<String, NotificationDispatchRequest> consumerFactory,
KafkaTemplate<String, NotificationDispatchResponse> kafkaTemplate
) {
var factory = new ConcurrentKafkaListenerContainerFactory<String, NotificationDispatchRequest>();
factory.setConsumerFactory(consumerFactory);
factory.setReplyTemplate(kafkaTemplate);
return factory;
}
Комментарий от команды Spring АйО
Напомним, что в данном простом случае, одно и то же приложение как отправляет запрос в Kafka, так и посылает ответное сообщение на этот запрос в replyTopic.
Сначала мы создаём стандартный бин KafkaTemplate
, используя бин ProducerFactory
.
Затем, совместно с ConsumerFactory
, мы используем его для определения бина KafkaListenerContainerFactory
. Этот бин позволяет нашим компонентам-consumer
`ам, которые потребляют сообщения из топика запросов, отправлять ответное сообщение в топик ответов после завершения необходимой обработки.
3. Реализация синхронной коммуникации с использованием Kafka
После завершения настройки конфигурации давайте реализуем синхронную коммуникацию по принципу запрос-ответ между двумя настроенными топиками Kafka.
3.1. Отправка и получение сообщений с использованием ReplyingKafkaTemplate
Сначала создадим класс NotificationDispatchService
, который будет отправлять сообщения в настроенный топик запросов с помощью ранее определённого бина ReplyingKafkaTemplate
:
@Service
@EnableConfigurationProperties(SynchronousKafkaProperties.class)
class NotificationDispatchService {
private final SynchronousKafkaProperties synchronousKafkaProperties;
private final ReplyingKafkaTemplate<String, NotificationDispatchRequest, NotificationDispatchResponse> replyingKafkaTemplate;
// standard constructor
NotificationDispatchResponse dispatch(NotificationDispatchRequest notificationDispatchRequest) {
String requestTopic = synchronousKafkaProperties.requestTopic();
ProducerRecord<String, NotificationDispatchRequest> producerRecord = new ProducerRecord<>(requestTopic, notificationDispatchRequest);
var requestReplyFuture = replyingKafkaTemplate.sendAndReceive(producerRecord);
return requestReplyFuture.get().value();
}
}
В методе dispatch()
мы используем autowired
экземпляр synchronousKafkaProperties
, чтобы получить значение requestTopic
, настроенное в файле application.yaml
. Затем, совместно с переданным в аргументе метода объектом notificationDispatchRequest
, мы создаём экземпляр ProducerRecord
.
Далее мы передаём созданный ProducerRecord
в метод sendAndReceive()
, чтобы опубликовать сообщение в топик запросов. Этот метод возвращает объект RequestReplyFuture
, который мы используем для ожидания ответа и последующего получения его значения.
Внутри, при вызове метода sendAndReceive()
, класс ReplyingKafkaTemplate
генерирует уникальный ID
корреляции — случайный UUID
— и добавляет его в заголовок исходящего Kafka сообщения. Также в заголовок добавляется имя топика для ответа, в который ожидается поступление ответного сообщения. Напомним, что мы уже настроили этот топик в бине KafkaMessageListenerContainer
.
Бин ReplyingKafkaTemplate
использует сгенерированный ID
корреляции в качестве ключа для сохранения объекта RequestReplyFuture
в потокобезопасной структуре ConcurrentHashMap
. Это обеспечивает корректную работу в многопоточной среде и поддержку параллельных запросов.
3.2. Определение consumer`а сообщений Kafka
Теперь, чтобы завершить реализацию, создадим компонент-consumer, который будет обрабатывать сообщения из настроенного топика запросов и отправлять ответ в топик ответов:
@Component
class NotificationDispatchListener {
@SendTo
@KafkaListener(topics = "${com.baeldung.kafka.synchronous.request-topic}")
NotificationDispatchResponse listen(NotificationDispatchRequest notificationDispatchRequest) {
// ... processing logic
UUID notificationId = UUID.randomUUID();
return new NotificationDispatchResponse(notificationId);
}
}
Мы используем аннотацию @KafkaListener
для прослушивания топика запросов, указанного в файле application.yaml
.
Внутри метода listen()
мы просто возвращаем record NotificationDispatchResponse
, содержащий уникальный notificationId
.
Ключевой момент — аннотация @SendTo
, которой мы помечаем метод. Она сообщает Spring Kafka, что из заголовков входящего сообщения необходимо извлечь ID
корреляции и имя топика для ответа. Эти данные используются для автоматической отправки возвращаемого значения метода в соответствующий топик ответов с тем же ID
корреляции в заголовке сообщения.
Комментарий от команды Spring АйО
На самом деле, аннотация @SendTo
общая, и не имеет чёткой привязки к Spring Kafka проекту. Она также используется в проектах Spring Integration и в Spring Jms, например. То, как Spring обрабатывает @SendTo
, очень сильно зависит от технологии (Spring Kafka, Spring Jms и т.д.), от настроек среды окружения и т.д. Поэтому будьте с ней аккуратны!
Благодаря этому бин ReplyingKafkaTemplate
в классе NotificationDispatchService
может по ID
корреляции найти соответствующий объект RequestReplyFuture
и получить ожидаемый ответ.
4. Заключение
В этой статье мы рассмотрели использование Apache Kafka для реализации синхронной коммуникации между двумя компонентами в приложении Spring Boot.
Мы пошагово прошли через необходимые этапы конфигурации и смоделировали систему отправки уведомлений.
С помощью ReplyingKafkaTemplate
можно преобразовать асинхронную природу Apache Kafka в паттерн синхронного взаимодействия по принципу запрос-ответ. Этот подход является нетипичным, поэтому перед его применением в production среде важно тщательно оценить его соответствие архитектуре проекта.
Как всегда, все примеры кода, использованные в статье, доступны на GitHub.

Присоединяйтесь к русскоязычному сообществу разработчиков на Spring Boot в телеграм — Spring АйО, чтобы быть в курсе последних новостей из мира разработки на Spring Boot и всего, что с ним связано
Комментарии (11)
gsaw
07.08.2025 11:37В пошаговых инструкциях как то не нашел, как реализован принцип то, на чем это основано. Это фича Кафки, или Спринга? Нельзя ли добавить пункт с теорией, что бы не разбирать весь текст?
Djaler
07.08.2025 11:37Вот мне тоже кажется, что при наличии нескольких экземпляров приложения это так просто не заработает, потому что обработка разных партиций топика с ответом вполне себе окажется на разных инстансах. Не обязательно на тех, откуда отправляли запрос.
В итоге пример классный, но как будто бесполезный в реальной жизни.
Valerdos_UA
07.08.2025 11:37Инстансы разные, но код обработки ответа-то один и тот же. Какая разница, какой из инстансов его поймал?
Djaler
07.08.2025 11:37При асинхронной обработке - разницы нет. Но тут то смысл в том, что конкретный инстанс начинает "синхронно" ожидать ответ после запроса:
NotificationDispatchResponse dispatch(NotificationDispatchRequest notificationDispatchRequest) { String requestTopic = synchronousKafkaProperties.requestTopic(); ProducerRecord<String, NotificationDispatchRequest> producerRecord = new ProducerRecord<>(requestTopic, notificationDispatchRequest); var requestReplyFuture = replyingKafkaTemplate.sendAndReceive(producerRecord); return requestReplyFuture.get().value(); }
Sannis
А вы можете для не самых умных объяснить зачем? Зачем нужно использовать целую Кафку для синхронных взаимодействий если можно напрямую послать запрос в "микросервис-publisher"?
Это просто оверинжиниринг 80-го уровня получается.
San_tit
Например, если ту же самую логику кто-то использует асинхронно (не писать же спец api для нового потребителя?), и/или Кафка используется как журнал запросов в системе