Привет! Меня зовут Татьяна Руфанова. Сегодня мы будем понимать и принимать Reactive (Реактив). В статье расскажу, почему мы выбрали Реактив в мидл слое мобильного приложения Альфа-Банка (а у нас 300 микросервисов и треть из них реактивные), разберём, почему «нелогичное» поведение реактивной программы на самом деле очень логичное, какие сложности реактивный подход принесёт в процессе написания и исполнения кода, и как с этим бороться. А чтобы не траблшутить в проде, будем ломать тесты на Project Reactor.
Налейте чаю, включите звуки природы и настройтесь пройти все стадии принятия Reactive.
Немного о себе: работаю в Альфа-Банке уже 5 лет, участвую в продуктовой и внутренней разработке, 3 года мы с командой пишем реактивный код.
Почему мы используем Реактив в Альфа-Банке
Нагруженный микросервис до переписывания…
На картинке реальный мониторинг одного из нагруженных микросервисов Альфа-Банка в его «дореактивный» период. Приложение запущено в 8 инстансах, чтобы суммарно держать 750 запросов в секунду.
При этом на сервисах бэка, к которым обращается наш микросервис, могут происходить тайм-ауты, что сразу сказывается на времени ответа.
Помимо времени ответа, таймауты бэкэнда влияют на «здоровье» микросервиса — он обрастает потоками, и в какой-то момент начинает перезапускаться.
Почему так происходит?
У нас обычное приложение с бизнес-логикой: собери данные из нескольких сервисов, сходи в базу, кэш, соедини всё вместе и отдай наружу — никаких сложных вычислений. Зато много клиентских запросов, каждый из них порождает параллельные вызовы во внешние источники данных, а они, в свою очередь, могут долго отвечать.
Что делать, чтобы выдерживать пиковые нагрузки?
Увеличивать количество потоков в пулах севера Tomcat.
Увеличивать количество потоков в пулах внешних вызовов Hystrix.
Увеличивать память и ЦПУ самому приложению (потокам нужны ресурсы).
Увеличивать количество инстансов.
Или…
…переписать всё на Реактив! Ведь в нашем бизнес-приложении потоки большую часть времени заблокированы вводом/выводом и ждут, когда кто-нибудь пришлёт им данные по сети — потребляют ресурсы и ничего не делают!
…И после переписывания на Реактив
Мониторинги того же приложения, но уже в реактивном исполнении.
В 2,5 раза больше запросов в секунду (в пике почти 2000) на меньшее количество инстансов, всего 5.
Таймауты внешних вызовов по-прежнему случаются.
Но при этом количество потоков стабильно небольшое: 60 против 350.
Почему так?
Потому что микросервис неблокирующий, теперь он на Webflux и Project Reactor, которые хорошо работают с IO. Точнее с NIO — неблокирующим вводом-выводом.
В отличие от Tomcat, где под каждый блокирующий вызов создаётся отдельный поток, в Webflux другая модель:
небольшое количество потоков бегает в цикле событий ввода вывода (event loop);
и как только для какого-то ранее приостановленного реактивного стрима приходит событие о том, что ввод или вывод окончен (то есть мы дождались или отправили данные)...
…поток продолжает выполнять логику этого стрима.
Получается, что в нашем приложении для одного клиента выполняем бизнес-логику, пока для другого ждём ответа извне. И всё это в одном и том же потоке.
Ещё раз подчеркну, что в нашем случае отсутствуют CPU Intensive задачи.
Если бы они были, у потока бы не было «свободного времени», он бы всё время занимался вычислениями. И для обработки большого количества входящих запросов, по-прежнему, потребовалось бы много параллельных потоков.
Не только теория, но и наш опыт промышленной эксплуатации показывает, что реактивные микросервисы держат нагрузки значительно лучше.
Из наших 300 микросервисов больше трети уже реактивные и мы не собираемся останавливаться! А чтобы упростить разработку и сопровождение такой большой кодовой базы, мы пишем библиотеки для реактивного логирования, кэширования, трейсинга, метрик, тестирования и прочие утилиты.
Необходимость писать библиотеки для реактивного стэка — одна из сложностей использования Реактива. Где для обычного многопоточного приложения уже всё написано и работает «из коробки», для реактивного часто нужно «доработать напильником». Но о сложностях поговорим позднее.
Помимо внутренней оптимизации работы приложения, реактивный подход позволяет выстроить реактивное взаимодействие между сервисами, делая их более отзывчивыми. Приложение может получать на вход и отдавать на выход поток данных по мере их «готовности». Так, например, клиенту банка не обязательно видеть в истории все операции сразу, тем более, что загрузка всего объёма займёт заметное время. Достаточно отобразить первую страницу и, по мере чтения данных из базы, подгружать последующие.
Реактивное межсервисное взаимодействие останется за рамками этой статьи, мы будем говорить исключительно про внутренности.
А сейчас предлагаю разобраться с тем, что такое же такое Реактив.
Что такое Реактив в Java
Reactive Streams и причем здесь Project Reactor
Если ничего не работает, почитайте документацию… Немного теории.
Здесь рекомендую познакомиться с The Reactive Manifesto, если ещё не знакомы. Манифест описывает принципы реактивного подхода в общем. Дальше мы будем говорить про реализацию в JVM, а именно про спецификацию Reactive Streams.
Цель Reactive Streams — найти минимальный набор интерфейсов, методов и протоколов, который опишет необходимые операции и сущности для реализации асинхронного потока данных с неблокирующим обратным давлением (non-blocking backpressure).
Обратное давление — это механизм, с помощью которого каждый метод обработки потока данных может управлять количеством входящих в него данных, не блокируя при этом поток. Например, когда источник отдаёт данные быстрее, чем потребитель может их обработать, backpressure поможет выровнять скорость между источником и потребителем.
Согласно спецификации реактивный стрим должен:
обработать потенциально бесконечное количество элементов;
последовательно;
асинхронно передавая элементы между компонентами;
с обязательным неблокирующим обратным давлением.
Сама спецификация Reactive Streams состоит из следующих частей:
API — определяет интерфейсы для реализации Reactive Streams, а также позволяет совмещать различные реализации.
Specification — сама спецификация, в которой описано, как должны вести себя реализации этих интерфейсов.
The Technology Compatibility Kit (TCK) — минимальный набор тестов для проверки соответствия спецификации.
Вот эти самые интерфейсы (API).
Нас будут интересовать Publisher, Subscriber и Subscription.
Publisher — это источник данных, на который можно подписаться методом subscribe.
Subscriber — потребитель: после подписки может получать события жизненного цикла стрима (onSubscribe, onError, onComplete) или данные, ради которых всё и затевалось, они же элементы реактивного стрима (onNext).
Subscription — результат подписки потребителя на источник, с помощью которого можно запросить n элементов (request) или отменить подписку (cancel).
В Java все контрибьютят как могут, реализаций Reactive Streams много. Вот некоторые из них:
Самая популярная реализация, пожалуй, Project Reactor (настоятельно рекомендую к прочтению Reactor 3 Reference Guide целиком). И, что приятно, с ней хорошо интегрируется Spring Framework, который активно используется в микросервисах Альфа-Банка.
На диаграмме классов видно, как взаимосвязан интерфейс реактивных стримов Publisher с двумя основными классами Reactor, Flux и Mono.
Оба класса — источники данных с той разницей, что Flux отдаёт потенциально бесконечное количество элементов, а Mono не больше одного.
Жизненный цикл реактивного стрима
Жизненный цикл пригодится, когда мы будем говорить про накладные расходы (overhead), производительность, работу операторов и потоки. Давайте напишем небольшой реактивный стрим и рассмотрим этапы его сборки, подписки и исполнения.
Все примеры кода можно найти здесь: https://github.com/tirufanova/reactor-exx.
Сборка
В примере кода ниже:
.range создает последовательность элементов от 5 до 100;
.map преобразует каждый элемент в строку;
.filter оставляет в стриме только строки, длина которых равна 1;
.take берет из всей последовательности только 3 первых элемента.
AssemblySubscriptionExecutionTest::assemble
На оператор .hide не смотрите, он здесь для того, чтобы убрать внутренние оптимизации библиотеки, которые могут помешать нам поймать дзен понять Реактив.
Процесс сборки стрима идёт сверху вниз, по мере того, как программа выполняется. Каждый раз на каждом операторе создаётся по дополнительной обертке — «флаксу» — в примере получилось 5 штук.
Подписка
На предыдущем шаге мы только собрали цепочку операторов, которые обрабатывают элементы, но никакой обработки по факту не произошло, потому что нам нужно подписаться на стрим. Для этого подойдёт метод .blockLast, который под капотом вызывает subscribe источника (Flux).
AssemblySubscriptionExecutionTest::subscribe
Подписка происходит снизу вверх:
сначала оператор .blockLast подписывается на оператор .take;
оператор .take подписывается на .filter;
и так далее, пока кто-то не подпишется на первый Flux.range.
И здесь на каждом вызове создаются новые обертки-декораторы. Ещё 5 пять объектов в дополнение к тем, что уже были созданы на этапе сборки.
Исполнение
Цепочка собрана, на неё подписался потребитель. Логично, что данные (элементы стрима) идут сверху вниз, от источника к потребителю. Но по реактивному стриму проходят не только элементы, но и события. А вот с ними не всё так однозначно. Добавим в реактивную цепочку две точки логирования (метод log), чтобы увидеть события.
AssemblySubscriptionExecutionTest::execute
Первым видим сигнал request. Элементы по стриму не начнут идти, пока они не будут запрошены (request). Это тот самый backpressure из реактивного манифеста и реактивной спецификации. Сигнал запроса идёт снизу вверх, от потребителя к источнику.
Backpressure позволяет регулировать, сколько данных мы готовы принять и обработать. В нашем случае — unbounded — присылай всё и без остановки.
На иллюстрации запрос залогирован 2 раза, так как в реактивной цепочке 2 точки логирования. По факту, каждый оператор снизу вверх запрашивает элементы у вышестоящего. При этом количество запрошенных элементов может отличаться в зависимости от внутреннего устройства запрашивающего оператора.
Теперь, наконец-то, начинают идти элементы по стриму, сверху вниз, как и положено, генерируя события onNext.
Когда оператор .take насчитает 3 элемента, он отправляет источнику сигнал о необходимости прекратить отправку элементов. Точка логирования с меткой filtered (выше по стриму) поймала это событие. При получении такого сигнала источник должен прекратить отправку.
А после сигнала для источника, .take вниз отправляет событие о том, что элементы закончились, можно больше ничего не ждать и выходить из метода .blockLast.
Надеюсь, логика жизни реактивного стрима стала понятнее. И появилось поле для самостоятельных экспериментов.
Управление потоками
В самом начале статьи мы говорили, что реактивный подход не даст нам преимуществ в CPU Intensive задачах, потому что работает на небольшом количестве потоков (threads).
Другой случай, когда реактивный подход не подойдет — использование блокирующих вызовов. Всё по той же причине: потоков мало, они заблокируются, и приложение какое-то время не сможет выполнять никакую другую реактивную логику.
На самом деле, если по какой-то ведомой разработчику причине (например, блокирующий драйвер к базе данных), подобного рода задачи приходится решать на реактивном стэке, выход есть!
По умолчанию реактивный стрим целиком исполняется на том же потоке, на котором был создан (точнее, на котором на него произошла подписка), но этим поведением можно управлять.
Для этого нам понадобится отдельный Scheduler — реактивная абстракция пула потоков. Например, для блокирующих вызовов документация Project Reactor рекомендует использовать пул Schedulers.boundedElastic(). Осталось разобраться, как переключить нужный оператор реактивного стрима на новый пул.
Для переключения потока в реактивную цепочку необходимо добавить операторы publishOn либо subscribeOn.
publishOn
Оператор publishOn изменяет поток на этапе исполнения жизненного цикла, для событий onNext, onComplete и onError. Другими словами, поток меняется для всех операторов, выполняющихся ниже по реактивной цепочке. Это в теории… Посмотрим, что происходит на практике.
Добавим в реактивную цепочку вызов publishOn с пулом parallel, а также две точки логирования: до переключения потока (с меткой “before”) и после (с меткой “after”).
PublishOnSubscribeOnTest::publishOn
Логично предположить, что все сообщения с меткой “before” будут выполнены на потоке Test worker (в моем случае это поток JUnit), а с меткой “after” — на потоке пула parallel. Всё правильно, но есть тонкости.
Программа выведет следующий лог.
И, как мы видим, наше предположение сработало не полностью. Действительно, для событий onNext и onComplete с меткой before видим поток Test worker, а с меткой “after” — parallel-1. Но для остальных событий (выделенных жирным) поток выглядит случайным. На самом деле это не так!
Давайте разбираться. В документации к методу написано, что publishOn изменяет поток при публикации элементов (onNext), а также при публикации событий окончания стрима и ошибки (onComplete и onError). Для всех остальных событий используется поток, на котором произошло предыдущее событие. А именно:
События onSubscribe и request происходят на этапе подписки и в начале исполнения стрима, то есть никакая публикация элементов ещё не происходила. Поэтому используется тот же поток, на котором была выполнена подписка на стрим.
События onComplete при этом подчиняются тем же правилам, что и onNext. Если бы не оператор .take в нашей цепочке, отправивший сигнал cancel источнику, в логе было бы ещё одно сообщение onComplete с меткой after на потоке parallel-1.
А вот сигнал cancel отправляется оператором .take вверх по реактивной цепочке, то есть смена потока опять не происходит. Поскольку оператор .take находится после publishOn, то его сигналы будут обработаны на треде пула parallel.
А как изменить поток для остальной части реактивной цепочки, для этапа подписки?
subscribeOn
Оператор subscribeOn выполняет смену потока в процессе подписки на реактивный стрим, а именно для событий subscribe, onSubscribe и request. Вспомним жизненный цикл — событие подписки проходит снизу вверх. То есть subscribeOn изменит поток для операторов, которые находятся выше него по цепочке? Почти…
Давайте возьмем предыдущий пример с теми же метками логирования, но заменим оператор смены потока (вместо publishOn используем subscribeOn).
PublishOnSubscribeOnTest::subscribeOn
И посмотрим, что появится в логе.
Для первых четырех сообщений метка “after” соответствует потоку Test worker, а метка “before” — потоку пула parallel.
Действительно, в момент подписки и запроса элементов событие проходит снизу вверх по реактивной цепочке: сначала выполнится “after”, а потом “before” уже на новом потоке. Но метод subscribeOn никак не влияет на то, что происходит после подписки. Поэтому события остальных фаз жизненного цикла будут происходить на потоке, который был установлен в момент подписки оператором subscribeOn, то есть parallel-1. Вне зависимости от направления сигнала.
Отсюда следует правило: subscribeOn должен находиться как можно ближе к источнику (условно, следующим оператором после источника в реактивной цепочке), а publishOn — как можно ближе к операции обработки, которую хотим переключить на новый пул (непосредственно перед самой операцией).
Чтобы лучше понять это правило, попробуйте в примере выше использовать сразу оба оператора subscribeOn и publishOn, а потом поменять их местами. Сообщения в логе в этих двух случаях будут абсолютно разными.
Неявная смена потока
Операторы subscribeOn, publishOn, сигналы, события… Запутано, но код хотя бы делает то, что обещает — явно изменяет поток исполнения.
Теперь давайте уберём из нашей подопытной реактивной цепочки все операторы смены потока. А вместо них добавим новую логику обработки, метод .delayElements: отдавай каждый следующий элемент вниз по реактивной цепочке с задержкой в 5 наносекунд.
ImplicitThreadSwitchTest::delayElementsThread
Точки логирования остались те же, давайте смотреть логи.
И… откуда-то взялись новые потоки! Причём разные.
Спасибо документации метода delayElements, в ней явно описано, что дальнейшее исполнение стрима будет происходить на пуле parallel. Хотя такой побочный эффект совсем не очевиден из названия метода.
На самом деле, реактивный scheduler значительно более мощный инструмент, чем просто пул потоков. Он позволяет планировать отложенное выполнение какой-либо логики. Например, задерживать публикацию элементов: получи элемент из оператора выше по цепочке, подожди, только потом отправь элемент дальше.
Во внутренней реализации метод .delayElements как раз использует отложенную публикацию на parallel пуле. Метка “after” после оператора .delayElements выводит поток для такой отложенной публикации (onNext). А cancel уже выполнится на том потоке, который оставила предшествовавшая ему отложенная публикация.
В реальной жизни не так часто возникает необходимость задерживать обработку полученных элементов. А объединять элементы из нескольких реактивных стримов — задача практически ежедневная. Например, чтобы собрать из нескольких внешних запросов общий результат.
Объединить несколько стримов помогут такие методы, как .merge, concat или zip. В реализации этих операторов явная смена потока отсутствует. Зато может произойти неявная, если один из исходных реактивных стримов запущен другом потоке. Давайте рассмотрим на примере.
ImplicitThreadSwitchTest::mergeDifferentThreads
Соберём два реактивных стрима, которые генерируют последовательности чисел.
Первый, parallelThreadFlux, отдает числа 10, 11 и переключен на отдельный поток.
Второй, testWorkerThreadFlux, отдаёт 20, 21 и запускается на потоке по умолчанию.
Используем оператор .merge для того, чтобы собрать все числа в одну последовательность. Порядок чисел при этом будет соответствовать очерёдности их получения оператором .merge. И, конечно, расставим точки логирование исходных и результирующего стрима с соответствующими метками.
Разберём сообщения в логе. Первые шесть строк: сначала оператор blockLast инициировал подписку (и запрос элементов) на оператор merge, после чего внутри оператора merge произошла поочерёдная подписка на все стримы, которые ему необходимо объединить. Все подписки и запросы элементов произошли на потоке Test worker, потому что никаких смен потока не происходило.
После запроса элементов по стримам начали идти данные. По меткам testWorker и parallel для событий onNext мы видим, что parallelThreadFlux публикует элементы на parallel пуле, а testWorkerThreadFlux на потоке на Test worker, как и ожидалось. А вот результирующая последовательность (метка merge, выделена жирным в логе) постоянно меняет поток в зависимости от того, на каком потоке (из какого стрима, соответственно) получен исходный элемент. Таким образом смена потока для результирующего стрима происходит неявно.
Почему важно помнить про неявную смену потока? Потому что она может «портить» предшествующую явную смену потока операторами publishOn и subscribeOn, и ваша логика запустится совсем не там, где вы ожидаете. Чтобы избежать такого поведения, используем publishOn и subscribeOn как можно ближе к логике, нуждающейся в отдельном потоке.
Самое время вооружиться всеми полученными знаниями и оценить сложности Реактива.
Какие сложности приносит Reactive и как с этим бороться
Сложности при написании программы
Код становится непонятнее.
Объективно нам стало сложнее. Если в императивном исполнении код будет понятен любому начинающему программисту…
ReadabilityTest::imperativeVsReactive
…то переписанный на Реактив тот же самый код уже требует значительно больше знаний. Обрастает «служебными» методами типа .blockLast. И вдобавок плохо читается.
Это означает, что порог вхождения выше, разработка дольше, а ошибок потенциально больше.
Поэтому лучше не использовать «реактивщину» там, где она не нужна, например, где маленькая нагрузка, нет большого числа внешних вызовов или не требуется высокая отзывчивость (то есть отдавать данные как можно скорее по мере их получения, а не все сразу). Если вы всё-таки решили использовать Реактив, его придется изучить.
Даже если код уже реактивный, не стоит переделывать на Reactor всё подряд. Часто индикаторы того, что вы что-то делаете не так, это вызовы в реактивном коде методов Flux.just, Flux.fromIterable и block*. Подумайте, нельзя ли решить задачу с помощью коллекций, нереактивных стримов или императивного кода? Или остаться в реактивном подходе, но подобрать более подходящие операторы? Это снизит шансы выстрелить себе в ногу.
И, конечно, пишите тесты! В этом поможет библиотека io.projectreactor:reactor-test. Она содержит инструменты для проверки поведения реактивного стрима или реактивной логики, обрабатывающей реактивный стрим.
Помимо сложностей понимания кода, длинные реактивные цепочки порождают и другую сложность, а именно длинный стэк.
Стэк становится длиннее.
И непонятнее.
В процессе отладки мы увидим примерно следующее.
А где, собственно, наш код?.. А, маленькая строчка внизу экрана. Понять, что именно происходит в процессе исполнения программы, даже опытному разработчику, достаточно сложно.
Плюс получаем оверхэд в процессе исполнения. Но об этом чуть позже.
Как помочь себе с отладкой? Можно использовать оператор .log(), он позволяет залогировать события жизненного цикла стрима с заданной меткой, в том числе элементы данных, проходящие через стрим.
Ошибки искать сложнее.
Когда мы видим стектрейс исключения (который стал длиннее, см. пункт выше), непонятно, что и где сломалось. В стектрейсе будет множество служебных реактивных обёрток, и, с большой вероятностью, наш код просто не влезет. А так как в коде приложения происходит только сборка стрима и подписка, а исполнение идёт в недрах Реактива, явная ссылка на логику, породившую ошибку, может вообще отсутствовать в стэке.
Здесь на помощь придёт библиотека io.projectreactor:reactor-tools, которая добавит в стэк вызовы нашей программы, чтобы мы смогли сопоставить исключение с кодом приложения. Одно из приятных свойств этой библиотеки то, что она не добавляет оверхэд в процессе выполнения программы, а это значит, что её можно использовать в продакшн.
Второй важный момент, который сильно упростит жизнь — логируем ошибки явно. Для этого максимально используем оператор onError. Он должен быть как можно ближе к потенциальному месту исключения. Лучше написать лишний onError, чем в логах не найти, где именно произошла ошибка.
Необходимо дорабатывать инструменты.
Выше мы много говорили про смену потоков. Реактив не даёт никаких гарантий, какая часть кода запустится в какой момент и на каком потоке, поэтому многие традиционные инструменты перестают работать. Где-то уже есть реактивные аналоги, а остальное придётся дорабатывать под себя и Реактив.
Прямое следствие смены и переиспользования потоков различными операторами реактивной цепочки то, что ThreadLocal переменные ломаются. Точнее, сами ThreadLocal, конечно, работают, а вот трэйсинги, контексты логирования, данные клиентских запросов, использовавшие эти локальные переменные в многопоточном коде, в реактивном не годятся.
Простой способ добавить контекст к реактивному стриму — явно передавать его вместе с элементами стрима. Подход надёжный, понятный, но портит архитектуру приложения. Реактивный способ решения задачи — использование контекста (Context) библиотеки Reactor.
Контекст — это аналог Map: также хранит в себе пары ключ-значение, позволяя записывать и считывать их. Контекст неизменяемый (immutable), то есть при добавлении к нему нового значения создаётся новый экземпляр контекста. Контекст привязывается к реактивному стриму на этапе подписки, то есть виден только операторам, которые находятся выше его в реактивной цепочке.
Подход нужен чаще для написания библиотек, а не кода приложения, поэтому подробно останавливаться не будем. Продолжим разбираться со сложностями и перейдём от написания программы к её исполнению.
Сложности в процессе выполнения программы
Как влияет на работу нашего приложения создание всяких обёрток? Мы говорили про длинный стек. Собственно, вот он.
Стэк состоит из тех самых оберток, через которые проходят наши элементы.
Чем это плохо? Оверхедом:
Больше объектов, длиннее стэк – больше потребляемой памяти.
Много маленьких объектов — больше работы для GC.
Портим оптимизации JIT компилятора.
Как мы с этим боремся?
Изучаем операторы, которые используем (у Project Reactor очень хороший официальный гайд). Некоторые операторы могут добавлять накладные расходы, генерировать маленькие объекты (например, очереди) под капотом, хотя для нашей задачи мог бы подойти оператор попроще.
Стараемся не делать длинных реактивных цепочек, по возможности объединяем операторы. Например, из нескольких последовательных операторов filter всегда можно сделать один.
Используем оператор .handle. Он тоже позволяет объединять логику нескольких операторов и объединяет логику .map и .filter. Если в случае с .map и .filter. у нас создается 4 дополнительных объекта,…
…то с .handle всего 2.
Оговорюсь, что не стоит увлекаться преждевременными оптимизациями. Если у вас нет проблем с производительностью реактивного приложения и в нём отсутствуют длинные реактивные цепочки, то код с .map и .filter читается значительно лучше. А Project Reactor достаточно умный, чтобы делать собственные оптимизации под капотом. Как всегда, серебряной пули не существует, ищите свои рецепты, проверяйте на практике.
Теперь самое время перейти к ней, к практике.
Как сломать реактивный стрим
Реактив позволяет выполнять (условно) параллельную обработку данных, берёт на себя все сложности работы с многопоточностью. В его арсенале множество операторов манипуляции с данными и самими реактивными стримами. Но как бы не были скрыты сложности многопоточности, иногда непонимание внутреннего устройства может привести к тому, что обработка данных пойдёт не по плану.
Сейчас мы наконец-то что-нибудь сломаем! Для этого нам понадобится два оператора — groupBy и flatMap.
Оператор groupBy
Оператор groupBy разбивает исходный стрим на множество отдельных стримов по какому-то условию. Похож на работу оператора groupBy в Java Stream API.
Здесь нас интересует prefetch — это количество элементов, которые оператор предподгружает из реактивного стрима в свой внутренний буфер. В событиях реактивного стрима мы увидим его как backpressure, запрос элементов. Значение по умолчанию 256.
Оператор flatMap
Работает наоборот, а именно:
преобразует каждый элемент исходного реактивного стрима в новый реактивный стрим (с помощью функции, которую получает на вход);
затем сливает эти внутренние стримы в один;
при этом элементы из внутренних стримов будут чередоваться.
Тоже похож на оператор flatMap в Java Stream API.
Если посмотреть документацию, то мы найдём интересный параметр — concurrency. Он задаёт максимальное количество внутренних стримов, которые одновременно может объединять оператор flatMap. Значение по умолчанию — 256. То есть потенциально flatMap объединит любое количество стримов, но чтобы перейти к обработке 257 стрима, один из первых 256 должен завершиться.
Собираем стрим
В руководстве Project Reactor висит ружье, которому пора выстрелить есть примечание: если воспользоваться одновременно groupBy и flatMap, то при большом количестве групп в groupBy (high cardinality) и низкой конкурентности flatMap (одновременно обрабатывается мало стримов, low concurrency) обработка стрима может зависнуть (lead to hangs).
Казалось бы, у нас неблокирующая обработка, как она может зависнуть? Звучит странно. Давайте разберёмся.
Допустим, у нас есть список имён — входной стрим namesFlux.
И нам нужно посчитать количество имён по первой букве — функция countNamesFunction.
Для этого сначала сгруппируем имена по первой букве оператором groupBy, применим функцию подсчёта countNamesFunction и соединим результаты обратно в общий стрим оператором flatMap.
По всем стримам расставим точки логирования: в функции подсчёта имен, а также перед оператором groupBy и после flatMap.
GroupByWithFlatMapNamesTest::completeNamesCount
Пока всё идет по плану, в логе появятся следующие значения для 4 групп имен (остальные события опущены для краткости):
В нашей цепочке недостаточно много групп и не такая низкая конкурентность, чтобы всё сломалось. Будем это исправлять, зададим конкурентность 3 для flatMap (достаточно мало для четырёх групп).
Чтобы добиться зависания на нашем объёме данных, понадобится также поменять prefetch для оператора groupBy, сделаем его 3. Чуть позже станет понятно, почему именно это «магическое число».
Реактивный стрим подсчета имён теперь выглядит так.
GroupByWithFlatMapNamesTest::timeoutNamesCount
И он зависнет! Чтобы понять, почему, разберём логи.
Вот что происходит, построчно:
1, 2 — события подписки.
3 — запрос бесконечного количества элементов оператором blockLast.
4 — запрос трёх элементов оператором groupBy (значение параметра prefetch).
5 — источник namesFlux получил запрос и начал отправлять элементы, первый Carl.
6, 7 — groupBy обработал первый элемент, создал группу для буквы C и отдал реактивный стрим с группой оператору flatMap. Тот, в свою очередь, подписался на группу и запросил 32 элемента для подсчёта имён.
8 — groupBy ожидает ещё 2 элемента в ответ на первый запрос элементов (в строке 4), но так как значение prefetch равно 3, запрашивает ещё один элемент.
9-16 — аналогично 5-8, создалось ещё 2 группы, теперь flatMap подписан на 3 реактивных стрима (для первых букв C, D, A). Новые подписки он создавать не будет, пока какой-то из текущих стримов групп не завершится.
17-19 — groupBy получил ещё три элемента. Но все эти имена относятся к новой группе на букву B, а flatMap больше не создаёт новые подписки. Оператор groupBy при этом не запрашивает новые элементы, так как prefetch равен 3 и ровно столько же элементов ещё не отправлено дальше по стриму. И поскольку новые элементы не запрашиваются, groupBy не знает, можно ли завершить какую-то из предыдущих групп, так как событие об окончании стрима придёт только после следующего элемента Alice.
Всё, стрим заблокирован.
В реальной жизни, на большем объёме и разнообразии данных, блокировка может произойти и для значений prefetch и concurrency по умолчанию.
Поэтому важно понимать, какие именно данные проходят через реактивный стрим и как работают операторы, которые их обрабатывают. Возможно для конкретной задачи необходимо поменять значения по умолчанию или выстроить логику обработки по-другому.
Заключение
Реактив — мощный инструмент, который позволяет строить высоконагруженные отзывчивые приложения. Мы в Альфа-Банке убедились в этом на практике.
Вместе с тем важно понимать, какой именно профит даст реактивный подход для конкретной задачи. Ведь вместе с большой силой приходит большая ответственность с большими возможностями приходят и новые вызовы для разработчика.
Реактивный код имеет структуру и логику исполнения отличную от императивного многопоточного.
Это заставляет разработчика думать по-другому. А также менять привычные инструменты тестирования, трассировки, логирования, взаимодействия с внешними сервисами и т.д.
Чтобы программа делала то, что задумано, важно понимать, как устроены реактивные стримы в целом и конкретные операторы.
Помогут в этом документация и инструменты реактивного тестирования, логирования и обработки ошибок.
И конечно, Реактив иногда подкидывает головоломки. Разбираться в них может быть непросто, зато очень интересно.
Используйте Реактив с удовольствием.
Рекомендуем почитать [подборка от редактора]
Как мы сайт Альфа-Банка на митапе шатали и нам за это ничего не было
Как в банке внедрить облачные технологии так, чтобы это было удобно, безопасно, быстро и дёшево
Также подписывайтесь на Телеграм-канал Alfa Digital — там мы постим новости, опросы, видео с митапов, краткие выжимки из статей, иногда шутим.
Комментарии (11)
ris58h
14.04.2023 19:20Виртуальные потоки не пробовали? Я понимаю что они когда-то потом, а надо сейчас, но всё же.
Taruf Автор
14.04.2023 19:20Про Project Loom? Сыроват пока, но у нас много кода на kotlin coroutines, бизнес логика на корутинах значительно приятнее, чем на реакторе. В библиотеках всё равно больше reactor, никуда от него со спрингом не денешься
ris58h
14.04.2023 19:20Про Project Loom?
Да.
никуда от него со спрингом не денешься
Спринг нельзя без reactor использовать? Что-то новенькое.
Taruf Автор
14.04.2023 19:20Спринг нельзя без reactor использовать? Что-то новенькое.
Можно, конечно. Но иногда нужно что-то к вебфлаксу доделать вроде фильтра для логирования, например
jreznot
14.04.2023 19:20Реактивный Spring весь внутри на Project Reactor, даже если вы используете поверх корутины, то они работают через адаптер
sergey-gornostaev
14.04.2023 19:20Виртуальные потоки - это не альтернатива реактивным стримам, уровень абстракции другой.
ris58h
14.04.2023 19:20Я про решение изначальной задачи, а не про инструмент.
sergey-gornostaev
14.04.2023 19:20Наличие или отсутствие виртуальных потоков в используемой JVM по большому счёту безразличны прикладному программисту, мы просто продолжим так же писать реактивные стримы, а Spring Ractor или Akka у себя под капотом сами решат использовать системный поток или легковесный для выполнения наших стримов. Поэтому вопрос про виртуальные потоки лучше задавать системщикам, вроде Олега Докука или Йонаса Бонэра.
sergey-gornostaev
14.04.2023 19:20переписанный на Реактив тот же самый код уже требует значительно больше знаний. Обрастает «служебными» методами типа .blockLast. И вдобавок плохо читается.
Это спорно. В большой кодовой базе декларативные стримы читаются намного проще императивной лапши. Особенно в случае многопоточного кода.
jreznot
А вот так можно увидеть ваш код в IntelliJ IDEA и всё-таки понять, что происходит
https://www.jetbrains.com/help/idea/reactor.html#reactor-debug