Работа с библиотекой WebFlux вызывает затруднения у многих Java/Kotlin разработчиков
Код выглядит непривычно, непонятно в каких потоках выполняется логика и как этим управлять
Сегодня я попытаюсь объяснить простым языком такой важный аспект, как момент переключения потоков в WebFlux, это поможет вам допускать меньше ошибок и писать более производительный код
Перед тем как начать, подписывайтесь на мой блог в телеграм, где вы сможете найти другие материалы от меня
Стоимость потоков в Java
Создание большого количества потоков и работа с ними в Java имеет определенные накладные расходы:
Количество потоков ограничивается возможностями системы, поскольку потоки Java соответствуют системным потокам, без специальных настроек вы сможете создать максимум несколько тысяч потоков
Создание и завершение потока требует времени и ресурсов
Если количество потоков слишком велико, процессор начинает тратить больше времени на переключение контекста между ними в точках синхронизации (synchronized), оспаривание замка и перевод потоков в состояние ожидания. Даже atomic классы под высокой нагрузкой становятся неэффективными
Затраты по памяти. Это наиболее спорный пункт, поэтому поместил его в конец. Фактически потоки занимают мало памяти, т.е. вы вполне сможете создать несколько тысяч потоков при ограничении по памяти в 100 МБ на приложение, хотя в других материалах утверждается что каждый поток занимает 1 МБ. Однако даже несколько тысяч потоков это значительно ниже того что предлагает нам реактивный подход
Все это может приводить к быстрой деградации системы под возрастающей нагрузкой
Для чего нужен WebFlux
WebFlux - это реализация реактивного манифеста силами команды Spring в Java
Как правило, в коммерческой разработке мы создаем веб-сервера, которые при классическом WebMVC подходе работают по принципу - один поток на каждый запрос
При использовании WebFlux ваш веб-сервер будет запускаться на технологии servlet 3.1, которая позволяет обрабатывать входящие запросы малым количеством потоков, как правило их число равно количеству ядер процессора, такая технология называется Event Loop
При этом в качестве контейнера сервлетов по умолчанию выступает Netty, но вы также можете использовать другие, в том числе Tomcat
Тем самым если на вашем сервере 8 ядер, то всего 8 потоков будут обслуживать все запросы клиентов, количество которых может исчисляться тысячами в секунду. В связи с чем нельзя допускать простоя потоков Event Loop'а, ведь если они будут заняты, то не смогут обрабатывать новые входящие запросы
Простаивать потоки могут по двум причинам:
ожидание блокирующей операции - операции ввода/вывода, работа с бд через jdbc, вызов RestTemplate и другие
выполнение долгих вычислений - это может быть любая бизнес логика, которая занимает значительное время процессора, тем самым поток занят полезной работой. но у него не остается времени на обработку входящих запросов клиентов
В обоих случаях, чтобы избежать простоя, нужно выполнить переключение потока на выделенный планировщик, давайте разбираться как это происходит
Понятия
В качестве примера приведу небольшой кусочек кода, который мы будем использовать далее:
Mono.just(2)
.map(it -> it + 1)
.filter(it -> it >= 3)
.subscribe();
Данную структуру кода я буду называть реактивной цепочкой
Значение
2
, переданное оператору just - буду называть элементомMono - издатель, т.к. он отвечает за передачу элементов в обработку (также издателем является Flux)
just / map / filter - операторы реактивной цепочки
.subscribe() - создает дефолтного подписчика, тем самым запуская реактивную цепочку
Также далее нам встретится Scheduler - это планировщик, в одном из потоков которого будет выполняться логика операторов, по сути это знакомый нам ExecutorService, только адаптированный под работу с WebFlux
Этих понятий нам будет достаточно для дальнейшего разбора
Как происходит создание реактивной цепочки
Это крайне важная часть статьи для понимания момента переключения потоков
Порядок запуска реактивной цепочки состоит из трех этапов:
1 - сборка цепочки (assemble) - сверху-вниз
2 - подписка (subscription) - снизу-вверх
3 - обработка элементов - сверху-вниз
Еще раз обратим внимание на кусок кода, который я приводил выше:
public void runExampleMono() {
Mono.just(2)
.map(it -> it + 1)
.filter(it -> it >= 3)
.subscribe();
}
Разберем по порядку что будет происходить при вызове метода runExampleMono:
Этап сборки
Вызов каждого оператора (just / map / filter) возвращает свой объект Mono, который затем передается нижестоящему оператору в его конструктор
Сначала будет создан объект MonoJust, т.к. он находится на самом верху, далее он будет передан в конструктор объекта MonoMap, тот в свою очередь будет передан в MonoFilter
Схематично это выглядит так:
new MonoFilter(
new MonoMap(
new MonoJust()
)
);
Этап подписки
Когда сборка дойдет до подписчика (оператора .subscribe()) запустится этап подписки
Т.к. каждый нижестоящий оператор хранит в себе объект вышестоящего, он может к нему обратиться и сделает это для того чтобы подписаться на него, вызвав метод subscribe, при этом будет передан объект подписки Subscription, который также создается в каждом операторе вкладывая в конструктор предыдущий объект Subscription:
monoFilter.subscribe(Subscriber) {
monoMap.subscribe(new FilterSubscriber(Subscriber)) {
monoJust.subscribe(new MapSubscriber(FilterSubscriber(Subscriber))) {
...
}
}
}
Далее происходят еще два этапа, постараюсь описать их кратко:
- когда вызовы методов subscribe дойдут до издателя (в данном случае MonoJust), издатель отправит сигнал onSubscribe подписчику, путем вызова метода onSubscribe сверху-вниз по реактивной цепочке, сообщая подписчику, что подписка выполнена успешно, при этом передаст объект Subscription со всеми вложенными подписками
- получив сигнал onSubscribe подписчик начнет запрашивать элементы у издателя, путем вызова метода request у объектов Subscription снизу-вверх
По умолчанию при подписке запрашивается Long.MAX_VALUE число элементов, то есть неограниченное количество
Обработка элементов
Когда вызовется метод request у подписки издателя (в нашем случае MonoJust имеет специальный объект подписки под названием ScalarSubscription), то элементы начнут передаваться вниз по цепочке, путем вызова метода onNext
Сигнал onNext вызывается по мере обработки элементов каждым из операторов, элементы начинают передаваться сверху вниз, и к ним применяется логика, описанная в операторах
Когда переключается поток в реактивной цепочке
Общее правило гласит, что нам не гарантируется в каком потоке будет выполняться каждый из реактивных операторов, но это не совсем так, мы вполне можем управлять тем в каком потоке будут выполняться те или иные части реактивной цепочки
Переключение происходит при следующих вызовах:
-
Вызов одного из операторов:
.publishOn(<Scheduler>)
.subscribeOn(<Scheduler>)
.parallel().runOn(<Scheduler>)
Переключение на издателя, который выполняется в другом потоке, отдельно коснемся вызова webClient'а
Рассмотрим подробнее каждый их вариантов
.publishOn(<Scheduler>)
Данный оператор работает на этапе обработки элементов, когда поступает следующий элемент, publishOn берет один из потоков планировщика и передает дальнейшую обработку цепочки в этот поток
Таким образом, операторы следующие дальше по цепочке будут выполнять свой код в выбранном потоке планировщика, и следующий элемент не будет взят в обработку, пока текущий элемент не будет обработан всеми нижеследующими операторами, либо не переключится поток
Т.е. параллельная обработка элементов выполняться не будет, несмотря на то какой Scheduler мы будем использовать
Также publishOn буфферизирует передаваемые элементы, поскольку внутри содержит очередь элементов на обработку. Это означает, что если элементы будут поступать быстрее в publishOn чем обрабатываться нижней частью цепочки, они будут накапливаться в очереди оператора. Глубина данной очередь по умолчанию равна 256, ее можно изменить путем указания параметра prefetch, в качестве второго аргумента .publishOn(<Scheduler>, <prefetch>)
.subscribeOn(<Scheduler>)
Данный оператор срабатывает на этапе подписки, т.е. когда при создании цепочки мы поднимаемся снизу вверх и еще не успели передать ни одного элемента в обработку
Поэтому subscribeOn может быть помещен в любом месте реактивной цепочки, и обработка элементов начнется в потоке его планировщика
Если в реактивной цепочке будет несколько операторов subscribeOn, то обработка элементов начнется в планировщике того оператора, который находится выше по цепочке
parallel().runOn(<Scheduler>)
Данный оператор в отличие от предыдущих работает только с Flux издателем, поскольку он не просто переключает поток, а передает обработку одновременно нескольким потокам планировщика
По своим свойствам он похож на .publishOn(), т.к. переключает поток нижеследующих операторов
Количество потоков, в которых будет выполняться логика последующих операторов, по умолчанию равно количеству ядер процессора, и их можно регулировать, передав значение parallelism в аргументе: .parallel(<parallelism>)
При этом стоит учитывать, что в планировщике количество потоков должно быть таким же или превышающим, значение переданное в вышеуказанном аргументе:Schedulers.newParallel("my-parallel-scheduler", <количство потоков>)
Вызов WebClient
При выполнении запроса с помощью webClient, после вызова оператора retrieve() или exchangeToMono()/ exchangeToFlux() (т.е. при получении ответа), выполнение последующих операторов будет происходить в одном из потоков Netty
Это будут те же самые потоки Netty, которые принимают запросы клиентов, поэтому надо быть крайне внимательным и стараться переключать поток сразу после выполнения webClient запросов, чтобы исключить ситуации при которых потоки Netty будут простаивать и замедлять работу сервера в целом
Переключение на издателя, который выполняется в другом потоке
Предыдущий пример с webClient'ом по сути демонстрирует переключение на другого издателя, т.е. мы при помощи оператора .then() переключились на издателя Mono, который возвращается webClient'ом и выполняется в другом потоке
Также переключиться на другого издателя возможно при помощи оператора .flatMap(), и дальнейшее выполнение цепочки продолжится в потоке издателя, на который мы переключились
По умолчанию, при переключении издателя, реактивная цепочка будет выполняться в том же потоке что и вызывающий метод, т.к. никакого переключения потоков здесь нет:
Однако если внутри издателя, на которого мы переключились, обработка элементов происходит в другом потоке, то и дальнейшее выполнение цепочки продолжится в новом потоке:
Также следует упомянуть вызов методов r2dbc репозитория, где происходит переключение на пул потоков бд
Нетривиальные ситуации
Приведу пару ситуаций, с которыми мне довелось столкнутся, однако уверен что на практике их может быть намного больше, приводите свои примеры в комментариях
Объединение нескольких издателей с помощью zip / concat
Что случится если мы объединим несколько издателей с помощью оператора zip (в случае с Mono) или concat (в случае с Flux), при этом каждый издатель будет исполняться в своем потоке, какой поток будет использован для обработки дальнейшей цепочки ?
При воспроизведении данного кода оказывается, что элементы обрабатываются дальше по цепочке каждый в потоке своего планировщика
Количество элементов превышает prefetch
prefetch обозначает количество элементов, которые будут запрошены у издателя за один раз, по окончании их обработки будет запрошена следующая группа элементов в количестве prefetch
С данным параметром есть много тонкостей, в них погружаться не будем
По умолчанию данный параметр равен 256, но мы можем сами задавать данное значение в различных операторах, в т.ч. .publishOn()
, .parallel()
и .runOn()
Рассмотрим пример кода:
Вывод в консоль:
[main] INFO -- До publishOn: 1
[my-single-scheduler-1] INFO -- После publishOn: 1
[my-single-scheduler-1] INFO -- До publishOn: 2
[my-single-scheduler-1] INFO -- После publishOn: 2
Мы видим в 3 строке, что элемент 2
, был взять из издателя потоком my-single-scheduler-1, и первый оператор doOnNext был выполнен в потоке singleScheduler, это логично, поскольку в момент запроса следующей пачки элементов мы уже отпустили вызывающий поток, и у нас остался только поток планировщика, который и будет обращаться к вышестоящим операторам для получения элементов
Итог
Сегодня мы рассмотрели случаи когда переключаются потоки в реактивной цепочке WebFlux
Как видим этим процессом можно и нужно управлять, особенно если у вас в проекте применяются блокирующие вызовы
Надеюсь теперь вы будете чувствовать себя более уверенно в работе с данной библиотекой
maxzh83
В java 21 появились виртуальные потоки и все описанное тут становится не совсем актуально. Как и использование реактивщины только для того, чтобы получить неблокирующее API.