На текущий момент WebClient — актуальный клиент для выполнения HTTP‑запросов. Он предоставляет достаточно гибкий интерфейс, позволяющий декларативно компоновать асинхронную логику.

В этой статье я распишу, что это такое и с чем его едят, как работать с реактивными стримами, гибко обрабатывать ответы и ловить ошибки.

Введение

В первую очередь, чтобы отправить запрос, нужно использовать его ответ в качестве Publisher. То есть, чтобы начать выполнение асинхронной логики, на нее нужно подписаться. Есть 3 основных способа, как это сделать:

  1. Самый простой — заблокировать поток и подождать его результата, используя.block(). Этот способ превращает взаимодействие в обычное синхронное и позволяет основному потоку дождаться ответа и работать с ним, как с обычным объектом.

  2. Еще можно прямо из потока Netty отдать ответ на откуп потребителю, например, возвращать Mono<V> в контроллере.

  3. А используя методы .subscribe() основной поток продолжит свое выполнение, полностью отдав обработку ответа на откуп Netty. От прошлого варианта этот отличает то, что нам не нужно возвращать клиенту результат запроса.

Первый способ самый очевидный и интуитивно понятный — не нужно мучаться с асинхронностью, лямбдами и обертками. Выполнили реквест — обработали ответ. Однако, это, очевидно, обладает всеми недостатками синхронного АПИ.

Получается, необходимо выполнить запрос, получить ответ, обработать его и выполнить всю связанную бизнес-логику не выходя из потока Netty. Но как это сделать, избегая спагетти-кода и минимизируя влияние декларативного описания на привычный нам кодстайл?

Шаг 0. Создание реактивной цепочки

Реактивная цепочка исполняется в следующем порядке:

  1. Сборка цепочки происходит сверху‑вниз

  2. Подписка — снизу‑вверх

  3. Обработка элементов — сверху‑вниз

То есть, вкратце, сначала цепочка собирается: каждый элемент цепочки возвращает свой элемент Mono, который передается дальше, пока не доходит до Подписчика. После этого информация о подписке передается выше, пока не дойдет до Издателя, который отправляет сигнал обратно. Только тогда, когда Подписчик получит информацию об успешной подписке, он начнет запрашивать элементы у издателя, которые будут обработаны согласно задекларированному описанию.

При использовании WebClient выполнение операторов, вызванных после методов exchange/retrieve/exchangeToMono, описанных ниже, будет производиться в одном из потоков Netty.

Шаг 1. Mono и Flux — это что?

Mono и Flux — реактивные стримы, отличающиеся тем, что Mono представляет стрим от 0 до 1 элемента (и этим напоминает Optional), а Flux — стрим от 0 до N элементов.

Так как при запросе на сервер, мы ожидаем получить 0 или 1 ответ, в примерах используется Mono. Его интерфейс является подмножеством операторов Flux, а также несколькими методами, позволяющими приведение к Flux или CompletableFuture.

В документации есть FAQ "Я хочу сделать X с Y, что мне для этого использовать?"

Шаг 2. Разберемся с способами обработки ответов

Интерфейс WebClient предоставляет несколько методов для этого:

  • exchange() — @deprecated метод, выполняющий запрос и возвращающий Mono<ClientResponse>, содержащий статус и заголовки ответа.

  • retrieve() — возвращает объект ResponseSpec, в котором можно объявить простейшие методы обработки ответа, например, <T> Mono<ResponseEntity<T>> toEntity(Class<T> bodyClass), чтобы получить весь ответ с заголовками и т. д., или <T> Mono<T> bodyToMono(Class<T> elementClass), если интересует только тело. Также можно указать разную обработку для разных HTTP кодов, используя методы onStatus

  • <V> Mono<V> exchangeToMono(Function<ClientResponse, ? extends Mono<V>> responseHandler) (и его аналог exchangeToFlux) — альтернатива методу retrieve(), дающая возможность более гибкой обработки ответа благодаря доступу к ClientResponse.

По умолчанию при 4хх и 5хх кодах ответа возникает исключение WebClientResponseException, что можно переопределить при необходимости. Ниже код из класса DefaultResponseSpec, который это задает.

private static final StatusHandler DEFAULT_STATUS_HANDLER =
				new StatusHandler(code -> code.value() >= 400, ClientResponse::createException);
Примеры использования этих методов обработки ответов из документации

Пример exchange():

Mono<Person> mono = client.get()
      .uri("/persons/1")
      .accept(MediaType.APPLICATION_JSON)
      .exchange()
      .flatMap(response -> response.bodyToMono(Person.class));

Пример retrieve():

Mono<ResponseEntity<Person>> entityMono = client.get()
      .uri("/persons/1")
      .accept(MediaType.APPLICATION_JSON)
      .retrieve()
      .toEntity(Person.class);

Пример exchangeToMono():

  Mono<Person> entityMono = client.get()
      .uri("/persons/1")
      .accept(MediaType.APPLICATION_JSON)
      .exchangeToMono(response -> {
          if (response.statusCode().equals(HttpStatus.OK)) {
              return response.bodyToMono(Person.class);
          }
          else {
              return response.createError();
          }
      });

Если вы реализуете fire‑and‑forget запросы или простейший адаптер — вам отлично подойдут простые преобразования, предоставляемые retrieve(). Но вот взаимодействия накапливаются, обработка ответов усложняется, а количество схожих реализаций классов, реализующих отправку запросов, становится неприятно большим. В этот момент появляется потребность в новых уровнях абстракции и возможности переиспользования кода. Здесь на помощь приходит exchangeToMono.

ClientResponse представляет HTTP ответ, возвращаемый WebClient. Он предоставляет доступ к коду и заголовкам ответа, а так же методы, чтобы consume тело, с которым можно выполнить необходимые преобразования и другую логику.

Первое, что бросается в глаза, когда мы обращаем внимание на метод exchangeToMono — это то, что обработка ответа происходит внутри лямбда‑функции. А это значит, что, во‑первых, мы можем преобразовать ответ каким нам угодно способом, а во‑вторых, как минимум, вынести обработку в отдельный метод, визуально разгрузив код.

Вот как это можно представить:

public Mono<ResponseEntity<String>> send() {
    // ...
    return webClient
            .get()
            // ...
            .exchangeToMono(this::handleResponse);
}

public Mono<ResponseEntity<String>> handleResponse(ClientResponse clientResponse) {
    return clientResponse
            .toEntity(String.class)
            .map(response -> {
                if (response.getStatusCode().isError()) {
                    return handleErrorResponse(response);
                }

                return handleSuccessResponse(response);
            });
}

Мы вынесли обработку ответа в отдельный метод, который передаем в exchangeToMono. Код стал немного чище. Пойдем дальше — где есть отдельный метод, там можно добавить и отдельный класс, полностью вынеся логику обработки ответа в отдельное место. Красота!

public interface AsyncHandler<Body> {

    Mono<Body> handleResponse(ClientResponse clientResponse);

    Body handleSuccessResponse(ResponseEntity<String> responseEntity); 

    Body handleErrorResponse(ResponseEntity<String> responseEntity);

}

public interface AsyncSender {
    <Body> Mono<Body> send(/* другие параметры*/, AsyncHandler<Body> responseHandler);

}
Подводные камни

Если в бинах-реализациях AsyncHandler есть нефинальные переменные, они будут волатильны, и есть риск перезаписи другими netty-потоками. Присмотритесь к скоупу prototype

Шаг 3. .map(), .flatMap() и цепочка запросов

В этом шаге остановимся на базовых методах преобразования Mono (точнее объекта, им изданного). Так как Mono является реактивным стримом, то и преобразования похожи на достаточно привычные нам стримы.

map() — синхронное преобразование

  • map() выполняет преобразование, применяя к объекту синхронную функцию

  • mapNotNull() аналогичен map(), но позволяет вернуть null значение. В этом случае Mono сразу выполняется

Эти методы принимают на вход функцию преобразования:

return Mono
        // ...
        .map(person -> {
                if (person.getId() != null) {
                    return update(person);
                }
                return create(person);
            });

flatMap() — асинхронное преобразование

  • flatMap() выполняет преобразование асинхронно, возвращая значение, изданное другим Mono. Может изменить тип значения (напр. Mono<Person> → Mono<Address>)

  • flatMapMany() предоставляет возможность преобразования к множеству, представляемому Flux. Есть реализация с разной обработкой значения, ошибки и сигнала завершения выполнения

Также flatMap подойдет, если есть необходимость выполнить несколько реквестов друг за другом:

getElement()
  .flatMap(old -> updateElement(old, new));

При необходимости можно добавить обработку краевых случаев и разное поведение при разных ответах.

А для того, чтобы триггернуть исполнение какой‑либо логики без изменения издаваемого объекта можно использовать методы doOnNext, doOnSuccess, doOnError, doFirst, doFinally и др.

Шаг 4. Обработка ошибок

Любое непроверяемое исключение преобразуется в событие onError. Есть 3 варианта, когда обработка HTTP запроса может закончиться ошибкой:

  1. Вручную заменили успешное Mono на ошибку

  2. Пришел валидный HTTP ответ, но в процессе его обработки было выброшено исключение

  3. Не пришел ответ по любой причине: таймаут, недоступность и т. д., и т. п.

Рассмотрим первый вариант. Для этого в любой момент обработки ответа (методы map(), flatMap(), then()) нужно выкинуть исключение или обернуть его в Mono.error(e). Это выполнит Mono с сигналом ошибки.

Во втором случае, как и обычно, нужно предусмотреть обработку всех исключений, которые мы не хотим прокинуть дальше. Исключения, не попадающие под п.3, возникают в следующих ситуациях:

  • По умолчанию было выброшено исключение на 4хх и 5хх коды ответа

  • Разработчик явно выбросил непроверяемое исключение, используя throw

  • В процессе выполнения логики с объектом внутри Mono возникло необработанное исключение, которое пролетело выше

В третьем случае ошибка чаще всего возникает по независящим от разработчика обстоятельствам.

Что с этим делать дальше?

В реактивных стримах ошибки завершают выполнение последовательности операций и отправляются на обработку Подписчику и его методу onError. Эти ошибки всегда надо обрабатывать на уровне приложения используя метод onError и/или другие операции, позволяющие обработать ошибки в процессе выполнения цепочки операций.

Прежде чем вы познакомитесь с операторами обработки ошибок, вы должны помнить, что любая ошибка в реактивной последовательности является конечным событием. Даже если используется оператор обработки ошибок, он не позволяет продолжить исходную последовательность. Скорее, он преобразует сигнал onError в начало новой (fallback) последовательности. Другими словами, он заменяет ей уже завершенную последовательность.

Project Reactor документация

Случай 1. Исключение внутри оператора

Потенциально любой код внутри операторов может вызвать исключение, как и в обычных Java-методах. Если такой риск есть, стоит воспользоваться привычным try/catch:)

Случай 2. Не изменять ошибку, просто выполнить какие-то действия

doOnError как и другие операторы doOn позволяет как-то среагировать на события в последовательности их не изменяя. Подойдет, чтобы написать логи, отправить уведомление и т.д., но больше ничего не делать

someMono
  .doOnError(e -> log("пупупу"))

Случай 3. Поймать и обработать ошибку

onErrorResume — фактически реактивный аналог try/catch. Позволяет поймать ошибку и динамически посчитать возвращаемое в этом случае значение Mono<V>, а так же выполнить другие необходимые действия. Возвращает альтернативного Издателя.

someMono
  .onErrorResume(e -> {
    log("пупупу");
    return Mono.just(defaultPerson);
  })

Случай 4. Поймать и преобразовать ошибку

Ситуацию "поймать исключение, обернуть его в другое и пробросить дальше" покрывает оператор onErrorMap

someMono
  .onErrorMap(e -> new CustomException(e))

Случай 5. Вернуть fallback-значение

onErrorReturn — еще один аналог try/catch. Позволяет поймать ошибку и вернуть альтернативное значение.

someMono
  .onErrorReturn(defaultValue)

Случай 6. onError ивенты в Подписчике

Если ошибка не была поймана с помощью операторов выше, она попадает в обработчик сигнала onError, если он задан Подписчиком

ublic final Disposable subscribe(
			@Nullable Consumer<? super T> consumer,
			@Nullable Consumer<? super Throwable> errorConsumer,
			@Nullable Runnable completeConsumer,
			@Nullable Consumer<? super Subscription> subscriptionConsumer) {
		return subscribeWith(new LambdaMonoSubscriber<>(consumer, errorConsumer,
				completeConsumer, subscriptionConsumer, null));
	}

Метод subscribe() предоставляет несколько интерфейсов, позволяющих задавать поведение при возникновении различных событий: получения значения, возникновения ошибки, завершения стрима и подписки на Издателя.

Важно, что каждый инстанс Подписчика может быть подписан только на одного Издателя. При подписке на другого Издателя, предыдущая отменяется.

ℹ️ Чтобы не задавать каждый раз поведение Подписчика лямбда‑функциями, можно воспользоваться его более абстрактной версией, которая принимает на вход Subscriber. Чтобы реализовать свою имплементацию Подписчика, необходимо отнаследоваться от класса BaseSubscriberили заимплементить CoreSubscriber.

  • При использовании CoreSubscriber<T> требуется реализовать все 4 метода: void onSubscribe(Subscription s), void onNext(T s), void onError(Throwable t), void onComplete

  • BaseSubscriber<T> реализует методы CoreSubscriber, делая их финальными, и задает им базовое поведение. При необходимости можно переопределить один или несколько вспомогательных методов hookOnSubscribe, hookOnNext, hookOnError, hookOnComplete, чтобы добавить Подписчику дополнительную логику

Что, если исключение не обработать?

Если вызов блокирующий — в момент вызова.block() в основной поток будут выброшены все исключения, и их можно будет обработать try/catch

Если вызов неблокирующий — он будет пойман хуком и напечатан в логи. Но если такая ситуация возникает ввиду необработанного исключения — это бэд прэктис, и разработчиками фреймворка рекомендуется всегда задавать onError метод


Надеюсь, эта статья помогла кому-нибудь получше разобраться в работе с WebClient и обработкой ответов

Комментарии (1)


  1. boopiz
    22.01.2025 11:45

    "компоновать" - любою хорошего Русского языка! "асинхронную логику. " - этапять! это вообще как? как логика может быть синхронной или асинхронной?!

    "гибко обрабатывать ответы и ловить ошибки."... бляяааа. не могу даже дальше читать. Какое-то IT петросянство

    "синхронного АПИ."... это просто пздц! вот такой нынче "уровень" спикеров и прочих "специалистов".. чтоб вам так диагноз врач ставил, как вы издеваетесь над терминалогией и отраслью