На одном из проектов Группы «Иннотех» появилась задача перевода асинхронных запросов в синхронные. По сути, нужно было подружить REST и Apache Kafka в одном запросе.

Если разложить задачу по полочкам, то у нас есть два сервиса, которые общаются между собой — назовём их А и Б. В сервис А приходит потребитель с запросом на получение данных, которые лежат в сервисе Б. Таким образом, сервис А отправляет по REST запрос данных в Б и ожидает ответ на запрос в Kafka. Пока этот ответ не получен, пользователь ждёт данные.

Казалось бы, популярная задача и ответ должен быть в Google или Stack Overflow. Но удалось найти только решение подобной задачи через готовую библиотеку, соединяющую два сервера с Kafka. Поэтому проблема была решена самостоятельно с помощью Java, фреймворка Spring и небольшой ИТ-смекалки.

Постановка задачи

Перед тем как начать реализацию, необходимо поставить задачу и понять, что у нас есть и чего хотим добиться.

У нас есть сервис Client, у которого будет один end-point. А end-point, в свою очередь, будет принимать обычную строку, отправлять её второму сервису и ждать от него ответа в Kafka.

И второй сервис — Server. Он будет принимать строчку по REST от сервиса Client, переводить её в UpperCase и возвращать в ответ по Kafka.

Сервис Client ожидает ответ от Server. Пока он не получит ответ, пользователь будет ждать результат. При этом может быть такая ситуация, что ожидание ответа затягивается. Чтобы не допустить слишком большой паузы, необходимо предусмотреть прерывание по таймауту, например, в 10 секунд. Но это условие не обязательно, мы должны иметь возможность ожидать ответа бесконечно долго. Но точно не 7,5 миллионов лет ровно, иначе ответ может разочаровать.

Таким образом, можно привести пример работы. Потребитель отправляет в сервис Client строку, например, «abc123», и пока не получит ответ от сервиса Server будет висеть в ожидании. Ответ от Server должен быть «ABC123», который будет возвращён потребителю. Если время ожидание ответа превысит таймаут, то вместо ответа будет возвращён HTTP-код с ошибкой 504 (Gateway Timeout).

Реализация Client

Буду описывать только основные моменты. Если требуется код целиком, то ниже прикреплена ссылка на репозиторий.

Реализация SenderReceiver

Ядро логики — это реализация класса SenderReceiver, который отвечает за «сон» процесса, пока тот не получит данные извне.

Этот класс состоит из двух основных методов: receive() — отвечает за засыпание, пока данные не будут получены, и send() — получает данные и будит receive(). Чтобы понимать, в каком состоянии поток сейчас находится, 1) спит и ждёт (метод receive()), или же 2) получает и пробуждает (send()), мы заведём флаг boolean, который назовём transfer и инициализируем значением true.

private boolean transfer = true;

Теперь реализуем receive():

public synchronized void receive() {
    while (transfer) {
        if (timeout != 0 && start.before(new Date(System.currentTimeMillis() - timeout))) {
            timeoutException = new TimeoutException();
            Thread.currentThread().interrupt();
            return;
        }
        try {
            wait(timeout);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
    Thread.currentThread().interrupt();
}

Поскольку изначально флаг transfer = true, то процесс переходит в режим ожидания, пока его не разбудит send(), либо же не пройдёт количество миллисекунд, заданное в timeout.

Если значение timeout отличное от 0, то значит задан механизм прерывания потока по времени выполнения. Для этого мы должны проверять текущее время, смещённое на это значение, со временем старта потока. Если время превышено, то мы сохраняем TimeoutException() в переменную, которая изначально равна null, это нам пригодится чуть попозже, и завершаем поток. В случае если флаг transfer стал false и поток не завершил ожидание по таймауту, мы просто выходим из цикла и завершаем поток.

Можно заметить, что timeout со значением 0 уникален. Если он равен 0, то поток будет ожидать пробуждения бесконечно долго. После того как его пробудят и transfer перейдёт в значение false, поток завершится.

Далее реализуем метод send():

public synchronized void send(final T data) {
    transfer = false;
    this.data = data;
    notifyAll();
}

Видно, что метод send() очень прост. Он принимает данные извне, переводит наш флаг transfer в false, сохраняет данные и пробуждает все потоки, которые висят в wait().

На самом деле есть ещё один важный вспомогательный метод getData(), который либо возвращает данные, которые хранятся в data, либо выкидывает ошибку, если соответствующая переменная не null.

public T getData() throws TimeoutException {
    if (Objects.nonNull(timeoutException))
        throw timeoutException;
    return data;
}

Реализация SenderReceiverMap

После того как мы реализовали механизм ожидания ответа и пробуждения при его получении, необходимо реализовать класс, который будет хранить в себе набор таких ожиданий. Это нужно для того, чтобы связать запросы из сервиса Client с ответами из сервиса Server. Так одновременно множество пользователей могут «дёрнуть» наш end-point, а нам необходимо не запутаться и отдать те данные, которые запросил конкретный потребитель. Назовём этот класс SenderReceiverMap.

Очевидно, что ответы от Server могут прийти к нам в произвольном порядке — это зависит от времени обработки конкретного запроса. Например, время обработки одного запроса может быть 5 секунда, а второго — 3. Чтобы мы смогли искать связи потока и запроса от пользователя, нам необходимо как-то их уникально помечать. Для этого мы введём id запроса. Чтобы мы смогли быстро искать, нам необходимо использовать Map. Так как мы работаем с потоками, то необходимо пользоваться безопасными коллекциями. Итого получится:

private final ConcurrentMap<T, SenderReceiver> senderReceiverConcurrentMap;

Как несложно догадаться T — это тип id. Он может быть любым, например, Integer, String, UUID. Я предпочитаю UUID.

Для того чтобы добавить новое ожидание, необходимо реализовать метод — он будет принимать заранее сгенерированный id запроса, который мы дополнительно передаём в Server (но об этом чуть позже) и timeout.

public Thread add(T id, Long timeout) {
    SenderReceiver<V> responseWait = new SenderReceiver<V>(timeout);
    senderReceiverConcurrentMap.put(id, responseWait);
    Runnable task = responseWait::receive;
    return new Thread(task);
}

V — тип данных сообщения. В нашем случае String. Видно, что мы создаём SenderReceiver и добавляем его в коллекцию с соответствующимid запроса. Далее создаём новый Thread() и тут же возвращаем, чтобы мы смогли приостановить его, пока не получим данные в метод  send() в классе SenderReceiver.

Также потребуются методы, которые смогут вернуть нужный нам SenderReceiver из senderReceiverConcurrentMap, проверить есть ли id в senderReceiverConcurrentMap, и метод удаления запроса из senderReceiverConcurrentMap. Просто приведу реализацию, так как комментировать тут нечего.

public SenderReceiver<V> get(T id) {
    return senderReceiverConcurrentMap.get(id);
}
public Boolean containsKey(T id) {
return senderReceiverConcurrentMap.containsKey(id);
}
public SenderReceiver remove(T id) {
return senderReceiverConcurrentMap.remove(id);
}

Реализация запроса данных у сервиса Server

Теперь необходимо реализовать запрос к сервису Server по REST, а также добавить в коллекцию его идентификатор вместе с ожиданием результата.

public String get(String text) throws TimeoutException {
    UUID requestId = UUID.randomUUID();
    while (senderReceiverMap.containsKey(requestId)) {
        requestId = UUID.randomUUID();
    }

    String responseFromServer = this.sendText(requestId, text);
    System.out.println("REST response from server: " + responseFromServer);

    Thread thread = senderReceiverMap.add(requestId, timeout);
    thread.start();
    try {
        thread.join();
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    String responseKafka;
    try {
        responseKafka = senderReceiverMap.get(requestId).getData();
    } catch (TimeoutException e) {
        throw e;
    } finally {
        senderReceiverMap.remove(requestId);
    }
    return responseKafka;
}

Как видно, в этом методе мы принимаем строку от пользователя. Далее, создаём id запроса и отправляем его вместе с полученной строкой. Добавляем запрос вместе с его id в коллекцию запросов и запускаем полученный Thread. На этом шаге процесс зависнет, пока кто-то не «дёрнет» метод send() у объекта SenderReceiver, который можно найти по конкретному id запроса.

После того как кто-то вызовет метод send() и передаст в него данные, либо сам поток завершит существование по timeout, метод продолжит работу. Он вызовет senderReceiverMap.get(requestId).getData(), который и вернёт либо TimeoutException, либо данные, пришедшие в метод send(). Осталось только удалить из коллекции уже обработанный запрос и вернуть данные, которые мы получили.

Реализация KafkaListener

Из вышеописанного понятно, что нам осталось только вызвать метод send() у конкретного объекта ожидания SenderReceiver, который можем запросто найти по его id. Так как мы синхронизируем запросы с Kafka, значит и вызывать метод должны, когда к нам приходят данные из него.

@KafkaListener(topics = "${kafka.topic}", groupId = "${kafka.groupId}")
public void listenGroupFoo(ConsumerRecord<String, KafkaMessage<String>> record) {
    UUID rqId = this.getRqId(record.headers());
    if (senderReceiverMap.containsKey(rqId)) {
        SenderReceiver<String> stringSenderReceiver = senderReceiverMap.get(rqId);
        stringSenderReceiver.send(record.value().getData());
    }
}

Тут тоже ничего сложного: из headers получаем id запроса, по нему мы достаём из Map нужный SenderReceiver и просто на нём вызываем метод send().

Таким образом, понятно, что необязательно использовать Kafka для синхронизации, мы можем использовать любой другой поток, главное, чтобы были id запроса и данные, которые хотим вернуть. Например, можно использовать другой брокер сообщений, либо вообще другой запрос REST. Решение гибкое, главное — вызвать send().

Реализация сервиса Server

Здесь всё предельно просто. Важно не забыть поменять port, на котором будет запускаться сервис Server, чтобы обеспечить его одновременный запуск с сервисом Client. Для этого в application.properties устанавливаем следующий параметр:

server.port=8888

Реализация end-point

В сервисе Server должен быть end-point, который принимает данные от сервиса Client и отправляет результат в Kafka, связанный с id запроса.

@PostMapping("/test")
public String test(@RequestBody RequestDto request) {

    Runnable runnable =
            () -> {
                System.out.println("Start requestId: " + request.getRequestId() + "   text: " + request.getData());

                try {
                    int sleepMs = ThreadLocalRandom.current().nextInt(0, 10000 + 1);
                    System.out.println("RequestId: " + request.getRequestId() + " sleep: " + sleepMs + "ms");
                    Thread.sleep(sleepMs);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }

                kafkaMessageSender.send(request.getRequestId(), new KafkaMessage<>(request.getData().toUpperCase()));

                System.out.println("End requestId: " + request.getRequestId());
            };
    Thread thread = new Thread(runnable);
    thread.start();

    return "Ok!";
}

Этот end-point принимает структуру, в которой содержатся requestId — id запроса и data — данные, которые необходимо обработать. В нашем случае это строка. Этот end-point создаёт отдельный поток, завершение которого он не дожидается, а сразу же возвращает по REST в ответ строчку "Ok!". В потоке мы эмулируем тяжёлую работу Server — для этого мы рандомно генерируем количество миллисекунд, на которые поток заснёт, а после того как проснётся, отправит данные (строку в верхнем регистре) в Kafka.

Реализация отправки сообщения в Kafka

Отправка в Kafka выглядит следующим образом:

public void send(final UUID requestId, final KafkaMessage<String> message) {
    ProducerRecord<String, KafkaMessage<String>> record = new ProducerRecord<>(topic, message);
    record.headers().add(new RecordHeader(RQ_ID, requestId.toString().getBytes()));
    ListenableFuture<SendResult<String, KafkaMessage<String>>> future = kafkaTemplate.send(record);
    future.addCallback((success) -> { }, System.out::println );
    kafkaTemplate.flush();
}

Мы устанавливаем в headers новый header RQ_ID, куда записываем id запроса, а затем просто вызываем send(), в который отправляем обработанные данные.

На этом работа сервиса Server заканчивается.

Выводы

На самом деле в данном решении не обязательно использовать именно REST+Kafka. Как можно заметить, решение универсальное и данную реализацию легко изменить на любые взаимодействия — хоть REST+REST, хоть Kafka+Kafka, хоть на голубиную почту.

Рабочий пример с кодом можно найти тут.

Комментарии (9)


  1. ProstakovAlexey
    14.10.2022 18:09
    +4

    Помню тоже решал такую задачу, тогда не смог убедить что это ерунда и так делать не надо. Решил в итоге, но через время все это удалили, с формулировкой - бред. И перешли на асинхронное рест взаимодействие с тикитами.


    1. Geckelberryfinn
      14.10.2022 20:16

      Угу, тоже было ощущение, что немного костыльное решение. А что за взаимодействие с тикетами, не подбросите ссылку для вдумчивого самостоятельного чтения, пожалуйста?


      1. teror4uks
        14.10.2022 23:07

        Ну наверное отправляется запрос на который создаётся тикет к которой привязывается какая нить бекграунд задача, уникальный номер тикета возвращается клиенту моментально, и клиент в свою очередь ждёт пока бекграунд таска выполняться и сможет вернуть результат. Клиент по уникальному ид тикета на запрос /poll эндпоинта как раз это и проверяет, готов результат или нет.


      1. ProstakovAlexey
        15.10.2022 09:24

        В комментарии ниже кратко описано, если для вдумчивого чтения то - https://smev.gosuslugi.ru/portal/api/files/get/6434 начиная со стр. 67


  1. motoroller95
    14.10.2022 18:22
    +2

    end-point, в свою очередь, будет принимать обычную строку, отправлять её второму сервису и ждать от него ответа в Kafka.

    омг зачем? звучит как "создаем проблему на ровном месте и героически ее превозмогаем".


    1. graf4444 Автор
      14.10.2022 19:09
      -4

      Данная статья это просто демонстрация решения. В целом задачи бывают разные, и иногда данное решение необходимо. В нашем случае было сделано именно так, потому что по REST отдавали "легкие данные" для фильтрации, а получали "тяжелые" данные в кафку, поэтому и было использовано такое решение.
      Ничего подобного в интренете не нашел. REST и KAFKA можно заменить на что угодно, самая главная идея статьи это показать, как мы можем перейти из плоскости "ассинхронное взаимодействие" в "синхронное".


      1. teror4uks
        14.10.2022 23:11
        +2

        Ничего не нашли наверное потому, что это какое то извращение, уж простите за откровение. Незнаю что значит "тяжёлые" данные, но могу предположить что это их размер и если так то вы наверное вкурсе что в кафку сообщения больше одного метра складывать нельзя, так как она начинает жутко тормозить.


      1. igolikov
        17.10.2022 10:28

        В спринге есть ReplyingKafkaTemplate


  1. ggo
    15.10.2022 12:42

    Такое бывает если есть легаси система, у которой только асинхронный интерфейс, а потребителю нужен синхронный, тогда и появляется такой адаптер - sync 2 async