Apache Kafka — король асинхронного взаимодействия в микросервисных архитектурах. Но что если нужно получить ответ сразу? Перевод от команды Spring АйО шаг за шагом покажет, как превратить Kafka в инструмент синхронной коммуникации — с настройкой ReplyingKafkaTemplate, топиками для ответа и тайм-аутами.


1. Обзор

Apache Kafka зарекомендовал себя как одна из самых популярных и широко используемых систем обмена сообщениями для event-driven архитектур. В таких архитектурах один микросервис публикует сообщение в топик, а другой микросервис асинхронно потребляет и обрабатывает его.

Тем не менее, в некоторых сценариях требуется немедленный ответ от микросервиса-publisher`а для продолжения дальнейшей обработки. Хотя Kafka изначально предназначена для асинхронного взаимодействия, её можно настроить для поддержки синхронной коммуникации по принципу запрос-ответ с использованием отдельных топиков.

В этой статье мы рассмотрим, как реализовать синхронную коммуникацию типа запрос-ответ в приложении Spring Boot с использованием Apache Kafka.

2. Настройка проекта

Для демонстрации мы смоделируем систему отправки уведомлений. Мы создадим одно приложение на Spring Boot, которое будет одновременно выступать в роли продюсера и консюмера.

2.1. Зависимости

Начнём с добавления зависимости Spring Kafka в файл pom.xml нашего проекта:

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>3.3.4</version>
</dependency>

Эта зависимость предоставляет необходимые классы для установления соединения и взаимодействия с настроенным экземпляром Kafka.

2.2. Определение сообщений запроса и ответа

Далее определим два record-класса, которые будут представлять сообщения запроса и ответа:

record NotificationDispatchRequest(String emailId, String content) {
}

public record NotificationDispatchResponse(UUID notificationId) {
}

Здесь record NotificationDispatchRequest содержит emailId и содержимое уведомления, а record NotificationDispatchResponse включает уникальный notificationId, который генерируется после обработки запроса.

2.3. Определение топиков Kafka и конфигурационных свойств

Теперь определим топики Kafka для запроса и ответа. Кроме того, настроим длительность тайм-аута для получения ответа от компонента-консюмера.

Эти свойства мы сохраним в файле application.yaml нашего проекта и воспользуемся аннотацией @ConfigurationProperties для сопоставления значений с Java record`ом, к которому смогут обращаться конфигурационный и сервисный уровни приложения:

@Validated
@ConfigurationProperties(prefix = "com.baeldung.kafka.synchronous")
record SynchronousKafkaProperties(
    @NotBlank
    String requestTopic,

    @NotBlank
    String replyTopic,

    @NotNull @DurationMin(seconds = 10) @DurationMax(minutes = 2)
    Duration replyTimeout
) {
}

Мы также добавили аннотации валидации, чтобы гарантировать корректную настройку всех необходимых свойств. Если какая-либо из проверок не будет пройдена, контекст приложения Spring ApplicationContext не сможет подняться. Это позволяет нам придерживаться принципа fail-fast.

Ниже приведён фрагмент файла application.yaml, в котором определены необходимые свойства. Эти свойства будут автоматически сопоставлены с record`ом SynchronousKafkaProperties:

com:
  baeldung:
    kafka:
      synchronous:
        request-topic: notification-dispatch-request
        reply-topic: notification-dispatch-response
        reply-timeout: 30s

Здесь мы настраиваем имена топиков Kafka для запроса и ответа, а также тайм-аут ожидания ответа, равный тридцати секундам.

Помимо наших пользовательских свойств, добавим также несколько основных конфигурационных параметров Kafka в файл application.yaml:

spring:
  kafka:
    bootstrap-servers: ${KAFKA_BOOTSTRAP_SERVERS}
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
    consumer:
      group-id: synchronous-kafka-group
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
      properties:
        spring:
          json:
            trusted:
              packages: com.baeldung.kafka.synchronous
    properties:
      allow:
        auto:
          create:
            topics: true

Прежде всего, чтобы наше приложение могло подключиться к настроенному экземпляру Kafka, мы указываем URL bootstrap-сервера, используя переменную окружения.

Далее мы настраиваем свойства сериализации и десериализации ключей и значений как для продюсера, так и для consumer`a. Кроме того, для consumer`a мы указываем group-id и задаём доверенный пакет, содержащий наши record`ы запроса и ответа, для корректной JSON-десериализации.

После настройки вышеуказанных свойств Spring Kafka автоматически создаёт для нас бины типов ConsumerFactory и ProducerFactory. Мы будем использовать их для определения дополнительных конфигурационных бинов Kafka в следующем разделе.

Наконец, мы включаем автоматическое создание топиков, чтобы Kafka могла создавать их при необходимости. Важно отметить, что данное свойство включено исключительно для целей демонстрации — в production приложениях так делать не следует.

2.4. Определение конфигурационных бинов Kafka

После настройки конфигурационных свойств давайте определим необходимые конфигурационные бины Kafka:

@Bean
KafkaMessageListenerContainer<String, NotificationDispatchResponse> kafkaMessageListenerContainer(
    ConsumerFactory<String, NotificationDispatchResponse> consumerFactory
) {
    String replyTopic = synchronousKafkaProperties.replyTopic();
    ContainerProperties containerProperties = new ContainerProperties(replyTopic);
    return new KafkaMessageListenerContainer<>(consumerFactory, containerProperties);
}

Сначала мы внедряем экземпляр ConsumerFactory и используем его вместе с настроенным replyTopic для создания бина KafkaMessageListenerContainer. Этот бин отвечает за создание контейнера, который слушает сообщения из нашего replyTopic-а, то есть топика ответов.

Далее мы определим основной бин, который будет использоваться на уровне сервиса для реализации синхронного взаимодействия:

@Bean
ReplyingKafkaTemplate<String, NotificationDispatchRequest, NotificationDispatchResponse> replyingKafkaTemplate(
    ProducerFactory<String, NotificationDispatchRequest> producerFactory,
    KafkaMessageListenerContainer<String, NotificationDispatchResponse> kafkaMessageListenerContainer
) {
    Duration replyTimeout = synchronousKafkaProperties.replyTimeout();
    var replyingKafkaTemplate = new ReplyingKafkaTemplate<>(producerFactory, kafkaMessageListenerContainer);
    replyingKafkaTemplate.setDefaultReplyTimeout(replyTimeout);
    return replyingKafkaTemplate;
}

С использованием ProducerFactory и ранее определённого бина KafkaMessageListenerContainer мы создаём бин ReplyingKafkaTemplate. Кроме того, с помощью внедрённого synchronousKafkaProperties настраиваем тайм-аут ожидания ответа, заданный в файле application.yaml. Этот параметр определяет, как долго наш сервис будет ждать ответное сообщение в replyTopic перед срабатыванием тайм-аута.

Бин ReplyingKafkaTemplate управляет взаимодействием между топиками запроса и ответа, обеспечивая возможность синхронной коммуникации через Kafka.

Наконец, определим бины, которые позволят нашему компоненту-consumer`у отправлять ответы обратно в топик ответов:

@Bean
KafkaTemplate<String, NotificationDispatchResponse> kafkaTemplate(ProducerFactory<String, NotificationDispatchResponse> producerFactory) {
    return new KafkaTemplate<>(producerFactory);
}

@Bean
KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, NotificationDispatchRequest>> kafkaListenerContainerFactory(
    ConsumerFactory<String, NotificationDispatchRequest> consumerFactory,
    KafkaTemplate<String, NotificationDispatchResponse> kafkaTemplate
) {
    var factory = new ConcurrentKafkaListenerContainerFactory<String, NotificationDispatchRequest>();
    factory.setConsumerFactory(consumerFactory);
    factory.setReplyTemplate(kafkaTemplate);
    return factory;
}
Комментарий от команды Spring АйО

Напомним, что в данном простом случае, одно и то же приложение как отправляет запрос в Kafka, так и посылает ответное сообщение на этот запрос в replyTopic.

Сначала мы создаём стандартный бин KafkaTemplate, используя бин ProducerFactory.

Затем, совместно с ConsumerFactory, мы используем его для определения бина KafkaListenerContainerFactory. Этот бин позволяет нашим компонентам-consumer`ам, которые потребляют сообщения из топика запросов, отправлять ответное сообщение в топик ответов после завершения необходимой обработки.

3. Реализация синхронной коммуникации с использованием Kafka

После завершения настройки конфигурации давайте реализуем синхронную коммуникацию по принципу запрос-ответ между двумя настроенными топиками Kafka.

3.1. Отправка и получение сообщений с использованием ReplyingKafkaTemplate

Сначала создадим класс NotificationDispatchService, который будет отправлять сообщения в настроенный топик запросов с помощью ранее определённого бина ReplyingKafkaTemplate:

@Service
@EnableConfigurationProperties(SynchronousKafkaProperties.class)
class NotificationDispatchService {

    private final SynchronousKafkaProperties synchronousKafkaProperties;
    private final ReplyingKafkaTemplate<String, NotificationDispatchRequest, NotificationDispatchResponse> replyingKafkaTemplate;

    // standard constructor

    NotificationDispatchResponse dispatch(NotificationDispatchRequest notificationDispatchRequest) {
        String requestTopic = synchronousKafkaProperties.requestTopic();
        ProducerRecord<String, NotificationDispatchRequest> producerRecord = new ProducerRecord<>(requestTopic, notificationDispatchRequest);

        var requestReplyFuture = replyingKafkaTemplate.sendAndReceive(producerRecord);
        return requestReplyFuture.get().value();
    }
}

В методе dispatch() мы используем autowired экземпляр synchronousKafkaProperties, чтобы получить значение requestTopic, настроенное в файле application.yaml. Затем, совместно с переданным в аргументе метода объектом notificationDispatchRequest, мы создаём экземпляр ProducerRecord.

Далее мы передаём созданный ProducerRecord в метод sendAndReceive(), чтобы опубликовать сообщение в топик запросов. Этот метод возвращает объект RequestReplyFuture, который мы используем для ожидания ответа и последующего получения его значения.

Внутри, при вызове метода sendAndReceive(), класс ReplyingKafkaTemplate генерирует уникальный ID корреляции — случайный UUID — и добавляет его в заголовок исходящего Kafka сообщения. Также в заголовок добавляется имя топика для ответа, в который ожидается поступление ответного сообщения. Напомним, что мы уже настроили этот топик в бине KafkaMessageListenerContainer.

Бин ReplyingKafkaTemplate использует сгенерированный ID корреляции в качестве ключа для сохранения объекта RequestReplyFuture в потокобезопасной структуре ConcurrentHashMap. Это обеспечивает корректную работу в многопоточной среде и поддержку параллельных запросов.

3.2. Определение consumer`а сообщений Kafka

Теперь, чтобы завершить реализацию, создадим компонент-consumer, который будет обрабатывать сообщения из настроенного топика запросов и отправлять ответ в топик ответов:

@Component
class NotificationDispatchListener {

    @SendTo
    @KafkaListener(topics = "${com.baeldung.kafka.synchronous.request-topic}")
    NotificationDispatchResponse listen(NotificationDispatchRequest notificationDispatchRequest) {
        // ... processing logic
        UUID notificationId = UUID.randomUUID();
        return new NotificationDispatchResponse(notificationId);
    }
}

Мы используем аннотацию @KafkaListener для прослушивания топика запросов, указанного в файле application.yaml.

Внутри метода listen() мы просто возвращаем record NotificationDispatchResponse, содержащий уникальный notificationId.

Ключевой момент — аннотация @SendTo, которой мы помечаем метод. Она сообщает Spring Kafka, что из заголовков входящего сообщения необходимо извлечь ID корреляции и имя топика для ответа. Эти данные используются для автоматической отправки возвращаемого значения метода в соответствующий топик ответов с тем же ID корреляции в заголовке сообщения.

Комментарий от команды Spring АйО

На самом деле, аннотация @SendTo общая, и не имеет чёткой привязки к Spring Kafka проекту. Она также используется в проектах Spring Integration и в Spring Jms, например. То, как Spring обрабатывает @SendTo , очень сильно зависит от технологии (Spring Kafka, Spring Jms и т.д.), от настроек среды окружения и т.д. Поэтому будьте с ней аккуратны!

Благодаря этому бин ReplyingKafkaTemplate в классе NotificationDispatchService может по ID корреляции найти соответствующий объект RequestReplyFuture и получить ожидаемый ответ.

4. Заключение

В этой статье мы рассмотрели использование Apache Kafka для реализации синхронной коммуникации между двумя компонентами в приложении Spring Boot.

Мы пошагово прошли через необходимые этапы конфигурации и смоделировали систему отправки уведомлений.

С помощью ReplyingKafkaTemplate можно преобразовать асинхронную природу Apache Kafka в паттерн синхронного взаимодействия по принципу запрос-ответ. Этот подход является нетипичным, поэтому перед его применением в production среде важно тщательно оценить его соответствие архитектуре проекта.

Как всегда, все примеры кода, использованные в статье, доступны на GitHub.


Присоединяйтесь к русскоязычному сообществу разработчиков на Spring Boot в телеграм — Spring АйО, чтобы быть в курсе последних новостей из мира разработки на Spring Boot и всего, что с ним связано

Комментарии (11)


  1. Sannis
    07.08.2025 11:37

    А вы можете для не самых умных объяснить зачем? Зачем нужно использовать целую Кафку для синхронных взаимодействий если можно напрямую послать запрос в "микросервис-publisher"?

    Это просто оверинжиниринг 80-го уровня получается.


    1. San_tit
      07.08.2025 11:37

      Например, если ту же самую логику кто-то использует асинхронно (не писать же спец api для нового потребителя?), и/или Кафка используется как журнал запросов в системе


  1. gsaw
    07.08.2025 11:37

    В пошаговых инструкциях как то не нашел, как реализован принцип то, на чем это основано. Это фича Кафки, или Спринга? Нельзя ли добавить пункт с теорией, что бы не разбирать весь текст?


    1. Djaler
      07.08.2025 11:37

      Вот мне тоже кажется, что при наличии нескольких экземпляров приложения это так просто не заработает, потому что обработка разных партиций топика с ответом вполне себе окажется на разных инстансах. Не обязательно на тех, откуда отправляли запрос.

      В итоге пример классный, но как будто бесполезный в реальной жизни.


      1. Valerdos_UA
        07.08.2025 11:37

        Инстансы разные, но код обработки ответа-то один и тот же. Какая разница, какой из инстансов его поймал?


        1. Djaler
          07.08.2025 11:37

          При асинхронной обработке - разницы нет. Но тут то смысл в том, что конкретный инстанс начинает "синхронно" ожидать ответ после запроса:

          NotificationDispatchResponse dispatch(NotificationDispatchRequest notificationDispatchRequest) {
                  String requestTopic = synchronousKafkaProperties.requestTopic();
                  ProducerRecord<String, NotificationDispatchRequest> producerRecord = new ProducerRecord<>(requestTopic, notificationDispatchRequest);
          
                  var requestReplyFuture = replyingKafkaTemplate.sendAndReceive(producerRecord);
                  return requestReplyFuture.get().value();
              }