В предыдущей заметке было подробно рассмотрено одно из решений для обеспечения повсеместной доступности текущего обрабатываемого запроса. В отличие от неё, нынешняя заметка не будет посвящена глубокому рассмотрению одной проблемы, а поверхностно пробежится по парочке разных.

Унификация @Around-аспектов

Ещё одна любопытная задачка, возникающая при внедрении реактивного стека в сервлетные приложения, – это поддержка AOP-аспектов, оборачивающих тело целевого метода. Такие аспекты декларируются с аннотацией org.aspectj.lang.annotation.Around и имеют возможность повлиять как на аргументы перехватываемого метода, так и на его результат, вплоть до полной его замены. Например, вот так может выглядеть примитивный аспект, логирующий аргументы и результат любого метода, помеченного аннотацией @GetMapping:

@Aspect
@Component
public class LoggingAspect {
  private static final Logger log = LoggerFactory.getLogger(LoggingAspect.class);

  @Around("within(pro.toparvion.sample.gateway.gatewaydemo..*) " +    // [1]
      " && execution(@org.springframework.web.bind.annotation.GetMapping * *.*(..))") // [2]
  public Object logArgsAndResult(ProceedingJoinPoint pjp) throws Throwable {
    Object[] args = pjp.getArgs();             // [3]
    String targetMethodName = pjp.getSignature().getName();
    log.debug("Метод '{}' вызван с аргументами: {}", targetMethodName, args);
    Object response = pjp.proceed(args);       // [4]
    log.debug("Метод '{}' ответил: {}", targetMethodName, response);
    return response;
  }
}

1️⃣ Ограничиваем область действия аспекта, чтобы не напороться.
2️⃣ Задаём шаблон методов, на которые должен действовать аспект.
3️⃣ Получаем аргументы и имя целевого метода из точки вкрапления.
4️⃣ После логирования аргументов делегируем управление целевому методу, чтобы потом сразу залогировать его результат. Для простоты аспект не учитывает случай, когда метод завершается исключением.

Проблема

Проблема здесь, в сущности, та же, что и с фильтром для поддержки MDC: из-за «двухэтапного» исполнения реактивного конвейера аспект захватит лишь результат декларации конвейера, но не реально курсирующие по нему данные:

16:24:18.136 DEBUG [ctor-http-nio-3] LoggingAspect : Метод 'proxy' ответил: MonoPeekTerminal

, т.е. вместо действительного результата в логах всегда будет числиться MonoPeekTerminal (для данного примера).

Решение

Решением может стать такое же (как с MDC) разделение аспекта на два: для сервлетного и реактивного режимов (при помощи аннотации @ConditionalOnWebApplication). Однако, в отличие от случая с MDC, здесь мы не привязаны к API веб-фильтров, а значит, можем переписать аспект так, чтобы он один умел работать в обоих режимах. Это может быть особенно ценным, если логика аспекта достаточно сложна и/или объёмна, чтобы сохранить её как можно более DRY.

Чтобы это сделать, нужно:

  1. Уметь отличать реактивные методы от императивных;

  2. Уметь назначать желаемое поведение отложенно, т.е. не в момент применения аспекта, а позже, когда по конвейеру будут проходить данные.

Первое достаточно легко сделать анализом типа результата метода: если он представляет собой реактивный «хвостик» в виде Mono<?> или Flux<?>, то и весь метод можно считать реактивным. Второе решается использованием соответствующих реактивных операторов: в рассматриваемом простом случае для Mono это будет метод doOnSuccess(), а для FluxdoOnNext(). Кроме того, чтобы не повторять само логирование ответа в каждом месте, вызов log.debug(...) стóит оформить в метод logResult(). Тогда основной метод аспекта (точнее, его окончание) преобразится вот таким образом:

  @Around("within(pro.toparvion.sample.gateway.gatewaydemo..*) " +
      " && execution(@org.springframework.web.bind.annotation.GetMapping * *.*(..))")
  public Object logArgsAndResult(ProceedingJoinPoint pjp) throws Throwable {
    Object[] args = pjp.getArgs();
    String targetMethodName = pjp.getSignature().getName();
    log.debug("Метод '{}' вызван с аргументами: {}", targetMethodName, args);
    Object resultTail = pjp.proceed(args);
    return assignResultLoggingBehavior(targetMethodName, resultTail);   // [1]
  }

1️⃣ Теперь здесь нет явного логирования результата. Вместо него вызывается некое назначение (декорирование) ответа, которое на самом деле может сводиться и к непосредственному логированию. Это видно в коде нового метода assignResultLoggingBehavior:

  private Object assignResultLoggingBehavior(String targetMethodName, Object tail) {
    if (tail instanceof Mono<?> monoResult) {         // [1]
      tail = monoResult.doOnSuccess(result -> logResult(targetMethodName, result));
    } else if (tail instanceof Flux<?> fluxResult) {  // [2]
      tail = fluxResult.doOnNext(result -> logResult(targetMethodName, result));
    } else {
      logResult(targetMethodName, tail);              // [3]
    }
    return tail;                                      // [4]
  }

1️⃣ Используем Pattern Matching (JDK 16+) для сокращения записи.
2️⃣ В случае с HTTP-вызовом результат в виде Flux соответствует стриминговому ответу сервера, например, SSE (просто FYI).
3️⃣ Если результат не реактивный, просто логируем результат сразу.
4️⃣ Возвращаем результат, чтобы на него можно было подписаться либо сразу вернуть клиенту.

В таком виде аспект сможет воевать на два фронта: например, будучи оформленным в инфраструктурную библиотеку, он может использоваться как с привычными сервлетными приложениями, так и с новомодными реактивными. Однако важно помнить, что в import'ах у него есть классы из Project Reactor, а значит, при использовании этого класса в runtimeClasspath должна присутствовать библиотека io.projectreactor:reactor-core, даже если аспект применяется к сервлетному приложению.

Разумеется, такой аспект далёк от состояния production-ready, хотя бы потому, что:

  • не учитывает, что аргументы тоже могут быть реактивными;

  • не обрабатывает исключения, летящие из жерла целевого метода;

  • декларирован как @Around, хотя далеко не всегда требуется именно двусторонняя обёртка (часто достаточно обойтись аннотациями @Before или @After).

Однако в качестве отправной точки его должно быть достаточно.

Полный код аспекта можно найти в прилагаемом демо-проекте.

Попутное резюме

Эта часть заметки была посвящена одному из самых «магических» механизмов вкрапления в поведение приложений. Теперь и он сможет работать на любом из двух стеков. В следующей части речь пойдёт о гораздо более явном инструменте – декларативных HTTP-клиентах.


Ремонт OpenFeign-клиентов

В отличие от предыдущих пунктов, рассматриваемые здесь две проблемы нельзя назвать широко распространёнными. Однако тем «счастливчикам», кто всё же сталкивается с ними, от этого не легче.

Проблема 1

Она констатирована прямо в документации на Spring Cloud OpenFeign:

As the OpenFeign project does not currently support reactive clients, such as Spring WebClient, neither does Spring Cloud OpenFeign.

Другими словами, если в каком-то проекте используется высокоуровневый декларативный HTTP-клиент на основе библиотеки OpenFeign, то под его капотом не сможет работать соответствующий низкоуровневый реактивный HTTP-клиент. Это относится к Spring Cloud OpenFeign версии 3.0.5 и остаётся актуальным, как минимум, на 02.02.2022.

Сразу за этой констатацией идёт многообещающее:

We will add support for it here as soon as it becomes available in the core project.

, и в самом OpenFeign об этом действительно много разговоров, однако воз и ныне там. А пока всё так, разработчики Spring Cloud предлагают:

Until that is done, we recommend using feign-reactive for Spring WebClient support.

Но этот путь годится только новым проектам, ведь он требует переработки интерфейсов декларативных клиентов – их результаты должны получить обёртки в виде Mono или Flux. А в текущей задаче мы рассматриваем вариант только с сохранением императивного стиля.

Чем чревата эта проблема? Тем, что в реактивном приложении ненароком вызванный OpenFeign-клиент может заблокировать какой-либо из потоков-обработчиков в Reactor’е и этим парализовать его частично или даже полностью. Коварство проблемы в том, что, пока потоков-обработчиков достаточно много, а ответы OpenFeign-клиентам приходят достаточно быстро, деградация, скорее всего, останется незамеченной. По законам жанра это будет происходить в тестовом окружении. И только под серьёзной нагрузкой (например, на production, почему бы нет) она начнёт периодически постреливать то тут, то там, или вовсе поставит всё колом.

Решение 1

Если потенциальное место вызова OpenFeign-клиента известно, то его нужно локализовать относительно реактивного конвейера и приправить вызов оператором .subscribeOn(Schedulers.boundedElastic()) или .publishOn(Schedulers.boundedElastic()). Это позволит Reactor’у аллоцировать под вызов отдельный поток, который будет не жалко на какое-то время заблокировать.

Если же такое место не известно или есть опасение, что оно не одно, то стоит потестировать приложение под наблюдением BlockHound – специального Java-агента, умеющего детектировать блокирующие вызовы из неблокирующих потоков. Он должен помочь выявить проблемные вызовы и либо обернуть их в эластичный бинт пул, либо исключить.

Проблема 2

Куда «веселее» становится, когда в дело вступает балансировщик нагрузки spring-cloud-loadbalancer, который для OpenFeign-клиентов поставляет класс:

org.springframework.cloud.loadbalancer.blocking.client.BlockingLoadBalancerClient

Как не трудно догадаться по его имени, с реактивщиной этот класс дружит плоховато. Из-за этого получается, что с переходом на реактивный стек некогда безупречные OpenFeign-клиенты уже в runtime начинают падать на каждом запросе с неприглядным

java.lang.IllegalStateException: block()/blockFirst()/blockLast() are blocking, which is not supported in thread reactor-http-nio-3
	at reactor.core.publisher.BlockingSingleSubscriber.blockingGet(BlockingSingleSubscriber.java:83) ~[reactor-core-3.4.11.jar:3.4.11]

Так происходит потому, что в методе choose класса BlockingLoadBalancerClient (для балансировщика версии 3.0.4) есть вот такая строчка:

Response<ServiceInstance> loadBalancerResponse = Mono.from(loadBalancer.choose(request)).block();

Своим вызовом block() она раздражает в методе reactor.core.publisher.BlockingSingleSubscriber#blockingGet() проверку:

if (Schedulers.isInNonBlockingThread()) {
  throw new IllegalStateException("block()/blockFirst()/blockLast() are blocking, which is not supported in thread " + Thread.currentThread().getName());
}

, которая и роняет выполнение запроса. Любопытно, что под вызовом isInNonBlockingThread() скрывается не какая-нибудь реактивная магия, а всего лишь проверка на Thread.currentThread() instanceof NonBlocking (наличие специального маркерного интерфейса).

Выброс такого исключения – сознательная мера Project Reactor’а, чтобы выявлять и предотвращать выполнение блокирующего кода в потоках-обработчиках, коих всегда немного (как правило, по числу ядер процессора) и которые должны освобождаться максимально быстро (чтобы подхватить другие задачи). В этом заключается часть смысла слова «неблокирующий» в дефиниции любого реактивного фреймворка. И да, это тоже проявление fail-fast, благодаря чему проблемные места можно замечать без применения спецсредств наподобие BlockHound.

Решение 2

Но что делать с этим нам, прикладным разработчикам? К счастью, в качестве основного (правильного) решения здесь подходит такое же, как для проблемы 1 – оформлять подобные вызовы в пул потоков, допускающих блокировку, например, Schedulers.boundedElastic(). Пример прилагается.

Если же оно по каким-либо причинам не подходит, придётся вбивать костыли искать обходные пути, например, переопределять бин blockingLoadBalancerClient своей собственной реализацией, не порождающей явной блокировки. Один пример такого бина приведён в прилагаемом проекте, но его точно не надо использовать в production без доработки, это намеренно примитивный вариант. Если прочтение этих строк отзывается в голове другими вариантами решения, милости просим в комментарии.


Общее заключение

В этой пространной серии заметок мы рассмотрели несколько задач, с которыми может столкнуться прикладной разработчик при попытках подружить императивный код на сервлетном фреймворке Spring WebMVC с кодом на реактивном фреймворке Spring WebFlux:

  • Как разрулить зависимости и составить правильный classpath?

  • Как поддержать вывод MDC-меток в логах на реактивном стеке?

  • Как сохранить доступность текущего запроса из любого места?

  • Как сделать аспекты-обёртки терпимыми к реактивному стеку?

  • Как сохранить работоспособность HTTP-клиентов на OpenFeign?

Ни весь этот список целиком, ни отдельные его пункты не претендуют на полноту раскрытия темы, поскольку класс таких задач весьма обширен. Однако приведённый материал должен помочь читателю ухватить основные идеи и принципы, чтобы решать подобные задачи гораздо быстрее, чем довелось автору этих строк.

Насколько это (не)удалось, можно и нужно рассказывать в комментариях под этим текстом. Там же приветствуются ссылки на схожие источники информации по этой теме.

Дабы не ранить тонкие чувства читателя, все вкрапления русского текста в приведённых примерах переведены на английский и выложены в виде цельного проекта на GitHub.


Комментарии (3)


  1. SimSonic
    08.02.2022 18:07

    Прочитал, спасибо!

    Главный вопрос — стоит ли ввязываться в это и побеждать подводные камни, или через N условных лет библиотеки намного лучше будут приспособлены к наличию реактивности в проекте? Речь не про изучение нового ради опыта, а практически — выиграл ли ваш проект?

    Мы на нашем проекте рассматриваем такой путь: перед текущим сервлетным монолитом ставим spring cloud api gateway, затем по очереди вырезаем функциональные домены в отдельные микросервисы, попутно решая, кому их них быть реактивным, а кому опять сервлетным. Для внешних систем останемся тем же чёрным ящиком.


    1. Toparvion Автор
      09.02.2022 05:54
      +2

      через N условных лет библиотеки намного лучше будут приспособлены к наличию реактивности в проекте?

      С одной стороны, можно с уверенностью, что библиотеки совершенно точно будут (и уже давно начали) поддерживать реактивный подход: тот же Spring в каком-то смысле задал этот тренд, адаптировав свои фреймворки WebMVC (в виде WebFlux), Security и частично Integration; другие тоже на подхвате, например, feign-reactive и различные поставщики драйверов к БД. С другой стороны, их усилия, как правило, не нацелены именно на "скрещивание" сервлетного (а в более общем случае - императивного) подхода с реактивным; вместо этого они предлагаю самостоятельную реактивную альтернативу своим же собственным разработкам. Поэтому именно ждать N условных лет не стоит.

      Однако это не значит, что ответ на вопрос:

      стоит ли ввязываться в это и побеждать подводные камни

      звучит как однозначное "да", потому что этот ответ сильно зависит от того, можете ли вы позволить себе не ввязываться и не побеждать, т.е. насколько сильно ваш проект/продукт связан с сервлетным стеком. В моём случе это проникновение было очень сильным, поэтому и пришлось идти таким тернистым путём. Этот путь был пройден совсем недавно, я вижу и верю в его позитивные результаты, но объявлять их окончательными пока не тороплюсь, нужно пожить с этим подольше (это ответ на вопрос "выиграл ли ваш проект?").

      Что касается более общего случая, то ваш подход с выделением микросервисов на различных стеках кажется вполне удачным; мне известен ещё один такой пример продукта, и он весьма успешен. Конечно, за такой подход приходится платить двойственностью решений одних и тех же задач (то же логирование, например). Это большой минус. Но если в команде есть понимание и объективные оценки профита, то этот минус может быть полностью оправдан.


  1. Sigest
    10.02.2022 07:05
    +1

    Спасибо за то, что поделились своим опытом. Все 3 статьи очень полезны, по крайней мере для меня