Контекст

Это статья описывает опыт миграции традиционного приложения java spring на Spring reactor стек. О специфике разрабатываемой информационной системы, уже подробно рассказывал. Кратко о системе - смешанная сервисно-микросервисная архитектура. Чтобы получить бизнес результат, требуется вызвать микросервис, который вызывает сервис, который вызывает набор микросервисов для формирования ценностного набора данных. Получается цепочка вызовов. Сервисы и микросервисы разрабатывались на традиционном технологическом стеке, без использования реактивного подхода.

Мое знакомство с реактивным подходом произошло в 2022 году. Плюсы и минусы я осознавал, как мог. В рамках разрабатываемой системы мне было сложно найти ему точку применения. Тогда сложилось понимание, что в существующих рамках реактивный подход не даст возможности эволюционных изменений. Если его применять, то применять ко всему и сразу, как постулирует реактивный манифест.  Даешь революцию! Это было заблуждение. Его я не смог разрушить, даже изучая документацию и статьи. Чего-то релевантного моей ситуации в открытых источниках я найти не мог. Этой статьей постараюсь предложить путь и убедить сомневающихся, что долго раздумывать не стоит. Делайте смелый шаг на пути к реактивному стеку.

Используемые технологии

Технологический стек:

  • Java 21;

  • Maven;

  • Spring boot 3;

  • Spring reactor webflux;

  • Gatling;

О Gatling подробно рассказывал в этой статье. Отмечу, Gatling в контексте исследований этой статьи, стал для меня спасительным инструментом. Если их у Вас еще нет, задумайтесь.

Какую пользу принесет эта статья?

Если вы создаете свои сервисы не на Spring reactor и на любой из ниже приведенных вопросов для себя ответите утвердительно, то информация в статье будет полезна:

  • Есть потребность ускорить время обработки запросов создаваемых и поддерживаемых сервисов?

  • Есть потребность ускорить время обработки запросов к внешним источникам, таких как - базы данных (SQL, NoSql), брокеры сообщений и т.д.

Если вы зажаты сервисами не реактивного подхода и на вопросы ответили утвердительно, то нам по пути. 

Мотивация изменений

В контексте я привел описание цепочки вызовов сервисов, которая приводит к бизнес результату. Все сервисы справляются с нагрузкой - компания движется. Запускаются новые страховые продукты - появляются новые каналы распространения. Если заглянуть немного вперед, то понятно, что нагрузка на систему будет только расти, а время обработки требуется сокращать. Сервисов становится все больше, как и взаимодействий между ними - синхронных и асинхронных. Есть исследования, которые подтверждают - 1% рост выручки на каждые 100 миллисекунд ускорения загрузки страницы с продаваемым продуктом. Сам я верю в то, что оптимизация нефункциональных аспектов функционирования моей системы будет повышать ценность, которую приносит создаваемая мной система. Время делать следующий шаг.

Увеличение скорости обработки запросов при имеющейся и более высокой нагрузке - именно то, к чему мы стремимся. Но есть одна неприятность. Сервисов и микросервисов уже много и становится  все больше. Шаг, который логично сделать к повышению качества сервисов, должен не отвергать уже сделанного, а дополнять и улучшать. Нужен рефакторинг существующих сервисов удушающим приложением. Это позволит не вводить систему в целом в шок и поддержит эволюционирующую архитектуру. Заменяем менее производительный сервис более производительным, оставляем систему в консистентном состоянии. Маленькая победа, небольшое ускорение в целом и эволюционный шаг в развитии. Заменять нужно так, чтобы новый сервис поддержал существующий функционирующий контекст. Сначала кажется, что при синхронной обработке запроса, использовании io взаимодействии, Spring reactor это не та технология, которая сможет поддержать задуманное. Но все не так однозначно. Spring reactor - это набор технологий:

  • Неблокирующая потоковая асинхронная обработка:

    • Это ядро всего фреймворка. Потоковая обработка данных;

  • Набор типизированных последовательностей данных:

    • Mono и Flux;

  • Неблокируемый ввод/вывод данных (nio):

    • Тут, как правило, используется сервер Netty, взамен привычному Tomcat;

Из набора технологий мы можем взять не все и сразу, как хотелось бы, а что-то одно. Вполне возможно использовать ядро фреймворка с Mono и Flux, а nio подключить позднее. Дело в том, что именно nio является ограничением к использованию Spring reactor в io среде, но если Вы можете организовать коммуникацию нескольких сервисов внутри nio, а результаты отдавать в io среду, то тут тоже будет прирост производительности. Переводя отдельные сервисы на реактивное ядро, мы получим ценность. Переводя набор сервисов на nio, ценности добавится. После того, как все звенья цепи будут поддерживать реактивное ядро, переключение процесса в целом с io на nio будет уже не сложной задачей. Прозрачная эволюция, где каждый шаг приносит ускорение.

Немного теории

Использование Spring reactor вместе с tomcat проверенное решение. Оно:

  • Поддержит модель взаимодействия реактивного приложения с нереактивным;

  • Позволит перейти к многопоточной реактивной модели обработки данных;

Используя эту модель, основное преимущество, которое мы получим - асинхронная обработка запросов в нескольких потоках. В отличие от традиционного подхода, где каждый поток блокируется во время выполнения операции ввода-вывода, Spring reactor позволяет обрабатывать большее количество запросов одновременно. В случае классического приложения на tomcat, если один пользователь ожидает ответа, другие пользователи могут столкнуться с задержками. В то же время приложение на Spring reactor может продолжать обслуживать другие запросы благодаря асинхронной обработке.

Асинхронный подход также приводит к более эффективному использованию ресурсов сервера. Вместо того, чтобы создавать сотни или тысячи потоков, что требует значительных затрат памяти, приложение на Spring reactor работает с небольшим количеством потоков и использует их более эффективно. Это основные преимущества, которые мы получим, сделав первый шаг в использовании Spring reactor. Использование nio дало более значимое преимущество, но использовать nio пока нет возможности. 

Планируемый путь улучшений

Мы находимся в ландшафте (Рис.1) со смешанным синхронно-асинхронным взаимодействием.

Рис.1
Рис.1

Везде используется tomcat и нет Reactor сервисов. Что предлагается изменить:

  1. Каждому сервису планируется сделать аналог на Reactor с Tomcat, но без nio. Традиционные сервисы не удаляем, но выводим из эксплуатации. Позже они нам могут понадобится;

  2. После того, как функциональность определенного продукта будет использовать только сервисы Reactor без nio, переключить их на nio цепочку в целом;

    1. Если сервис используется для нескольких расчетов, то в расчете без готовых сервисов Reactor, требуется переключить на традиционные аналоги, замены которым мы сделали в п.1;

  3. Далее, переводим все расчеты на Reactor сервисы;

Пункт 1 предполагает реализацию около 40 сервисов. Реализация каждого аналога должна дать ускорение времени обработки и повышения производительности. Пункт 2 предполагает значимое ускорение расчета определенной бизнес функции. Пункт 3  дает ускорение продукта в целом.

Реализация

  • При демонстрации кода я буду использовать удобный и лаконичный lombok;

  • Традиционный сервис написан с использование spring boot 3.3.5;

  • В реактивном сервисе используется версия 3.4.2.

Последняя на момент написания статьи - это привело к тому, что для тестов вместо @MockBean я уже использую @MockitoBean.

Акцентировать внимание в статье на это не буду. Это адекватная замена. Подробнее можно прочитать в документации.

Переходим к рассмотрению конкретных аспектов реализации. Полный код традиционного и модифицированного приложений, покрытый тестами выложен на Git, чтобы была возможность сравнить. Ссылки ниже. Сервисы обладают аналогичной функциональностью (Рис.2):

  • Получить запрос;

  • Проверить, есть ли искомые данные в Кеш;

    • Если есть - достать;

    • Если нет - Выполнить запрос в внешнюю систему;

      • Положить результат в Кеш;

  • Трансформировать данные к результирующему виду;

  • Отдать ответ;

Рис.2
Рис.2

Традиционный сервис

Сервис намеренно заурядный с точки зрения функциональности:

  • При поступлении запроса проверяем данные собственным валидатором;

  • При выполнении запроса к внешней системе обрабатываем ошибки соединения;

    • Используется короткоживущее централизованное кэширование запросов с помощью Redis;

  • При формировании запроса и ответа в сервисе используем собственный маппер и в нем обрабатываем возможные ошибки;

Cервис без reactor для внешнего соединения использует feign client. В статьях тут и тут, описаны способы работы с этой библиотекой. Итого, из технологий помимо spring boot - feign, cache без примеси reactor. Полный код традиционного сервиса находится тут.

Реактивный сервис

В целом, все те же технологии, что и в традиционном сервисе, но уже с reactor.  Первым шагом добавим основную зависимость для работы с reactor. Добавлять будем так, чтобы reactor поддерживал несколько потоков. Вполне себе работающее решение. Для этого для сервиса будем использовать не Netty, а Tomcat:

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-webflux</artifactId>
    <exclusions>
        <exclusion>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-reactor-netty</artifactId>
        </exclusion>
    </exclusions>
</dependency>

Это решение предполагает, что настройка количества потоков будет по умолчанию или необходимо использовать scheduler. Scheduler есть несколько разных видов. В этом примере я не буду раскрывать тему настройки потоков. И в традиционном и в реактивном примере оставлю это на настройки по умолчанию. Это тема хорошо описана в документации. Если у Вас есть примеры удачных настроек и конфигурирования, то делитесь ими в комментариях. Они будут полезны.

Кеш

Все, наш сервис готов к миграции на реактивный стек. Следующим шагом настроим cache. Используем реактивную библиотеку:

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-redis-reactive</artifactId>
</dependency>

Следом сконфигурируем cache менеджер. Нам потребуется время жизни записей и название кеша. Время жизни вынесем в конфигурацию, название определим в константе. Она нам понадобится еще и при определении @Cacheable. Нам потребуется определить ReactiveRedisTemplate задав ему значение ключа и кэшируемого объекта. Будем использовать Lettuce, как наиболее производительный cache клиент. Ну и не будем кэшировать нулевые объекты. В итоге вместе с необходимыми аннотациями получится такой класс:

@Configuration
@EnableCaching
@RequiredArgsConstructor
public class CacheConfig {
 
    private final CacheProperties cacheProperties;
 
    @Bean
    public ReactiveRedisTemplate<String, Object> reactiveRedisTemplate(
      LettuceConnectionFactory connectionFactory) {
        RedisSerializationContext<String, Object> serializationContext = 
          RedisSerializationContext
                .<String, Object>newSerializationContext(new StringRedisSerializer())
                .value(new GenericJackson2JsonRedisSerializer())
                .build();
        return new ReactiveRedisTemplate<>(connectionFactory, serializationContext);
    }
 
    @Bean
    public CacheManager cacheManager(LettuceConnectionFactory connectionFactory) {
        RedisCacheConfiguration cacheConfiguration = 
          RedisCacheConfiguration.defaultCacheConfig()
                .entryTtl(Duration.ofSeconds(cacheProperties.ttl()))
                .disableCachingNullValues();
 
        return RedisCacheManager.builder(connectionFactory)
                .withCacheConfiguration(CACHE_NAME, cacheConfiguration)
                .cacheDefaults(cacheConfiguration)
                .build();
    }
 
}

Контроллер

Контроллер изменился. Если раньше он отдавал объект, то теперь он отдает Mono. Это основной изменение. Так API нашего сервиса преобразовалось в Spring reactor. Более в контроллере нет никаких изменений. Теперь про тест. Если ранее в тесте мы использовали org.springframework.test.web.servlet.MockMvc для выполнения вызовов нашего сервиса в тестах контроллераметрикactuator, swagger, то теперь конфигурирование будет через org.springframework.test.web.reactive.server.WebTestClient. Если мы захотим имплементировать контроллер в тестовый контекст, то это можно будет сделать с помощью аннотации @WebFluxTest, как это сделано в контроллере. А если потребуется настроить весь тестовый контекст, то нужно имплементировать реализацию webTestClient, как, например, в тесте actuator:

...
 
private WebTestClient webTestClient;
 
    @BeforeEach
    void setUp(ApplicationContext context) {
        webTestClient = WebTestClient.bindToApplicationContext(context).build();
    }
 
...

Работа с webTestClient тоже реализована в стиле работы со stream. Также при создании mock, нам нужно учесть, что результат мы предоставляем в виде Mono.

Реактивный клиент для внешних вызовов

Нам понадобится реактивный клиент для вызова внешнего сервиса. В традиционном подходе мы использовали feign. Здесь тоже будем использовать реактивный feign клиент. @ReactiveFeignClient — это аннотация, предоставляемая com.playtika.reactivefeign. Она позволяет создавать асинхронные HTTP-клиенты декларативно. Нам нужны две основные зависимости.  Одна реализует реактивный feign, а вторая предоставляет инструменты имплементации в spring экосистему: 

<dependency>
    <groupId>com.playtika.reactivefeign</groupId>
    <artifactId>feign-reactor-spring-configuration</artifactId>
    <version>[Возьмите последнюю стабильную версию из mvn repository]</version>
</dependency>
<dependency>
    <groupId>com.playtika.reactivefeign</groupId>
    <artifactId>feign-reactor-webclient</artifactId>
    <version>[Возьмите последнюю стабильную версию из mvn repository]</version>
</dependency>

ReactiveFeign и Feign - простые, мощные библиотеки. Просто настраиваемые, просто сопровождаемые. Добавляем в конфигурацию параметров - таймаут на соединение, чтение, запись и подключение. Настройки указываются через имя клиента. Забежим вперед и имя клиента у нас будет " externalServiceClient", а настройки в миллисекундах определим такие:

#Feign
reactive:
  feign:
    clients:
      externalServiceClient:
        connectTimeoutMillis: 100
        readTimeoutMillis: 100
        writeTimeoutMillis: 100
        idleTimeoutMillis: 100

Настало время для самого клиента. По сравнению с традиционным feign разница будет только в аннотации. Раньше было @FeignClient, а теперь @ReactiveFeignClient. Вот и все. Класс с конфигурацией клиента остался таким же, как был. Там может быть обработка ошибок, перехватчик заголовков, специализированные клиенты для аутентификации и авторизации и пр. В нашем случае будет обработка ошибок соединения. Файл с конфигурацией я показывать не буду. Он есть в приложенном примере вместе с тестами на него. Сам клиент выглядит так:

@ReactiveFeignClient(
        name = "externalServiceClient",
        url = "${external-service.online-features.url}",
        configuration = ReactiveFeignConfig.class
)
public interface ExternalServiceClient {
 
    @PostMapping(
            value = "${external-service.online-features.path}",
            produces = MediaType.APPLICATION_JSON_VALUE,
            consumes = MediaType.APPLICATION_JSON_VALUE
    )
    Flux<ExternalServiceResponse> getFeastPersonResponse(@RequestBody ExternalServiceRequest personRequest);
 
}

В методе клиента появился Flux. Он и аннотация символизирует реактивность. Теперь напишем на него тест. Для конфигурации теста нам потребуется:

  • Задать настройки для подключения;

  • Использовать библиотеку для сетевых заглушек - у нас это wiremock;

  • Использовать несколько библиотек для автоконфигурирования реактивного клиента, сериализатор сообщений и http менеджера;

Конфигурация тестового класса будет такой:

@SpringBootTest
@TestPropertySource(properties = 
   {
    "external-service.online-features.url=http://localhost:${wiremock.server.port}",
    "external-service.online-features.path=/string",
    "spring.main.allow-bean-definition-overriding=true"
  }
)
@AutoConfigureWireMock(port = 0)
@ImportAutoConfiguration(classes = {
        ReactiveFeignAutoConfiguration.class,
        JacksonAutoConfiguration.class,
        HttpMessageConvertersAutoConfiguration.class,
})
@EnableReactiveFeignClients(clients = {ExternalServiceClient.class})
@DirtiesContext(classMode = DirtiesContext.ClassMode.AFTER_EACH_TEST_METHOD)
class ExternalServiceClientTest {
 
    ...
 
}

Теперь протестируем, что наш feign именно реактивный и имеет валидные настройки:

@Test
void checkFeastClientTest() {
 
    ReactiveFeignClient reactiveFeignClientAnnotation =
            ExternalServiceClient.class.getAnnotation(ReactiveFeignClient.class);
 
    assertThat(reactiveFeignClientAnnotation)
            .isInstanceOfSatisfying(
                    ReactiveFeignClient.class, 
                      reactiveFeignClientData -> {
 
                assertThat(reactiveFeignClientData.name())
                        .isEqualTo("externalServiceClient");
 
                assertThat(reactiveFeignClientData.url())
                        .isEqualTo("${external-service.online-features.url}");
 
                assertThat(reactiveFeignClientData.configuration())
                        .contains(ReactiveFeignConfig.class);
 
            });
 
}

Проверим, что при запросе мы получаем в ответ реактивный поток с Flux. Для этого потребуются настроить заглушки и использовать дополнительные тестовые зависимости. Избыточно эта информация описана в этой статье. Я для проверки flux буду использовать оператор StepVerifier. Он позволяет проверить, какие значения выдает реактивный поток, их порядок и другие аспекты класса. Сначала тестом проверим валидный поток:

@Test
void getFeastResponse_returnOk() throws JsonProcessingException {
    final ExternalServiceResponse externalServiceResponse = 
                          TestDataBuilder.getTestFeastPersonResponse();
    final ExternalServiceRequest externalServiceRequest =
                          TestDataBuilder.getTestFeastPersonRequest();
 
    // Заглушка
    stubFor(post(urlEqualTo(url))
            .withHeader(
              HttpHeaders.CONTENT_TYPE, 
              WireMock.equalTo(MediaType.APPLICATION_JSON_VALUE))
            .willReturn(
                    aResponse()
                            .withStatus(HttpStatus.OK.value())
                            .withHeader(
                              HttpHeaders.CONTENT_TYPE, 
                              MediaType.APPLICATION_JSON_VALUE)
                            .withBody(
                              objectMapper.writeValueAsString(externalServiceResponse))
                      )
           );
 
    //Проверка потока
    StepVerifier
            .create(
                externalServiceClient.getFeastPersonResponse(externalServiceRequest)
            )
            .expectNextMatches(
                feastResponseData -> feastResponseData.equals(externalServiceResponse)
            )
            .verifyComplete();
 
    // Проверим, что вызов действительно был тут
    verify(1, postRequestedFor(urlEqualTo(url)));
}

В следующем методе проверим, что при ошибке соединения мы обработаем ее нашей конфигурацией и она будет такой, как мы задали в конфигурационных параметрах:

@Test
    void getFeastResponse_shouldThrown4xxStatuses() 
      throws JsonProcessingException {
        final ExternalServiceRequest externalServiceRequest = 
                              TestDataBuilder.getTestFeastPersonRequest();
        final ErrorExternalServiceModel error = 
                              TestDataBuilder.getTestErrorFeastModel();
 
        stubFor(post(urlEqualTo(url))
                .withHeader(
                  HttpHeaders.CONTENT_TYPE, 
                  WireMock.equalTo(MediaType.APPLICATION_JSON_VALUE))
                .willReturn(
                        aResponse()
                                .withStatus(422)
                                .withHeader(
                                  HttpHeaders.CONTENT_TYPE, 
                                  MediaType.APPLICATION_JSON_VALUE
                                )
                                .withBody(
                                  objectMapper.writeValueAsString(error))
                          )
               );
 
        StepVerifier
                .create(
                  externalServiceClient.getFeastPersonResponse(externalServiceRequest)
                )
                .expectErrorMatches(
                        throwable -> 
                                throwable instanceof ExternalServiceClientException
                                && throwable.getMessage().equals(error.detail())
                )
                .verify();
 
        verify(postRequestedFor(urlEqualTo(url)));
 
    }

Тестовый класс с @ReactiveFeignClient содержит больше примеров. Их можно посмотреть тут.
Напишем сервис над клиентом, в котором будет:

  • Обработка потока;

  • Кеш;

  • Логирование времени на запрос - какое-то время будем наблюдать за интеграционным потоком;

  • Обработка ошибок соединения;

Сервис клиента:

@Service
@RequiredArgsConstructor
@Slf4j
public class ExternalServiceService {
 
    private final ExternalServiceClient externalServiceClient;
 
    @Cacheable(value = CACHE_NAME)
    public Flux<ExternalServiceResponse> getFeastPersonResponse(
      ExternalServiceRequest personRequest) {
 
        final LocalDateTime startTime =
                LocalDateTime.now();
 
        return externalServiceClient.getFeastPersonResponse(personRequest)
                .doOnNext(
                        logger -> log.info(
                                "Duration call {} for {}",
                                Duration.between(LocalDateTime.now(), startTime),
                                personRequest)
                )
                .onErrorMap(
                        error -> {
                            if (error instanceof RuntimeException)
                                return new ExternalServiceConnectionException(
                                        String.format(
                                                "Ошибка соединения: %s",
                                                error.getMessage()));
                            return error;
                        });
    }
}

У меня получилось два разных класса для полного тестирование этого сервиса. Первый для тестирования функциональности. В нем я использую уже знакомый нам StepVerifier, проверяю функциональности и запись лога. Во втором классе я более пристально смотрю на кеширование. Для него потребовалось настроить EmbeddedRedis. Он поддерживает реактивное, асинхронное подключение. В тестовой конфигурации запускаем сервис, проводим тест, останавливаем сервис. В самом тесте 10 раз делаем вызов сервиса и с помощью verify от mockito убеждаемся, что вызов был один:

@SpringBootTest(
        classes = ExternalServiceCacheServiceTest.EmbeddedRedisConfiguration.class
)
@EnableCaching
class ExternalServiceCacheServiceTest {
 
    @Autowired
    private ExternalServiceService externalServiceService;
 
    @MockitoBean
    private ExternalServiceClient externalServiceClient;
 
    @Test
    void checkCacheHitsManyTimeButClientCacheOne() {
   
      final Request request = 
                      TestDataBuilder.getTestRequestWithOutEntityId();
        
      final Response response = 
                      TestDataBuilder.getTestResponse();
        
      final ExternalServiceRequest externalServiceRequest = 
                      TestDataBuilder.getTestFeastPersonRequest();
        
      final ExternalServiceResponse externalServiceResponse = 
                      TestDataBuilder.getTestFeastPersonResponse();
 
        when(externalServiceClient.getFeastPersonResponse(externalServiceRequest))
                .thenReturn(Flux.just(externalServiceResponse));
 
        StepVerifier
                .create(
                  externalServiceService.getFeastPersonResponse(externalServiceRequest)
                )
                .expectNextMatches(
                  externalServiceResponseData -> 
                      externalServiceResponseData.equals(externalServiceResponse)
                )
                .verifyComplete();
 
        verify(externalServiceClient).getFeastPersonResponse(externalServiceRequest);
 
        IntStream.range(0, 10)
                .forEach(
                        doIt -> StepVerifier
                                .create(externalServiceService.getFeastPersonResponse(externalServiceRequest))
                                .expectNextMatches(externalServiceResponseData -> externalServiceResponseData.equals(externalServiceResponse))
                                .verifyComplete()
                );
 
        verify(externalServiceClient).getFeastPersonResponse(externalServiceRequest);
 
    }
 
    @TestConfiguration
    @RequiredArgsConstructor
    static class EmbeddedRedisConfiguration {
 
        private RedisServer redisServer;
 
        private final RedisProperties redisProperties;
 
        @PostConstruct
        public void start() throws IOException {
            redisServer = new RedisServer(redisProperties.getPort());
            redisServer.start();
        }
 
        @PreDestroy
        public void stop() throws IOException {
            this.redisServer.stop();
        }
 
    }
 
}

Основной класс обработчик сервиса

Контроллер сервиса немного изменился, классы маппингов данных не претерпели никаких изменений, а основной обработчик логики сервиса тоже поменяется.  Ранее он последовательно обрабатывал несколько шагов, по результатам которого создавался требуемый объект. Теперь процесс будет потоковым. Для преобразования объекта используем map и результатом обработки будет Mono. Мотивация к такому решению - потоковые асинхронные преобразования с блокировкой в последний возможный момент. Для преобразования будем использовать singleOrEmpty(). Полный код класса:

@Service
@Slf4j
@RequiredArgsConstructor
public class ServiceReactorService {
 
    private final RequestToExternalServiceRequestMapper requestToExternalServiceRequestMapper;
    private final ExternalServiceResponseToResponseMapper externalServiceResponseToResponseMapper;
    private final ExternalServiceService externalServiceService;
 
    public Mono<Response> processingFeast(Request request) {
        return externalServiceService
                .getFeastPersonResponse(
                  requestToExternalServiceRequestMapper.mapRequestToFeast(request)
                )
                .map(
                  externalServiceResponseToResponseMapper::mapFeastResponseToResponse
                )
                .singleOrEmpty();
    }
 
}

Для теста нам потребуется сделать mock атрибутов, задать поведение и с помощью StepVerifier проверить результат.

Тестирование

Сервис готов, настало время провести замеры и посмотреть разницу в производительности между традиционным и реактивным сервисами. Нагружать сервисы мы будем с помощью gatling. В сервисах есть кеш. Надо предусмотреть 2 серии замеров. Одни замеры будут с уникальными запросами, чтобы не попадать в кеш, а вторая серия запросов будет с попаданием в кеш. Прогонять тесты будем в тестовой среде. Для каждого сервиса выделен 1 под kubernetes. Поду выделено 512Mi памяти и 600m cpu. Никаких настроек масштабирования не определено. Сервисы будут изолированы и на них не будет сторонней нагрузки.

Методика тестовых прогонов

Для каждого случая:

  • Выполним по 10 прогонов;

  • Каждый прогон длятся по 3 минуты;

  • Количество пользователей (подключений) за время прогона будет расти линейно от 1 до 3;

Прогоны без cache

Первый тестовый прогон выполняется для регистрации значений без попадания в кеш. Ключом для кеша является полный объект. В Gatling, перед тем как начать прогон, определим одно из значений объекта уникальным для каждого запроса. 

В итоге получилось:

Сервис

Количество прогонов

Min время за прогон

Max время за прогон

 Среднее время за прогон

Service-reactor

10

10 мс

590 мс

140 мс

Service-no-reactor

10

40 мс

1132 мс

196 мс

Service reactor

Общий усредненный профиль нагрузки такой (Рис.3)

Рис.3
Рис.3

Желтым показаны средние значения, зеленым успешные, красным ошибки. При работе с реактивным стеком необходимо выдерживать минимальное количество соединений. Если потоков будет много, то время обработки будет увеличиваться. В нашем случае настройки сервиса по умолчанию сами управляют этими значениями, но если мы увеличим количество соединений в тесте с 3 до 10, то время обработки увеличится в 2-3 раза. Если мы захотим увеличить количество запросов, то время обработки еще снизится. При этом расходование памяти в сервисе остается на низком уровне - расход head памяти 60%, а non heap - 14%. Ошибки - ReadTimeoutException и SocketConnectionException.

Service no reactor

Общий усредненный профиль нагрузки такой (Рис.4)

Рис.4
Рис.4

Gatling сам регулируют нагрузку в зависимости от производительности сервиса. Здесь мы видим, что общее количество значений, которым удалось нагрузить сервис на 20% меньше. Минимальное, среднее и максимальное время больше. По памяти - расход head памяти 80%, а non heap - 14%. Ошибки - ReadTimeoutException и SocketConnectionException.

Тестовые прогоны с cache

Второй тестовый прогон будет с использованием кеша. При запросе будем оперировать 5 разными объектами.  В итоге получается такое распределение:

Сервис

Количество прогонов

Min время за прогон

Max время за прогон

 Среднее время за прогон

Service-reactor

10

15 мс

450 мс

33 мс

Service-no-reactor

10

37 мс

1047 мс

50 мс

Service reactor

Как видно, количество запросов выросло в 6 раз, по сравнению с запросом без использования cache (рис.5).

Рис.5
Рис.5

Service no reactor

Усредненный профиль нагрузки c cache (Рис.6).

Рис.6
Рис.6

Количество запросов увеличилось в 4 раза и выросло до 17 500 тысяч запросов. Расход памяти heap по по сравнению с прогонами без использования cache снизился до 75%.  Ошибки - ReadTimeoutException.

Итого

Замеры показали, что сервис, написанный с помощью reactor дает прирост в производительности и скорости - плюс 20% от количества запросов и минус 25% к скорости обработки запросов. Это с условием, что мы используем от reactor только асинхронность. Прирост к производительности всего бизнес процесса в целом нам дала локальная оптимизация отдельного сервиса. Оптимизация каждого следующего будет давать столько же.

Когда удастся включить реактивную цепочку с использованием nio между двумя мигрированными на Reactor сервисами, будет еще более значимый прирост. Вся цепочка сервисов, которая приводит к формированию бизнес продуктов станет по настоящему реактивной, без оговорок, после замены всех участвующих в процессе микросервисов. Блокировать ее нужно будет только при формировании результата в расчетную систему. Это процесс, каждый шаг будет приносить результат. Обоснование для того, чтобы запустить поэтапный рефакторинг всей системы, готово. 

Запуск

Для запуска проекта на web flux потребуется учесть еще ряд изменений, которые потребуется сделать в проектах. Те, которые я нашел и считаю важными, ниже.

Swagger

Для документирования сервисов в проекте без реактора использовалась такая зависимость:

      <dependency>            
          <groupId>org.springdoc</groupId>            
          <artifactId>springdoc-openapi-starter-webmvc-ui</artifactId>
          <version>[Возьмите последнюю стабильную версию из mvn repository]</version>
      </dependency>

Аналогичная зависимость для сервиса c reactor:

      <dependency>            
          <groupId>org.springdoc</groupId>
          <artifactId>springdoc-openapi-starter-webflux-ui</artifactId>
          <version>[Возьмите последнюю стабильную версию из mvn repository]</version>
      </dependency>

Это все. Просто изменили тип зависимости. После этого swagger/openApi будет отображаться корректно. На это есть модульные тесты. Они полезные. Если не использовали - рекомендую.

Централизованное логирование

Полное описание нашего способа логирования я тут приводить не буду. Мы используем logback-spring.xml. В этот файл добавляются:

  • Библиотеки, работу с которыми нужно логировать;

  • Формат записи логов;

  • Сервера, куда лог отправлять;

Работа с файлом реализуется через дополнительные logback и micrometer зависимости в проекте.

Для того, чтобы добавить логирование Spring reactor компонентов мне потребовалось в файл добавить 2 строчки:

...
<logger name="reactor.netty" level="INFO" />
<logger name="org.apache.tomcat.util.net.NioSelectorPool" level="WARN"/>
...

План Б

Если решите пойти подобным путем, то рекомендую не делать рефакторинг существующего сервиса, а создавать новый. И так при каждой замене. Убрать старый Вы сможете в любой момент, но иметь альтернативное решение возможных проблем тоже будет полезно. 

Итого

Уверен, что список того, что я нашел, со временем будет только дополняться и дополняться. Если появится что-то интересное, обязательно расскажу.

Пилотирование

В продуктивной среде сервис c не реактивной на реактивную реализацию был заменен 11 февраля 2025. Приведу короткие цифры, характеризующие реализованные изменения в сервисе:

  • Pods kubernetes:

    • Ранее: 4 pods;

    • Сейчас: 1 pod;

  • Heap Java:

    • Ранее: ~ 75%;

    • Сейчас: ~ 45%;

  • Среднее время обработки запроса за неделю при нагрузке ~ 3 миллионов запросов: 

    • Ранее: ~ 60 мс;

    • Сейчас: ~ 40 мс;

Вместо завершения и ссылка на репозиторий

Проект готов. Если принесет Вам пользу или сподвигнет Вас на изменения, будет здорово.

Благодарности

Спасибо моей команде, которая помогает и делает работу интересной, спасибо Саше Никольской и Сергею Акимову, которые дают возможность проводить такие эксперименты, спасибо Диме Эрбесу, который подначил начать этот процесс и провел ревью реактивного сервиса, подсказал много интересных моментов. Спасибо Сергею Степанову, который согласился на это, с условием того, что процесс будет контролируемым.

До встречи!

Комментарии (6)


  1. ververesk
    20.02.2025 12:46

    А разве с Spring boot 3 работает playtika? Мне кажется что нет, поэтому мы и переписывали на WebFlux с плэйтики


    1. in86 Автор
      20.02.2025 12:46

      Здраствуйте,

      У реактивного сервиса актуальный pom такой - https://github.com/engine-it-in/service-reactor/blob/main/pom.xml

      spring-boot-starter-parent -> 3.4.2
      com.playtika.reactivefeign -> 4.2.1

      Может быть ранее они были не совместимы. На тех версиях, что я указал все работает


      1. ververesk
        20.02.2025 12:46

        как мне теперь жить с этим знанием, зря только переписывали)))


        1. in86 Автор
          20.02.2025 12:46

          Ну может для Ваших версий было и не зря )
          Как минимум навык наработали )
          Плюс в карму для опыты )


  1. romych2004
    20.02.2025 12:46

    Выглядит что вы реактив или не поняли до конца, или конкретно в этом моменте неправильно используете.. Аннотация @Cacheable вам закеширует объект Mono/Flux, но не результат выполнения этого паблишера. Т.е. сам Моно вы закешировали, а вот за данными всё равно каждый вызов ходите. Ну или может что-то изменилось и у меня устаревшее понимание работы аннотации :)


    1. in86 Автор
      20.02.2025 12:46

      Реальные замеры с/без Кеша, замеры количества вызовов сторонней системы и юнит тесты подтверждают, что аннотация работает так, как изложено в статье. Но я перепроверю. Если будут расхождения, то вернусь с правками )