Архитектуры, управляемые событиями (Event Driven Architecture), в целом, и Apache Kafka, в частности, привлекли в последнее время большое внимание. Для реализации всех преимуществ архитектуры, управляемой событиями, механизм делегирования событий должен быть по своей сути асинхронным. Тем не менее, могут существовать некоторые особые сценарии/потоки использования, в которых требуется семантика Синхронного Запроса-Ответа. В этом выпуске показано, как реализовать "Запрос-Ответ" с помощью Apache Kafka.

Перевел @middle_java

Дата оригинальной статьи: 26 October 2018

Apache Kafka по своей природе асинхронна. Следовательно, семантика «Запрос-Ответ» для Apache Kafka не является естественной. Тем не менее эта задача не нова. Паттерн Интеграции Корпоративных приложений (Enterprise Integration Pattern) Request-Reply обеспечивает проверенный механизм синхронного обмена сообщениями по асинхронным каналам:



Паттерн Return Address (Адрес Возврата) дополняет паттерн Request-Reply механизмом указания запрашивающей стороной адреса, на который должен быть отправлен ответ:



Недавно в Spring Kafka 2.1.3 была добавлена поддержка из коробки паттерна «Request Reply», а в версии 2.2 были отполированы некоторые из его шероховатостей. Рассмотрим, как работает эта поддержка:

На стороне клиента: ReplyingKafkaTemplate


Хорошо известная абстракция Template (Шаблон) формирует в Spring основу для клиентской части механизма Request-Reply.

 @Bean
 public ReplyingKafkaTemplate < String, Request, Reply > replyKafkaTemplate(
     ProducerFactory < String, Request > pf,
     KafkaMessageListenerContainer < String, Reply > lc) {
     return new ReplyingKafkaTemplate < > (pf, lc);
 }

Здесь все довольно прямолинейно: мы настраиваем ReplyingKafkaTemplate, который отправляет сообщения-запросы со String ключами и получает сообщения-ответы со String ключами. Вместе с тем ReplyingKafkaTemplate должен быть основан на ProducerFactory Запроса, ConsumerFactory Ответа и MessageListenerContainer с соответствующими конфигурациями консюмера и продюсера. Следовательно, необходимая конфигурация довольно развесиста:

 @Value("${kafka.topic.car.reply}")
 private String replyTopic;

 @Bean
 public Map < String, Object > consumerConfigs() {
     Map < String, Object > props = new HashMap < > ();
     props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
     props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
     props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
     props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);

     return props;
 }

 @Bean
 public Map < String, Object > producerConfigs() {
     Map < String, Object > props = new HashMap < > ();
     props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
     props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
     props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
     return props;
 }

 @Bean
 public ProducerFactory < String, Request > requestProducerFactory() {
     return new DefaultKafkaProducerFactory < > (producerConfigs());
 }

 @Bean
 public ConsumerFactory < String, Reply > replyConsumerFactory() {
     return new DefaultKafkaConsumerFactory < > (consumerConfigs(), new StringDeserializer(),
         new JsonSerializer < Reply > ());
 }

 @Bean
 public KafkaMessageListenerContainer < String, Reply > replyListenerContainer() {
     ContainerProperties containerProperties = new ContainerProperties(replyTopic);
     return new KafkaMessageListenerContainer < > (replyConsumerFactory(), containerProperties);
 }

В этом случае использование replyKafkaTemplate для отправки синхронного запроса и получения ответа выглядит следующим образом:

@Value("${kafka.topic.car.request}")
private String requestTopic;

@Value("${kafka.topic.car.reply}")
private String replyTopic;

@Autowired
private ReplyingKafkaTemplate < String, Request, Reply > requestReplyKafkaTemplate;

...
RequestReply request = RequestReply.request(...);
//создаем producer record
ProducerRecord < String, Request > record = new ProducerRecord < String, Request > (requestTopic, request);
// устанавливаем топик для ответа в заголовке
record.headers().add(new RecordHeader(KafkaHeaders.REPLY_TOPIC, requestReplyTopic.getBytes()));
// отправляем запрос в топик Kafka и асинхронно получаем ответ в указанный топик для ответа
RequestReplyFuture < String, Request, Reply > sendAndReceive = requestReplyKafkaTemplate.sendAndReceive(record);
sendAndReceive.addCallback(new ListenableFutureCallback < ConsumerRecord < String, Reply >> () {
    @Override
    public void onSuccess(ConsumerRecord < String, Reply > result) {
        // получаем значение consumer record
        Reply reply = result.value();
        System.out.println("Reply: " + reply.toString());
    }
});

Здесь также много бойлерплейта и низкоуровневого API, да еще этот устаревший API ListenableFuture вместо современной CompletableFuture.

requestReplyKafkaTemplate заботится о генерации и установке заголовка KafkaHeaders.CORRELATION_ID, но мы должны явно задать заголовок KafkaHeaders.REPLY_TOPIC для запроса. Обратите также внимание, что этот же топик для ответа был излишне заинжектен выше в replyListenerContainer. Гадость какая-то. Не совсем то, чего я ожидал от абстракции Spring.

Серверная сторона: @SendTo


На стороне сервера обычный KafkaListener, прослушивающий топик для запроса, дополнительно декорирован аннотацией @SendTo, чтобы предоставить сообщение-ответ. Объект, возвращаемый методом слушателя, автоматически оборачивается (wrapped) в ответное сообщение, добавляется CORRELATION_ID и ответ публикуется в топике, указанном в заголовке REPLY_TOPIC.

 @Bean
 public Map < String, Object > consumerConfigs() {
     Map < String, Object > props = new HashMap < > ();
     props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
     props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
     props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonSerializer.class);
     props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);

     return props;
 }

 @Bean
 public Map < String, Object > producerConfigs() {
     Map < String, Object > props = new HashMap < > ();
     props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
     props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
     props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);

     return props;
 }

 @Bean
 public ConsumerFactory < String, Request > requestConsumerFactory() {
     return new DefaultKafkaConsumerFactory < > (consumerConfigs(), new StringDeserializer(),
         new JsonSerializer < Request > ());
 }

 @Bean
 public KafkaListenerContainerFactory < ConcurrentMessageListenerContainer < String, Request >> requestListenerContainerFactory() {
     ConcurrentKafkaListenerContainerFactory < String, Request > factory =
         new ConcurrentKafkaListenerContainerFactory < > ();
     factory.setConsumerFactory(requestConsumerFactory());
     factory.setReplyTemplate(replyTemplate());
     return factory;
 }

 @Bean
 public ProducerFactory < String, Reply > replyProducerFactory() {
     return new DefaultKafkaProducerFactory < > (producerConfigs());
 }

 @Bean
 public KafkaTemplate < String, Reply > replyTemplate() {
     return new KafkaTemplate < > (replyProducerFactory());
 }

Здесь также требуется некоторая конфигурация, но конфигурация слушателя проще:

 @KafkaListener(topics = "${kafka.topic.car.request}", containerFactory = "requestListenerContainerFactory")
 @SendTo()
 public Reply receive(Request request) {
     Reply reply = ...;
     return reply;
 }

Но как насчет нескольких экземпляров консюмера?


Все вроде работает, пока мы не используем несколько экземпляров консюмера. Если у нас есть несколько экземпляров клиента, мы должны убедиться, что ответ отправляется в корректный экземпляр клиента. Документация Spring Kafka предполагает, что каждый консюмер может использовать уникальный топик или, что с запросом отправляется дополнительное значение заголовка KafkaHeaders.RESPONSE_PARTITION — четырехбайтовое поле, содержащее BIG-ENDIAN-представление целочисленного номера раздела. Использование раздельных топиков для разных клиентов явно не очень гибко, поэтому мы выбираем явную настройку REPLY_PARTITION. Тогда клиент должен знать, на какую партицию он назначен. В документации предлагается использовать явную конфигурацию для выбора конкретной партиции. Давайте добавим ее к нашему примеру:

 @Value("${kafka.topic.car.reply.partition}")
 private int replyPartition;

 ...

 @Bean
 public KafkaMessageListenerContainer < String, RequestReply > replyListenerContainer() {
     ContainerProperties containerProperties = new ContainerProperties(replyTopic);
     TopicPartitionInitialOffset initialOffset = new TopicPartitionInitialOffset(replyTopic, replyPartition);
     return new KafkaMessageListenerContainer < > (replyConsumerFactory(), containerProperties, initialOffset);
 }
 private static byte[] intToBytesBigEndian(final int data) {
         return new byte[] {
             (byte)((data >> 24) & 0xff), (byte)((data >> 16) & 0xff),
             (byte)((data >> 8) & 0xff), (byte)((data >> 0) & 0xff),
         };
     }

     ...
     record.headers().add(new RecordHeader(KafkaHeaders.REPLY_TOPIC, requestReplyTopic.getBytes()));
 record.headers().add(new RecordHeader(KafkaHeaders.REPLY_PARTITION, intToBytesBigEndian(replyPartition)));
 RequestReplyFuture < String, RequestReply, RequestReply > sendAndReceive = requestReplyKafkaTemplate.sendAndReceive(record);
 ...

Не очень красиво, но это работает. Требуемая конфигурация обширна и API выглядит низкоуровнево. Необходимость явной настройки партиций усложняет процесс динамического масштабирования количества клиентов. Очевидно, можно сделать лучше.

Инкапсулирование обработки топика для ответа и партиции


Давайте начнем с инкапсуляции паттерна Return Address, передавая вместе топик для ответа и партицию. Топик для ответа должен быть заинжектен в RequestReplyTemplate и, следовательно, вообще не должен присутствовать в API. Когда речь идет о партициях для ответа, сделаем наоборот: извлечем партицию (партиции), назначенную слушателю топика для ответа, и передадим эту партицию автоматически. Это избавляет клиента от необходимости заботиться об этих заголовках.
При этом, давайте также сделаем API таким, чтобы он напоминал стандартный KafkaTemplate (перегрузим метод sendAndReceive() упрощенными параметрами и добавим соответствующие перегруженные методы, использующие настроенный по умолчанию топик):

public class PartitionAwareReplyingKafkaTemplate < K, V, R > extends ReplyingKafkaTemplate < K, V, R > {

    public PartitionAwareReplyingKafkaTemplate(ProducerFactory < K, V > producerFactory,
        GenericMessageListenerContainer < K, R > replyContainer) {
        super(producerFactory, replyContainer);
    }

    private TopicPartition getFirstAssignedReplyTopicPartition() {
        if (getAssignedReplyTopicPartitions() != null &&
            getAssignedReplyTopicPartitions().iterator().hasNext()) {
            TopicPartition replyPartition = getAssignedReplyTopicPartitions().iterator().next();
            if (this.logger.isDebugEnabled()) {
                this.logger.debug("Using partition " + replyPartition.partition());
            }
            return replyPartition;
        } else {
            throw new KafkaException("Illegal state: No reply partition is assigned to this instance");
        }
    }

    private static byte[] intToBytesBigEndian(final int data) {
        return new byte[] {
            (byte)((data >> 24) & 0xff), (byte)((data >> 16) & 0xff),
            (byte)((data >> 8) & 0xff), (byte)((data >> 0) & 0xff),
        };
    }

    public RequestReplyFuture < K,
    V,
    R > sendAndReceiveDefault(@Nullable V data) {
        return sendAndReceive(getDefaultTopic(), data);
    }

    public RequestReplyFuture < K,
    V,
    R > sendAndReceiveDefault(K key, @Nullable V data) {
        return sendAndReceive(getDefaultTopic(), key, data);
    }

    ...

    public RequestReplyFuture < K,
    V,
    R > sendAndReceive(String topic, @Nullable V data) {
        ProducerRecord < K, V > record = new ProducerRecord < > (topic, data);
        return doSendAndReceive(record);
    }

    public RequestReplyFuture < K,
    V,
    R > sendAndReceive(String topic, K key, @Nullable V data) {
        ProducerRecord < K, V > record = new ProducerRecord < > (topic, key, data);
        return doSendAndReceive(record);
    }

    ...

    @Override
    public RequestReplyFuture < K,
    V,
    R > sendAndReceive(ProducerRecord < K, V > record) {
        return doSendAndReceive(record);
    }

    protected RequestReplyFuture < K,
    V,
    R > doSendAndReceive(ProducerRecord < K, V > record) {
        TopicPartition replyPartition = getFirstAssignedReplyTopicPartition();
        record.headers()
            .add(new RecordHeader(KafkaHeaders.REPLY_TOPIC, replyPartition.topic().getBytes()))
            .add(new RecordHeader(KafkaHeaders.REPLY_PARTITION,
                intToBytesBigEndian(replyPartition.partition())));
        return super.sendAndReceive(record);
    }
}

Следующий шаг: Адаптируем ListenableFuture к более современной CompletableFuture.

public class CompletableFutureReplyingKafkaTemplate < K, V, R > extends PartitionAwareReplyingKafkaTemplate < K, V, R > {

    public CompletableFutureReplyingKafkaTemplate(ProducerFactory < K, V > producerFactory,
        GenericMessageListenerContainer < K, R > replyContainer) {
        super(producerFactory, replyContainer);
    }

    public CompletableFuture < R > requestReplyDefault(V value) {
        return adapt(sendAndReceiveDefault(value));
    }

    public CompletableFuture < R > requestReplyDefault(K key, V value) {
        return adapt(sendAndReceiveDefault(key, value));
    }

    ...

    public CompletableFuture < R > requestReply(String topic, V value) {
        return adapt(sendAndReceive(topic, value));
    }

    public CompletableFuture < R > requestReply(String topic, K key, V value) {
        return adapt(sendAndReceive(topic, key, value));
    }

    ...

    private CompletableFuture < R > adapt(RequestReplyFuture < K, V, R > requestReplyFuture) {
        CompletableFuture < R > completableResult = new CompletableFuture < R > () {
            @Override
            public boolean cancel(boolean mayInterruptIfRunning) {
                boolean result = requestReplyFuture.cancel(mayInterruptIfRunning);
                super.cancel(mayInterruptIfRunning);
                return result;
            }
        };
        // добавим коллбек к результату отправки запроса
        requestReplyFuture.getSendFuture().addCallback(new ListenableFutureCallback < SendResult < K, V >> () {
            @Override
            public void onSuccess(SendResult < K, V > sendResult) {
                // NOOP
            }
            @Override
            public void onFailure(Throwable t) {
                completableResult.completeExceptionally(t);
            }
        });
        // добавим коллбек к ответу
        requestReplyFuture.addCallback(new ListenableFutureCallback < ConsumerRecord < K, R >> () {
            @Override
            public void onSuccess(ConsumerRecord < K, R > result) {
                completableResult.complete(result.value());
            }
            @Override
            public void onFailure(Throwable t) {
                completableResult.completeExceptionally(t);
            }
        });
        return completableResult;
    }
}

Упакуем это в утилитную библиотеку и теперь у нас есть API, который намного больше соответствует основной философии проектирования Spring «Соглашение над Конфигурацией» («Convention over Configuration»). Вот итоговый код клиента:

 @Autowired
 private CompletableFutureReplyingKafkaTemplate < String, Request, Reply > requestReplyKafkaTemplate;

 ...

 requestReplyKafkaTemplate.requestReply(request).thenAccept(reply - >
     System.out.println("Reply: " + reply.toString());
 );

Подводим итоги


Подводя итог, Spring для Kafka 2.2 обеспечивает полностью функциональную реализацию паттерна Request-Reply в Apache Kafka, но API по-прежнему имеет некоторые шероховатости. В этом выпуске мы увидели, что некоторые дополнительные абстракции и адаптации API могут дать более логичный высокоуровневый API.

Предупреждение 1:
Одним из главных преимуществ архитектуры, управляемой событиями, является разделение (decoupling) продюсеров и консюмеров событий, что позволяет создавать гораздо более гибкие и эволюционирующие системы. Использование синхронной семантики «Запрос-Ответ» является полной противоположностью, когда запрашивающая и отвечающая стороны сильно связаны между собой. Следовательно, ее следует использовать только в случае необходимости.

Предупреждение 2:
Если требуется синхронный Запрос-Ответ, то протокол на основе HTTP намного проще и эффективнее, чем использование асинхронного канала типа Apache Kafka.
Тем не менее, могут быть сценарии, когда синхронный Запрос-Ответ через Kafka имеет смысл. Разумно выбирайте лучший инструмент для работы.

Полностью рабочий пример можно найти на сайте github.com/callistaenterprise/blog-synchronous-kafka.

Комментарии


Federico • 7 месяцев назад
А когда у нас есть гибридные потребности, допустим, в 50% кейсов нам нужен Запрос-Ответ и в 50% нам нужно событийное управление? Как нам это сделать? Конфигурация, необходимая Spring Kafka, выглядит довольно ужасно.

Jehanzeb Qayyum • 6 месяцев назад
Теперь Spring имеет дефолтную поддержку с использованием партиций в одном общем топике для ответа.

Начиная с версии 2.2, шаблон пытается определить топик для ответа или партицию из сконфигурированного контейнера ответа (reply container).

https://docs.spring.io/spring-kafka/reference/html/#replying-template

nir rozenberg • 8 месяцев назад
Привет,
В последнем абзаце вы написали, что могут быть сценарии, когда синхронный Запрос-Ответ через Kafka имеет смысл по сравнению с HTTP.
Можно привести примеры таких сценариев?
Спасибо,
Nir

Janne Keskitalo nir rozenberg • 8 месяцев назад
Мы собираемся разделить систему обработки транзакций большого объема на несколько микросервисов и у меня есть идея использовать обмен сообщениями Kafka «Запрос-Ответ» для достижения похожих способов обработки (processing affinity). В основном Kafka используется для маршрутизации всех вызовов одного клиента в один и тот же процесс обработчика транзакций, который затем последовательно их выполняет по одному. Такой вид обработки гарантирует линеаризируемость (https://stackoverflow.com/a/19515375/7430325), причинно-следственную связность, а также позволяет эффективное кэширование. По сути, усилия по координации были бы перенесены из базы данных в Kafka и мы могли бы запустить базу данных в строгом режиме изоляции Serializable.
Мне еще предстоит углубиться в детали нашей семантики транзакций, чтобы увидеть, где здесь не дотягивает, так что это пока просто идея.

Перевел @middle_java

Комментарии (1)


  1. ultar
    18.11.2019 09:28

    Не очень понятно какая из сторон отвечает за то, чтобы в топике было нужное количество партиций для потребителей? Что делать ответчику, если нужно отправить сообщение в partition_id, но партиции с таким номером нет.