Виктор Васнецов, Рыцарь на распутье; fatcatart.com


Привет, Хабр! Здесь краткий пересказ интересной баги c GitHub. Для воспроизведения см. проект spring-flux-callstack.


Не так давно я заметил, что при ошибках приложения, стектрейс иногда довольно длинный. И в нем повторялось по многу раз один и тот же набор строк (сам стектрейс под катом):


at reactor.core.publisher.FluxSwitchIfEmpty$SwitchIfEmptySubscriber.onComplete(FluxSwitchIfEmpty.java:75)
at reactor.core.publisher.FluxSwitchIfEmpty$SwitchIfEmptySubscriber.onComplete(FluxSwitchIfEmpty.java:78)
at reactor.core.publisher.Operators.complete(Operators.java:135)
at reactor.core.publisher.MonoEmpty.subscribe(MonoEmpty.java:45)
at reactor.core.publisher.MonoDefer.subscribe(MonoDefer.java:52)
at reactor.core.publisher.Mono.subscribe(Mono.java:4110)

Как вы уже поняли, это методы из Project Reactor, который обеспечивает асинхронную работу для Router Function в WebFlux.


Налицо неэффективность, ведь каждый такой блок кода порождает несколько объектов в куче, а таких блоков много. И создаюся они на каждый вызов.


Смутивший меня стектрейс
at org.springframework.web.reactive.function.server.CoRouterFunctionDsl$asHandlerFunction$1$1.invokeSuspend(CoRouterFunctionDsl.kt:599)
    at kotlin.coroutines.jvm.internal.BaseContinuationImpl.resumeWith(ContinuationImpl.kt:33)
    at kotlinx.coroutines.DispatchedContinuationKt.resumeCancellableWith(DispatchedContinuation.kt:313)
    at kotlinx.coroutines.intrinsics.CancellableKt.startCoroutineCancellable(Cancellable.kt:26)
    at kotlinx.coroutines.CoroutineStart.invoke(CoroutineStart.kt:109)
    at kotlinx.coroutines.AbstractCoroutine.start(AbstractCoroutine.kt:158)
    at kotlinx.coroutines.reactor.MonoKt$monoInternal$1.accept(Mono.kt:55)
    at kotlinx.coroutines.reactor.MonoKt$monoInternal$1.accept(Mono.kt)
    at reactor.core.publisher.MonoCreate.subscribe(MonoCreate.java:57)
    at reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:55)
    at reactor.core.publisher.MonoFlatMap$FlatMapMain.onNext(MonoFlatMap.java:150)
    at reactor.core.publisher.FluxSwitchIfEmpty$SwitchIfEmptySubscriber.onNext(FluxSwitchIfEmpty.java:67)
    at reactor.core.publisher.MonoNext$NextSubscriber.onNext(MonoNext.java:76)
    at reactor.core.publisher.FluxConcatMap$ConcatMapImmediate.innerNext(FluxConcatMap.java:274)
    at reactor.core.publisher.FluxConcatMap$ConcatMapInner.onNext(FluxConcatMap.java:851)
    at reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:114)
    at reactor.core.publisher.FluxPeek$PeekSubscriber.onNext(FluxPeek.java:192)
    at reactor.core.publisher.FluxSwitchIfEmpty$SwitchIfEmptySubscriber.onNext(FluxSwitchIfEmpty.java:67)
    at reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:114)
    at reactor.core.publisher.FluxSwitchIfEmpty$SwitchIfEmptySubscriber.onNext(FluxSwitchIfEmpty.java:67)
    at reactor.core.publisher.FluxSwitchIfEmpty$SwitchIfEmptySubscriber.onNext(FluxSwitchIfEmpty.java:67)
    at reactor.core.publisher.FluxSwitchIfEmpty$SwitchIfEmptySubscriber.onNext(FluxSwitchIfEmpty.java:67)
    at reactor.core.publisher.FluxSwitchIfEmpty$SwitchIfEmptySubscriber.onNext(FluxSwitchIfEmpty.java:67)
    at reactor.core.publisher.FluxSwitchIfEmpty$SwitchIfEmptySubscriber.onNext(FluxSwitchIfEmpty.java:67)
    at reactor.core.publisher.FluxSwitchIfEmpty$SwitchIfEmptySubscriber.onNext(FluxSwitchIfEmpty.java:67)
    at reactor.core.publisher.FluxSwitchIfEmpty$SwitchIfEmptySubscriber.onNext(FluxSwitchIfEmpty.java:67)
    at reactor.core.publisher.FluxSwitchIfEmpty$SwitchIfEmptySubscriber.onNext(FluxSwitchIfEmpty.java:67)
    at reactor.core.publisher.FluxSwitchIfEmpty$SwitchIfEmptySubscriber.onNext(FluxSwitchIfEmpty.java:67)
    at reactor.core.publisher.Operators$ScalarSubscription.request(Operators.java:2267)
    at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.set(Operators.java:2075)
    at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.onSubscribe(Operators.java:1949)
    at reactor.core.publisher.MonoJust.subscribe(MonoJust.java:54)
    at reactor.core.publisher.MonoDefer.subscribe(MonoDefer.java:52)
    at reactor.core.publisher.Mono.subscribe(Mono.java:4110)
    at reactor.core.publisher.FluxSwitchIfEmpty$SwitchIfEmptySubscriber.onComplete(FluxSwitchIfEmpty.java:75)
    at reactor.core.publisher.FluxSwitchIfEmpty$SwitchIfEmptySubscriber.onComplete(FluxSwitchIfEmpty.java:78)
    at reactor.core.publisher.Operators.complete(Operators.java:135)
    at reactor.core.publisher.MonoEmpty.subscribe(MonoEmpty.java:45)
    at reactor.core.publisher.MonoDefer.subscribe(MonoDefer.java:52)
    at reactor.core.publisher.Mono.subscribe(Mono.java:4110)
    at reactor.core.publisher.FluxSwitchIfEmpty$SwitchIfEmptySubscriber.onComplete(FluxSwitchIfEmpty.java:75)
    at reactor.core.publisher.FluxSwitchIfEmpty$SwitchIfEmptySubscriber.onComplete(FluxSwitchIfEmpty.java:78)
    at reactor.core.publisher.Operators.complete(Operators.java:135)
    at reactor.core.publisher.MonoEmpty.subscribe(MonoEmpty.java:45)
    at reactor.core.publisher.MonoDefer.subscribe(MonoDefer.java:52)
    at reactor.core.publisher.Mono.subscribe(Mono.java:4110)
    at reactor.core.publisher.FluxSwitchIfEmpty$SwitchIfEmptySubscriber.onComplete(FluxSwitchIfEmpty.java:75)
    at reactor.core.publisher.FluxSwitchIfEmpty$SwitchIfEmptySubscriber.onComplete(FluxSwitchIfEmpty.java:78)
    at reactor.core.publisher.Operators.complete(Operators.java:135)
    at reactor.core.publisher.MonoEmpty.subscribe(MonoEmpty.java:45)
    at reactor.core.publisher.MonoDefer.subscribe(MonoDefer.java:52)
    at reactor.core.publisher.Mono.subscribe(Mono.java:4110)
    at reactor.core.publisher.FluxSwitchIfEmpty$SwitchIfEmptySubscriber.onComplete(FluxSwitchIfEmpty.java:75)
    at reactor.core.publisher.FluxSwitchIfEmpty$SwitchIfEmptySubscriber.onComplete(FluxSwitchIfEmpty.java:78)
    at reactor.core.publisher.Operators.complete(Operators.java:135)
    at reactor.core.publisher.MonoEmpty.subscribe(MonoEmpty.java:45)
    at reactor.core.publisher.MonoDefer.subscribe(MonoDefer.java:52)
    at reactor.core.publisher.Mono.subscribe(Mono.java:4110)
    at reactor.core.publisher.FluxSwitchIfEmpty$SwitchIfEmptySubscriber.onComplete(FluxSwitchIfEmpty.java:75)
    at reactor.core.publisher.FluxSwitchIfEmpty$SwitchIfEmptySubscriber.onComplete(FluxSwitchIfEmpty.java:78)
    at reactor.core.publisher.Operators.complete(Operators.java:135)
    at reactor.core.publisher.MonoEmpty.subscribe(MonoEmpty.java:45)
    at reactor.core.publisher.MonoDefer.subscribe(MonoDefer.java:52)
    at reactor.core.publisher.Mono.subscribe(Mono.java:4110)
    at reactor.core.publisher.FluxSwitchIfEmpty$SwitchIfEmptySubscriber.onComplete(FluxSwitchIfEmpty.java:75)
    at reactor.core.publisher.FluxSwitchIfEmpty$SwitchIfEmptySubscriber.onComplete(FluxSwitchIfEmpty.java:78)
    at reactor.core.publisher.Operators.complete(Operators.java:135)
    at reactor.core.publisher.MonoEmpty.subscribe(MonoEmpty.java:45)
    at reactor.core.publisher.MonoDefer.subscribe(MonoDefer.java:52)
    at reactor.core.publisher.Mono.subscribe(Mono.java:4110)
    at reactor.core.publisher.FluxSwitchIfEmpty$SwitchIfEmptySubscriber.onComplete(FluxSwitchIfEmpty.java:75)
    at reactor.core.publisher.FluxSwitchIfEmpty$SwitchIfEmptySubscriber.onComplete(FluxSwitchIfEmpty.java:78)
    at reactor.core.publisher.Operators.complete(Operators.java:135)
    at reactor.core.publisher.MonoEmpty.subscribe(MonoEmpty.java:45)
    at reactor.core.publisher.MonoDefer.subscribe(MonoDefer.java:52)
    at reactor.core.publisher.Mono.subscribe(Mono.java:4110)
    at reactor.core.publisher.FluxSwitchIfEmpty$SwitchIfEmptySubscriber.onComplete(FluxSwitchIfEmpty.java:75)
    at reactor.core.publisher.FluxSwitchIfEmpty$SwitchIfEmptySubscriber.onComplete(FluxSwitchIfEmpty.java:78)
    at reactor.core.publisher.Operators.complete(Operators.java:135)
    at reactor.core.publisher.MonoEmpty.subscribe(MonoEmpty.java:45)
    at reactor.core.publisher.MonoDefer.subscribe(MonoDefer.java:52)
    at reactor.core.publisher.Mono.subscribe(Mono.java:4110)
    at reactor.core.publisher.FluxSwitchIfEmpty$SwitchIfEmptySubscriber.onComplete(FluxSwitchIfEmpty.java:75)
    at reactor.core.publisher.FluxSwitchIfEmpty$SwitchIfEmptySubscriber.onComplete(FluxSwitchIfEmpty.java:78)
    at reactor.core.publisher.Operators.complete(Operators.java:135)
    at reactor.core.publisher.MonoEmpty.subscribe(MonoEmpty.java:45)
    at reactor.core.publisher.MonoDefer.subscribe(MonoDefer.java:52)
    at reactor.core.publisher.Mono.subscribe(Mono.java:4110)
    at reactor.core.publisher.FluxSwitchIfEmpty$SwitchIfEmptySubscriber.onComplete(FluxSwitchIfEmpty.java:75)
    at reactor.core.publisher.FluxSwitchIfEmpty$SwitchIfEmptySubscriber.onComplete(FluxSwitchIfEmpty.java:78)
    at reactor.core.publisher.Operators.complete(Operators.java:135)
    at reactor.core.publisher.MonoEmpty.subscribe(MonoEmpty.java:45)
    at reactor.core.publisher.MonoDefer.subscribe(MonoDefer.java:52)
    at reactor.core.publisher.Mono.subscribe(Mono.java:4110)
    at reactor.core.publisher.FluxSwitchIfEmpty$SwitchIfEmptySubscriber.onComplete(FluxSwitchIfEmpty.java:75)
    at reactor.core.publisher.FluxSwitchIfEmpty$SwitchIfEmptySubscriber.onComplete(FluxSwitchIfEmpty.java:78)
    at reactor.core.publisher.Operators.complete(Operators.java:135)
    at reactor.core.publisher.MonoEmpty.subscribe(MonoEmpty.java:45)
    at reactor.core.publisher.MonoDefer.subscribe(MonoDefer.java:52)
    at reactor.core.publisher.Mono.subscribe(Mono.java:4110)
    at reactor.core.publisher.FluxSwitchIfEmpty$SwitchIfEmptySubscriber.onComplete(FluxSwitchIfEmpty.java:75)
    at reactor.core.publisher.Operators.complete(Operators.java:135)
    at reactor.core.publisher.MonoEmpty.subscribe(MonoEmpty.java:45)
    at reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:55)
    at reactor.core.publisher.MonoDefer.subscribe(MonoDefer.java:52)
    at reactor.core.publisher.Mono.subscribe(Mono.java:4110)
    at reactor.core.publisher.FluxSwitchIfEmpty$SwitchIfEmptySubscriber.onComplete(FluxSwitchIfEmpty.java:75)
    at reactor.core.publisher.FluxMap$MapSubscriber.onComplete(FluxMap.java:136)
    at reactor.core.publisher.FluxMap$MapSubscriber.onComplete(FluxMap.java:136)
    at reactor.core.publisher.Operators.complete(Operators.java:135)
    at reactor.core.publisher.MonoEmpty.subscribe(MonoEmpty.java:45)
    at reactor.core.publisher.Mono.subscribe(Mono.java:4110)
    at reactor.core.publisher.FluxConcatMap$ConcatMapImmediate.drain(FluxConcatMap.java:441)
    at reactor.core.publisher.FluxConcatMap$ConcatMapImmediate.onSubscribe(FluxConcatMap.java:211)
    at reactor.core.publisher.FluxIterable.subscribe(FluxIterable.java:161)
    at reactor.core.publisher.FluxIterable.subscribe(FluxIterable.java:86)
    at reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:55)
    at reactor.core.publisher.MonoDefer.subscribe(MonoDefer.java:52)
    at reactor.core.publisher.Mono.subscribe(Mono.java:4110)
    at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.drain(MonoIgnoreThen.java:172)
    at reactor.core.publisher.MonoIgnoreThen.subscribe(MonoIgnoreThen.java:56)
    at reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:55)
    at reactor.netty.http.server.HttpServerHandle.onStateChange(HttpServerHandle.java:64)
    at reactor.netty.tcp.TcpServerBind$ChildObserver.onStateChange(TcpServerBind.java:228)
    at reactor.netty.http.server.HttpServerOperations.onInboundNext(HttpServerOperations.java:465)
    at reactor.netty.channel.ChannelOperationsHandler.channelRead(ChannelOperationsHandler.java:90)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:377)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:363)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:355)
    at reactor.netty.http.server.HttpTrafficHandler.channelRead(HttpTrafficHandler.java:170)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:377)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:363)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:355)
    at io.netty.channel.CombinedChannelDuplexHandler$DelegatingChannelHandlerContext.fireChannelRead(CombinedChannelDuplexHandler.java:436)
    at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:321)
    at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:295)
    at io.netty.channel.CombinedChannelDuplexHandler.channelRead(CombinedChannelDuplexHandler.java:251)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:377)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:363)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:355)
    at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:377)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:363)
    at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
    at io.netty.channel.epoll.AbstractEpollStreamChannel$EpollStreamUnsafe.epollInReady(AbstractEpollStreamChannel.java:792)
    at io.netty.channel.epoll.AbstractEpollChannel$AbstractEpollUnsafe$1.run(AbstractEpollChannel.java:387)
    at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:164)
    at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:472)
    at io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:384)
    at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
    at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
    at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
    at java.base/java.lang.Thread.run(Thread.java:834)

Router Function


Для начала, что такое Router Function. Для регистрации асинхронного API, начиная с Spring Framework v5, можно использовать следующий подход:


@Bean
open fun httpEndpoints(): RouterFunction<ServerResponse> {
    return coRouter {
        GET("/api/users") {
            ServerResponse
                    .ok()
                    .bodyValueAndAwait("Ok !!!!")
        }
        GET("/api/developers") {
            ServerResponse
                    .badRequest()
                    .bodyValueAndAwait("Not ok (((")
        }
    }
}

Метод GET принимает на вход асинхронную функцию suspend (ServerRequest) -> ServerResponse, в которой можно по ServerRequest создать ServerResponse. Всё просто и логично. И если в вашем приложении есть набор сервисов, которые работают по более-менее одинаковой схеме, то вы можете их свести к следующей модели (казалось бы):


@Bean
open fun httpEndpoints(): RouterFunction<ServerResponse> {
    // сразу скажу - так делать не надо
    val urlToMethod: Map<String, RequestProcessor> = createRequestProcessors()
    return coRouter {
        urlToMethod.forEach { url, processor -> // forEach здесь не уместен
            GET("/$url") {
                processor.createResponse(it)
            }
        }
    }
}

В итоге, если у вас большинство API методов в приложении берут на вход примерно одно и то же, то можно выделить интерфейс и сделать один обработчик (как описано выше). Получается явный код, без магии аннотаций и пр.


Как работает Router Function?


Интерфейс RouterFunction здесь, однако на момент написания статьи он выглядел так:


@FunctionalInterface
public interface RouterFunction<T extends ServerResponse> {
    Mono<HandlerFunction<T>> route(ServerRequest request);

    default RouterFunction<T> and(RouterFunction<T> other) {
        return new RouterFunctions.SameComposedRouterFunction<>(this, other);
    }

    default RouterFunction<?> andOther(RouterFunction<?> other) {
        return new RouterFunctions.DifferentComposedRouterFunction(this, other);
    }

    default RouterFunction<T> andRoute(RequestPredicate predicate, HandlerFunction<T> handlerFunction) {
        return and(RouterFunctions.route(predicate, handlerFunction));
    }

    default RouterFunction<T> andNest(RequestPredicate predicate, RouterFunction<T> routerFunction) {
        return and(RouterFunctions.nest(predicate, routerFunction));
    }

    default <S extends ServerResponse> RouterFunction<S> filter(HandlerFilterFunction<T, S> filterFunction) {
        return new RouterFunctions.FilteredRouterFunction<>(this, filterFunction);
    }

    default void accept(RouterFunctions.Visitor visitor) {
        visitor.unknown(this);
    }
}

Как видно, он позволяет асинхронно и каскадно находить правильный обработчик для входного url. Условно, у функции есть три варианта:


  1. Обработать запрос
  2. Сделегировать запрос вложенной функции (и откусив префикс)
  3. Отвергнуть запрос, вернув Mono.empty(), что означает "за этот url я не отвечаю".

Как результат, есть абстрактый интерфейс, к которому можно добавлять новые и новые реализации. Можно даже сделать прокси-сервер, который в каждый момент времени не знает, корректен ли URL (т.е. он может спросить о корректности у одного из других серверов). В итоге, мы получаем расширяемую систему, в которой есть уже готовая реализация (с методами GET, как выше), однако никто не мешает сделать свою (с рулем, седлом, педалями и т.д.).


А теперь вернемся к примеру выше:


@Bean
open fun httpEndpoints(): RouterFunction<ServerResponse> {
    // сразу скажу - так делать не надо
    val urlToMethod: Map<String, RequestProcessor> = createRequestProcessors()
    return coRouter {
        urlToMethod.forEach { url, processor -> // forEach здесь не уместен
            GET("/$url") {
                processor.createResponse(it)
            }
        }
    }
}

Как я думаю, вы уже поняли, что на деле каждый вызов метода GET создает новую RouterFunction. И каждая из них проверяет, что URL совпал (или не совпал), а потом переключается дальше. Если на вход придет неподдерживаемый адрес, то каждая функция его проверит. То есть, для адреса размера N, сложность поиска рабочей функции будет O(N*M*K), где M — средняя длина адреса в функции, а K — число RouterFunction (говоря другими словами, если удвоить число маршрутов сайта, то время поиска правильного увеличится вдвое, что странно, так как тот же switch в Java не обладает таким свойством). Хуже всего то, что на каждой итерации создается еще несколько объектов в куче (для асинхронности), что вряд ли дает ускорение всей системе.


Казалось бы, если все адреса статические (или, по-другому, сервер заранее знает все поддерживаемые), то можно использовать префиксное дерево. Этот алгоритм зачастую используется, например, для поиска на web страницах. Его минус в том, что надо заранее знать все возможные варианты, однако далее он работает со сложностью O(min(M, N)), где N — это длина адреса, M — это или длина совпавшего префикса, или максимальная длина префикса.


Как с этим бороться?


Вариант 1: использовать pattern. Например, если все поддерживаемые адреса можно выразить строкой /[static-prefix]/[suffix], где static-prefix для всех один, то можно выразить запрос так:


GET("/static-prefix/{suffix}"), 
      req -> ok().body(
          execute(
              req.pathVariable("suffix")
          )
        )

Полный пример можно посмотреть здесь. Если устроить switch по суфиксу, то решение будет работать за линейное время от запроса (напомню, что в Java/Kotlin/Scala switch по строке высчитывает сначала хеш, и только потом делает сравнение символов, чтобы избежать коллизий).


Вариант 2: использовать иерархию в функциях. Как пример кода отсюда:


public RouterFunction<ServerResponse> productSearch(ProductService ps) {
    return route().nest(
        RequestPredicates.path("/product"), 
        builder -> { // мы можем добавить сколько угодно функций сюда
                builder.GET("/name/{name}", req -> ok().body(ps.findByName(req.pathVariable("name"))));
            }
        ).build();
}

...


И в заключении: если у вашего приложения нет более-менее серьезной нагрузки, то подобные оптимизации бессмысленны. Сборщик мусора стерпит, а заодно код получается простым для чтения. Однако если ваш сайт постоянно грузят пользователи, то можно ускорить сервис путем уменьшения число роутеров.


Как видно, из-за абстранций и идеи расширяемости, не стоит полагаться на то, что команда Spring всё волшебным образом ускорит. Лучше просто упростить конфигурацию.