Контекст

В этой статье проведены параллели между обработкой ошибок в feign и reactivefeign клиентах на примере реализации starter, который реализует аутентификацию по OAuth 2.0 в spring reactive подходе. В этой статье не будет описания реактивного стека. Если Вы хотите разобраться, какие есть преимущества и зачем его применять, рекомендую посмотреть эту серию статей. В этой статье не будет сравнения реактивного и нереактивного подхода. Для этого есть такая статья. В этой статье я хочу показать, что перевод компонентов на реактивность - несложная задача по рефакторингу, которая не займет много времени. Добавить монетку в копилку того, что перевод на реактивность - процесс, основные затраты в котором нужно сделать на первом шаге, когда Вы решаетесь применять этот подход, а адаптировать существующие компоненты просто.

Используемые технологии

Технологический стек:

  • Java 21;

  • OpenFeign;

  • ReactiveFeign;

  • Maven

  • Keycloak - IAM система;

  • Caffeine - Spring cache: требует 2 специализированные библиотеки в зависимостях проекта;

Какую пользу принесет эта статья?

Наша команда точечно применяет spring reactor для повышения производительности в компонентах системы, соответствующих профилю highload. Об опыте внедрения и использования -  можно прочитать в статье. Spring reactor не предполагает отказ от классического, нереактивного кода. Если создаваемый код стремится к SOLIDности, переход к реактивному стеку и обратно - конкретная задача. Особенность Reactor - функциональный способ обработки данных в потоке. Сделаю субъективное заключение - перевод кода в реактивность или обратно не проблема, если используется микросервисная парадигма. Требуется адаптировать отдельные специфические компоненты под реактивный способ обработки. Они взаимодействуют с контекстом, заголовками http запросов и другими, не целевыми объектами обработки. На примере starter аутентификации, о котором была статья, проведу параллели о том, насколько трудна переделка. В статье используются feign библиотека реактивной и нереактивной версий.

Описание целевого компонента

Акцент статьи - обсуждаем конкретный spring starter, который реализует аутентификацию. Проведем параллели и реализуем код, который поддержит функциональную обработку. Для реактивного кода важно обеспечить неблокирующую обработку. Данные должны "течь в потоке". Наша задача - очистить течение. Целевой компонент, реализующий аутентификацию, должен добавляться в код приложения и отвечать за выполнение необходимых действий. Модель процесса, поддерживающая аутентификацию (Рис.1) должна:

  • Формировать сообщение;

  • Отправлять сообщение;

  • Сохранять токен;

  • Удалять токен, если он стал не валиден;

Рис.1
Рис.1

Формировать сообщение будем в рассматриваемом компоненте. Токен будет создаваться в существующей IAM системе keycloak. В качестве кеша используем caffeine - нужен токен для конкретной инстанции и не требуется централизованного хранилища. Сам процесс запроса токена состоит в отправке сообщения в keycloak, получении ответа и сохранении в cache (рис.2).

Рис.2
Рис.2

Ранее для реализации использовали Feign клиент. Для реактивного подхода будет использован альтернативный Reactivefeign клиент. 

Реализация

При реализации логики аутентификации на нереактивом стеке использовалось:

  • feign.RequestInterceptor →

    • Добавлял в запрос к целевой системе токен; 

  • openfeign.FeignClient →

    • Клиент к keycloak, который использует content-type MediaType.APPLICATION_FORM_URLENCODED_VALUE;

  • feign.codec.ErrorDecoder →

    • Реализовал логику обработки ошибочных запросов, определяемых по http коду;

  • feign.Retryer →

    • Реализовал логику определения запросов с 401 кодом и политику повторных запросов;

  • cache.Caffeine →

    • Реализовал кеш;

Starter, реализующий не реактивную логику находится тут. Для реализации логики аутентификации на реактивном стеке будут использоваться следующие компоненты:

  • reactivefeign.client.ReactiveHttpRequestInterceptor →

    • Аналогичен не реактивному. Разница в механике дополнения данных;

  • reactivefeign.spring.config.ReactiveFeignClient →

    • Аналогичен. Разница в типе ответа и настройки идентифицирующего DI компонента;

  • reactivefeign.client.statushandler.ReactiveStatusHandler → 

    • Не аналогичен. Более специализирован. Для обработки необходимо четко определить набор статусов, на которые должна реагировать обработка. В нашем случае почти ничего не изменится по сравнению с нереактивным компонентом;

  • reactivefeign.retry.ReactiveRetryPolicy →

    • Не аналогичен. Более специализирован;

  • cache.Caffeine →

    • Тот же самый;

Таким образом получаем следующую таблицу с взаимозаменяемыми компонентами (Таб.1)

Таб.1

Назначение 

Не реактивный стек

Реактивный стек

Перехватчик запросов к целевой системе

feign.RequestInterceptor

reactivefeign.client.ReactiveHttpRequestInterceptor

Клиент

openfeign.FeignClient

reactivefeign.spring.config.ReactiveFeignClient

Логика обработки ошибочных запросов

feign.codec.ErrorDecoder

reactivefeign.client.statushandler.ReactiveStatusHandler

Логика политик обработки запросов

feign.Retryer

reactivefeign.retry.ReactiveRetryPolicy

Кеширование

cache.Caffeine

cache.Caffeine

Рассмотрим компоненты, реализующие:

  • ReactiveHttpRequestInterceptor

  • ReactiveFeignClient

  • ReactiveStatusHandler

  • ReactiveRetryPolicy

Кеш рассматривать не будем. Его настройки полностью аналогичны нереактивному подходу. Ниже будет ссылка на проект в целом. У Вас будет возможность проверить сказанное.

Зависимости

Для использования Reactivefeign нам потребуется добавить следующие зависимости:

<dependency>
    <groupId>com.playtika.reactivefeign</groupId>
    <artifactId>feign-reactor-spring-configuration</artifactId>
    <version>${берите последнюю актуальную версию}</version>
</dependency>
<dependency>
    <groupId>com.playtika.reactivefeign</groupId>
    <artifactId>feign-reactor-webclient</artifactId>
    <version>${берите последнюю актуальную версию}</version>
</dependency>

Пакеты com.playtika.reactivefeign предоставляют инструменты для создания асинхронных и неблокирующих HTTP-клиентов в Java-приложениях. Reactivefeign возвращает Mono или Flux, что позволяет обрабатывать результаты асинхронно. Подобно традиционному feign, reactivefeign использует аннотации @GET, @POST, и другие, для определения HTTP-методов. Reactivefeign используется в приложениях Spring для создания HTTP-клиентов и автоматической обработки конфигураций.

ReactiveFeignClient

Начнем рассмотрение с основного компонента - интерфейса, реализующего подключение и взаимодействие с сервером. В нашем случае IAM системой keycloak:

@ReactiveFeignClient(
        name = KEYCLOAK_NAME_FEIGN_CLIENT,
        url = "${keycloak-connection.url}",
        configuration = KeycloakFeignClientConfiguration.class
)
public interface KeyCloakFeignClient {
 
    @PostMapping(
            consumes = MediaType.APPLICATION_FORM_URLENCODED_VALUE
    )
    Flux<ResponseKeycloak> processingAuthData(Map<String, String> requestPropertiesKeycloak);
 
}

Для запроса используем Map. Это позволит гибко настраивать необходимые параметры не меняя объект. В качестве ответа получаем Flux c желанным токеном. В классе используется KeycloakFeignClientConfiguration. Через этот объект добавляем ReactiveStatusHandler и ReactiveRetryPolicy обработчиками в контекст клиента. Вы можете оценить, что изменения по сравнению с нереактивным кодом только в наименовании основной аннотации и параметра ответа. 

Для теста точечным образом конфигурируем класс. Добавляем необходимые автоконфигурации, используем wiremock для проверки интеграционного взаимодействия. Для выполнение этого класса так же требуется сконфигурировать кеш менеджер и добавить его в контекст. Это сделано через внутренний класс CachingTestConfig.

@SpringBootTest(
        classes = {
                KeyCloakFeignClient.class,
                ObjectMapper.class
        },
        properties = {
                "keycloak-connection.url=http://localhost:${wiremock.server.port}"
        }
)
@AutoConfigureWireMock(port = 0)
@ImportAutoConfiguration(classes = {
        ReactiveFeignAutoConfiguration.class,
        JacksonAutoConfiguration.class,
        HttpMessageConvertersAutoConfiguration.class,
        WebClientAutoConfiguration.class
})
@EnableReactiveFeignClients(clients = {KeyCloakFeignClient.class})
class KeyCloakFeignClientTest {
 
    @Autowired
    private KeyCloakFeignClient client;
 
    @Autowired
    private ObjectMapper mapper;
 
    @Value("/")
    private String urlPath;
 
    private final ByteArrayOutputStream byteArrayOutputStream =
            new ByteArrayOutputStream();
 
    @BeforeEach
    void setUp() {
        System.setOut(new PrintStream(byteArrayOutputStream));
    }
 
    @Test
    void checkKeyCloakFeignClient() {
        ReactiveFeignClient feignClientAnnotation =
                KeyCloakFeignClient.class.getAnnotation(ReactiveFeignClient.class);
 
        assertThat(feignClientAnnotation)
                .isInstanceOfSatisfying(ReactiveFeignClient.class, feignClient -> {
 
                    assertThat(feignClient.name())
                            .isEqualTo(KEYCLOAK_NAME_FEIGN_CLIENT);
 
                    assertThat(feignClient.url())
                            .isEqualTo("${keycloak-connection.url}");
 
                    assertThat(feignClient.configuration())
                            .contains(KeycloakFeignClientConfiguration.class);
                });
    }
 
    @EnableCaching
    @Configuration
    public static class CachingTestConfig {
 
        @Bean
        public CacheManager cacheManager() {
            return new CaffeineCacheManager();
        }
 
    }
 
}

Тестовым методом checkKeyCloakFeignClient проверим, что наш bean собран и имеет все необходимые настройки. Метод  processingAuthData_shouldReturnResult реализует проверку основного happy path:

  • Выполнили запрос → 

  • Получили ответ → 

  • С помощью ReactiveStatusHandler выполнили обработку и получили токен → 

    • Об этом далее;

  • Проверили, что вызов был;

@Test
void processingAuthData_shouldReturnResult() throws JsonProcessingException {
    final ResponseKeycloak responseKeycloak = TestData.getTestResponseKeycloak();
 
    final RequestPropertiesKeycloak requestPropertiesKeycloak = 
      TestData.getTestRequestPropertiesKeycloak();
    final RequestPropertiesKeycloakWrapper requestPropertiesKeycloakWrapper = 
      new RequestPropertiesKeycloakWrapper();
    final Map<String, String> mapRequestPropertiesKeycloak = requestPropertiesKeycloakWrapper.getMapObject(requestPropertiesKeycloak);
 
    stubFor(post(urlEqualTo(urlPath))
            .willReturn(
                    aResponse()
                            .withStatus(HttpStatus.OK.value())
                            .withHeader(
                              HttpHeaders.CONTENT_TYPE, 
                              MediaType.APPLICATION_JSON_VALUE
                            )
                            .withBody(mapper.writeValueAsString(responseKeycloak))
            )
    );
 
    StepVerifier
            .create(client.processingAuthData(mapRequestPropertiesKeycloak))
            .assertNext(
                    responseKeycloakData ->
                            assertThat(responseKeycloakData)
                                    .usingRecursiveComparison()
                                    .isEqualTo(responseKeycloak)
            )
            .verifyComplete();
 
 
    verify(postRequestedFor(urlEqualTo(urlPath)));
}

Метод  processingAuthData_shouldReturn401_retry здесь приведен, как "пасхалка". Он раскрывает суть обработки - если при получении ответа мы получим 401, то 3 раза попробуем перезапросить токен, каждый раз очищая кеш от предыдущего значения и пробросим исключение, которое нужно будет обработать в использующем этот компонент сервисе. Все тексты выделены в константные описания для удобства использования в классах и тестах. Их можно посмотреть в профильном классе.

@Test
void processingAuthData_shouldReturn401_retry() throws JsonProcessingException {
    final ResponseKeycloak responseKeycloak = TestData.getTestResponseKeycloak();
 
    final RequestPropertiesKeycloak requestPropertiesKeycloak = 
      TestData.getTestRequestPropertiesKeycloak();
    final RequestPropertiesKeycloakWrapper requestPropertiesKeycloakWrapper = 
      new RequestPropertiesKeycloakWrapper();
    final Map<String, String> mapRequestPropertiesKeycloak = 
      requestPropertiesKeycloakWrapper.getMapObject(requestPropertiesKeycloak);
 
    stubFor(post(urlEqualTo(urlPath))
            .willReturn(
                    aResponse()
                            .withStatus(HttpStatus.UNAUTHORIZED.value())
                            .withHeader(
                              HttpHeaders.CONTENT_TYPE, 
                              MediaType.APPLICATION_JSON_VALUE
                            )
                            .withBody(mapper.writeValueAsString(responseKeycloak))
            )
    );
 
    StepVerifier
            .create(client.processingAuthData(mapRequestPropertiesKeycloak))
            .expectErrorSatisfies(error ->
                    assertThat(error)
                            .isInstanceOf(OutOfRetriesException.class)
                            .hasMessageContaining(
                              String.format(
                                KEYCLOAK_NO_TOKEN_MESSAGE_WITH_MAX_ALLOWED_RETRIES, 
                                MAX_ALLOWED_RETRIES)
                            )
            )
            .verify();
 
    assertThat(byteArrayOutputStream.toString().trim())
            .contains(CLEAR_TOKEN);
 
    verify(exactly(4), postRequestedFor(urlEqualTo(urlPath)));
}

ReactiveStatusHandler

ReactiveStatusHandler и ReactiveRetryPolicy добавляется в обработку через класс KeycloakFeignClientConfiguration:

@Configuration
public class KeycloakFeignClientConfiguration {
 
    @Bean
    public ReactiveRetryPolicy retryer() {
        return new KeycloakReactiveRetryer();
    }
 
    @Bean
    public ReactiveStatusHandler statusHandler() {
        return new KeycloakStatusHandler();
    }
 
}

Это полностью аналогично тому, как конфигурация собиралась для нереактивного компонента. Так как конструкторы конфигурационных классов не требуют дополнительных классов, тест на этот класс будет традиционным. Его приводить здесь не буду. KeycloakStatusHandler будет имплементировать интерфейс ReactiveStatusHandler. Он помогает обрабатывать статусы в ответах HTTP - Реагировать на ошибочные ответы с помощью реализованной логики. Так мы централизуем обработку ошибок. Для нас основной будет 401. Чтобы использовать этот интерфейс, требуется определить методы:

  • shouldHandle - статусы, на которые необходимо реагировать;

  • decode - логика обработки ошибок;

Для нашего случая получилась следующая реализация:

public class KeycloakStatusHandler implements ReactiveStatusHandler {
 
 
    @Override
    public boolean shouldHandle(int status) {
        return status >= START_ERROR_STATUS
                && status < END_ERROR_STATUS;
    }
 
    @Override
    public Mono<? extends Throwable> decode(String methodKey, ReactiveHttpResponse<?> response) {
        return response.bodyData()
                .map(body ->
                        {
                            String message;
                            try {
                                message = IOUtils
                                  .toString(
                                  new ByteArrayInputStream(body), 
                                  StandardCharsets.UTF_8
                                );
                            } catch (IOException e) {
                                return new KeycloakConnectionException(
                                        response.status(),
                                        String.format(
                                                KEYCLOAK_UNDEFINED_MESSAGE_WITH_METHOD_KEY,
                                                e.getMessage()
                                        )
                                );
                            }
 
                            if (response.status() == HttpStatus.UNAUTHORIZED.value()) {
                                return new KeycloakAuthException(message);
                            } else {
                                return new KeycloakConnectionException(
                                        response.status(),
                                        message
                                );
                            }
 
                        }
                );
    }
 
}

Изменения по сравнению с нереактивным кодом - метод ограничивающий отдельные статусы, сигнатура ответа и функциональный способ обработки тела сообщения. Разбор тела сообщения не поменялся. Метод shouldHandle реагирует на диапазон статусов [400:600]. При получении статуса из диапазона мы разбираем его содержимое в строку:

  • Если при разборе содержимого произошла ошибка - ругнемся KeycloakConnectionException с понятным описанием;

  • Если статус 401 - ругнемся исключением KeycloakAuthException. К нему мы еще вернемся;

  • Если сообщение разобрали и статус не 401 - ругнемся KeycloakConnectionException с соответствующим описанием;

В тестовом классе есть одна особенность. Для целей тестирования не удалось создать объект интерфейса ReactiveHttpResponse, поэтому его используем как мок, задавая все необходимое поведение. Проверим состояние тестового класса.

class KeycloakStatusHandlerTest {
 
    private final KeycloakStatusHandler statusHandler =
            new KeycloakStatusHandler();
 
    @Test
    void checkKeycloakStatusHandler() {
        assertThat(statusHandler)
                .isInstanceOf(ReactiveStatusHandler.class)
                .hasNoNullFieldsOrProperties()
                .hasAllNullFieldsOrProperties();
        }
 
   }

Обработку сообщений с обрабатываемым и не обрабатываемым статусом.

@ParameterizedTest
@ValueSource(ints = {400, 401, 422, 450, 470, 500, 501, 522, 599})
void shouldHandle_true(int status) {
    assertThat(statusHandler.shouldHandle(status))
            .isTrue();
}
 
@ParameterizedTest
@ValueSource(ints = {600, 650, 699, 300, 350, 399})
void shouldHandle_false(int status) {
    assertThat(statusHandler.shouldHandle(status))
            .isFalse();
}

Обработку ошибочных сообщений без 401 статуса.

@ParameterizedTest
   @ValueSource(ints = {400, 402, 403, 422, 450, 499, 501, 522, 550, 590, 599})
   void decode_shouldReturnKeycloakConnectionExceptionWithStatusNo401(int statusCode) {
       final ReactiveHttpResponse<?> mockResponse = mock(ReactiveHttpResponse.class);
       final byte[] bodyInBite = TEST_STRING.getBytes(StandardCharsets.UTF_8);
 
       final KeycloakConnectionException exception = new KeycloakConnectionException(
               statusCode,
               TEST_STRING
       );
 
       when(mockResponse.bodyData())
               .thenReturn(Mono.just(bodyInBite));
 
       when(mockResponse.status())
               .thenReturn(statusCode);
 
       StepVerifier
               .create(statusHandler.decode(TEST_STRING, mockResponse))
               .assertNext(error ->
 
                       assertThat(error)
                               .usingRecursiveComparison()
                               .isEqualTo(exception)
 
               )
               .verifyComplete();
 
       verify(mockResponse).bodyData();
       verify(mockResponse, times(2)).status();
   }

Обработку ошибочных сообщений 401 статуса и эскалацию KeycloakAuthException.

@Test
   void decode_shouldReturnKeycloakAuthException() {
       final ReactiveHttpResponse<?> mockResponse = mock(ReactiveHttpResponse.class);
       final byte[] bodyInBite = TEST_STRING.getBytes(StandardCharsets.UTF_8);
 
       final KeycloakAuthException exception = new KeycloakAuthException(
               TEST_STRING
       );
 
       when(mockResponse.bodyData())
               .thenReturn(Mono.just(bodyInBite));
 
       when(mockResponse.status())
               .thenReturn(HttpStatus.UNAUTHORIZED.value());
 
       StepVerifier
               .create(statusHandler.decode(TEST_STRING, mockResponse))
               .assertNext(error ->
 
                       assertThat(error)
                               .usingRecursiveComparison()
                               .isEqualTo(exception)
 
               )
               .verifyComplete();
 
       verify(mockResponse).bodyData();
       verify(mockResponse).status();
   }

Проблемы с получением текста из сообщения с ошибкой.

   @ParameterizedTest
   @ValueSource(ints = {400, 402, 403, 422, 450, 499, 501, 522, 550, 590, 599})
   void decode_shouldReturnKeycloakConnectionExceptionWithUnknownMessage(int statusCode) {
       final ReactiveHttpResponse<?> mockResponse = mock(ReactiveHttpResponse.class);
       final byte[] bodyInBite = TEST_STRING.getBytes(StandardCharsets.UTF_8);
 
       final IOException ioException = new IOException(TEST_STRING);
 
       when(mockResponse.bodyData())
               .thenReturn(Mono.just(bodyInBite));
 
       when(mockResponse.status())
               .thenReturn(statusCode);
 
       try (MockedStatic<IOUtils> ioUtilsMockedStatic = mockStatic(IOUtils.class)) {
           ioUtilsMockedStatic.when(() -> 
                                    IOUtils
                                    .toString(
                                      any(InputStream.class), 
                                      any(Charset.class))
                                   )
                   .thenThrow(ioException);
 
           KeycloakConnectionException exception = new KeycloakConnectionException(
                   statusCode,
                   TEST_STRING
           );
 
           StepVerifier
                   .create(statusHandler.decode(TEST_STRING, mockResponse))
                   .assertNext(error ->
 
                           assertThat(error)
                                   .usingRecursiveComparison()
                                   .isEqualTo(exception)
 
                   )
                   .verifyComplete();
 
           ioUtilsMockedStatic.verify(() -> IOUtils.toString(any(InputStream.class), any(Charset.class)));
       }
 
       verify(mockResponse).bodyData();
       verify(mockResponse, times(1)).status();
   }

В приведенных тестах я не блокировал поток для проверки сообщений, а использовал StepVerifier - класс, который позволяет удобно работать над сверкой сообщений в реактивном потоке.

ReactiveRetryPolicy

ReactiveRetryPolicy предназначен для управления поведением повторных попыток при выполнении асинхронных запросов. Он помогает определить логику, повторения запросов. С его помощью можно автоматически повторять запросы при  ошибках, таких как недоступность сервера, предотвращать попытки повторного выполнения запросов, конфигурировать различные политики повторения, например, задержки между попытками. Для использования этого компонента требуется переопределить методы:

  • maxAllowedRetries - определяет количество повторных запросов;

  • retry - определяет политику запросов;

В нашем случае определим следующую логику выполнения запросов:

@Slf4j
public class KeycloakReactiveRetryer implements ReactiveRetryPolicy {
 
    @CacheEvict(value = CACHE_NAME_TOKEN_KEYCLOAK, allEntries = true)
    public void clearTokenCacheData() {
        log.info(CLEAR_TOKEN);
    }
 
    @Override
    public Retry retry() {
        return Retry.fixedDelay(2, Duration.ofMillis(PAUSE_BETWEEN_RETRY_CALL))
                .filter(KeycloakAuthException.class::isInstance)
                .doAfterRetry(doIt -> clearTokenCacheData())
                .onRetryExhaustedThrow((retryBackoffSpec, retrySignal) ->
                            new KeycloakConnectionException(
                                    HttpStatus.INTERNAL_SERVER_ERROR.value(),
                                    String.format(
                                      KEYCLOAK_NO_TOKEN_MESSAGE_WITH_MAX_ALLOWED_RETRIES, 
                                      MAX_ALLOWED_RETRIES
                                    )
                    )
                );
 
    }
 
    @Override
    public int maxAllowedRetries() {
        return MAX_ALLOWED_RETRIES;
    }
 
}

Если при обработке мы получили 401 код и выбросили исключение KeycloakAuthException.class, то в логике повторения, мы подхватим это исключение, почистим существующий кеш и повторим запрос заново. Так мы получим валидный токен. А если не получим, то несколько раз повторим логику запроса. Давайте проверим, что все работает именно так. Первая проверка была показана Выше в виде "пасхалки". Тут проверим остальное.

 Состояние тестового класса.

class KeycloakReactiveRetryerTest {
 
    private final KeycloakReactiveRetryer keycloakReactiveRetryer =
            new KeycloakReactiveRetryer();
 
    private final ByteArrayOutputStream byteArrayOutputStream =
            new ByteArrayOutputStream();
 
    @BeforeEach
    void setUp() {
        System.setOut(new PrintStream(byteArrayOutputStream));
    }
 
    @Test
    void checkKeycloakReactiveRetryer() {
        assertThat(keycloakReactiveRetryer)
                .isInstanceOf(ReactiveRetryPolicy.class)
                .hasAllNullFieldsOrProperties()
                .hasNoNullFieldsOrProperties()
        ;
    }
 
}

Очистку кеша.

@Test
void checkKeycloakReactiveRetryer_getLog() {
    keycloakReactiveRetryer.clearTokenCacheData();
 
    assertThat(byteArrayOutputStream.toString().trim())
            .contains(CLEAR_TOKEN);
 
}

Не изменение состояния кеша, если не было выброшено профильного исключения.

@Test
void retry_doNotGetLog() {
 
    assertThat(keycloakReactiveRetryer.retry())
            .isInstanceOf(Retry.class);
 
    assertThat(byteArrayOutputStream.toString().trim())
            .doesNotContain(CLEAR_TOKEN);
}

Количество повторных вызовов.

@Test
void maxAllowedRetries_shouldReturnValue() {
    assertThat(keycloakReactiveRetryer.maxAllowedRetries())
            .isEqualTo(MAX_ALLOWED_RETRIES);
}

ReactiveHttpRequestInterceptor

Мы подошли к последнему компоненту, который нужно рассмотреть - ReactiveHttpRequestInterceptor. С его помощью будем добавлять авторизационные данные в заголовок к целевому сервису. Вариантов несколько. Я выбирал между  Mono.deferContextual и Mono.create. Работа с контекстом в данном случае понижает управляемость кода за счет того, что будет необходимо сделать замыкание именно в контексте и делегировать в него работу с токеном. Работа через Mono.create место управления токеном. В итоге получилось так:

@Component
@RequiredArgsConstructor
public class KeyCloakRequestInterceptor {
 
    private final KeycloakFeignClientService keycloakFeignClientService;
 
    public ReactiveHttpRequestInterceptor addAuthorizeDataToRequest() {
        return reactiveHttpRequest ->
                Mono.create(reactiveHttpRequestMonoSink -> {
                    try {
                        reactiveHttpRequest
                                .headers()
                                .put(
                                    HttpHeaders.AUTHORIZATION, 
                                    List.of(keycloakFeignClientService.getBearerToken()
                                 )
                        );
                        reactiveHttpRequestMonoSink.success(reactiveHttpRequest);
                    } catch (RuntimeException exception) {
                        reactiveHttpRequestMonoSink.error(
                                new KeycloakConnectionException(
                                  HttpStatus.UNPROCESSABLE_ENTITY.value(), 
                                  exception.getMessage())
                        );
                    }
                });
 
    }
 
}

С помощью Mono.create() мы получаем полное управление асинхронным процессом получения токена через MonoSink, который является интерфейсом предназначенным для программного завершения или прерывания асинхронного процесса внутри Mono. Именно поэтому нам надо явном сделать успешное и неуспешное завершение процесса. В нашем starter мы сделаем этот компонент публичным. Именно его добавление позволит реализовать всю задуманную нами логику. Давайте его протестируем - проверим тестовый объект, добавление необходимых заголовков и вызов сервиса, который запросит токен в keycloak.

@ExtendWith(MockitoExtension.class)
class KeyCloakRequestInterceptorTest {
 
    private KeyCloakRequestInterceptor keyCloakRequestInterceptor;
 
    @Mock
    private KeycloakFeignClientService keycloakFeignClientService;
 
    @BeforeEach
    void setUp() {
        keyCloakRequestInterceptor = new KeyCloakRequestInterceptor(keycloakFeignClientService);
    }
 
    @Test
    void checkKeyCloakRequestInterceptor() {
        assertThat(keyCloakRequestInterceptor)
                .hasFieldOrPropertyWithValue(
                      "keycloakFeignClientService", 
                      keycloakFeignClientService
                )
                .hasNoNullFieldsOrProperties()
        ;
    }
 
    @Test
    void addAuthorizeDataToRequest_addHeaders() {
        final String token = TEST_STRING;
 
        ReactiveHttpRequest reactiveHttpRequestMock = mock(ReactiveHttpRequest.class);
 
        when(reactiveHttpRequestMock.headers())
                .thenReturn(new HashMap<>());
 
        when(keycloakFeignClientService.getBearerToken())
                .thenReturn(token);
 
        assertThat(keyCloakRequestInterceptor.addAuthorizeDataToRequest())
                .isInstanceOfSatisfying(
                    ReactiveHttpRequestInterceptor.class, 
                    reactiveHttpRequestInterceptorData ->
 
                        StepVerifier
                                .create(
                                  reactiveHttpRequestInterceptorData.apply(reactiveHttpRequestMock))
                                .assertNext(reactiveHttpRequest ->
 
                                        assertThat(reactiveHttpRequest.headers())
                                                .containsKey(HttpHeaders.AUTHORIZATION)
                                                .extractingByKey(HttpHeaders.AUTHORIZATION)
                                                .isEqualTo(List.of(token))
 
                                )
                                .verifyComplete()
                );
 
        verify(keycloakFeignClientService).getBearerToken();
        verify(reactiveHttpRequestMock, times(2)).headers();
 
    }
 
    @Test
    void addAuthorizeDataToRequest_thrownException() {
        final RuntimeException exception = new RuntimeException(TEST_STRING);
 
        ReactiveHttpRequest reactiveHttpRequestMock = mock(ReactiveHttpRequest.class);
 
        when(reactiveHttpRequestMock.headers())
                .thenReturn(new HashMap<>());
 
        when(keycloakFeignClientService.getBearerToken())
                .thenThrow(exception);
 
        assertThat(keyCloakRequestInterceptor.addAuthorizeDataToRequest())
                .isInstanceOfSatisfying(ReactiveHttpRequestInterceptor.class, reactiveHttpRequestInterceptorData ->
 
                        assertThatThrownBy(() -> 
                                           reactiveHttpRequestInterceptorData.apply(reactiveHttpRequestMock).block())
                                .usingRecursiveComparison()
                                .isEqualTo(
                                        new KeycloakConnectionException(
                                                HttpStatus.UNPROCESSABLE_ENTITY.value(),
                                                exception.getMessage()
                                        )
                                )
                );
 
        verify(keycloakFeignClientService).getBearerToken();
        verify(reactiveHttpRequestMock).headers();
 
    }
 
}

Тут мы не описали сам сервис запроса данных и кеширования KeycloakFeignClientService. Его реализация и тест полностью аналогичен тому, как это было сделано в нереактивном стиле.  В KeycloakFeignClientService надо притормозить поток, получить данные и добавить в поток исполнения основного теста токен, который будет помещен в заголовок запроса. С точки зрения производительности это отражается на мониторинге сервиса. Появляются временные пики обработки, когда мы "притормаживаем поток". Код и тест будут в профильных классах приложенного финального проекта. 

В итоге

Цель этой статьи - показать, что реактивность и ее использование можно органично вписать в существующий рабочий контекст и использовать этот инструмент там, где нужно повысить производительность системы. Продемонстрированным кодом я постарался показать, что в использовании реактивного кода нет нерешаемых задач. Мы добавили Mono и Flux и несколько специальных методов в основной класс и тестовые методы. Оперирование функциональным кодом, по субъективному мнению автора - вопрос погружения в реактивные компоненты, решаемый быстро.

Слабые места и что дальше

В приведенном решении есть ряд недостатков. Приглашаем порассуждать над ними и предложить идеи о том, как из можно решить:

  • Остановка потока в момент передачи токена в заголовок запроса;

    • В нашем случае эта необходимость тормозит поток и на мониторинге мы получаем всплески времени обработки;

  • Заточенность starter под использование с одним клиентов в вызываемом сервисе;

  • Может, Вы найдете что-то еще. Если найдете, напишите, пожалуйста, в комментариях;

Вместо завершения и ссылка на репозиторий

Проект готов. Уже помогает нам оптимизировать существующие реактивные сервисы. Будет здорово, если он поможет и Вам.

Благодарности

Спасибо моей команде и всем, кто помог мне с материалом для данной статьи. Ваша помощь очень ценна.

 Ссылка на репозиторий

Репозиторий с обезличенным проектом.

Комментарии (2)