Так уж получилось, что, устроившись на свою первую работу, я был вынужден напрямую столкнуться с Project Reactor в некоторых сервисах (чего, честно говоря, совсем не ожидал). Почесав репу, я понял, что придется мне идти курить доку, так как знаний по данной теме у меня, мягко говоря, было немного (знал Reactor примерно на уровне: чем отличается Mono от Flux).

В процессе чтения возникла гениальнейшая идея - попрактиковать свои навыки понимания тех. литературы на английском, при этом еще и разобравшись в необходимой мне теме, попутно исполнив свою хотелку написать статью для Хабра. Так и родился мой перевод вводной страницы из документации по реактору.

Так что важная заметка перед прочтением: Данная публикация является переводом введения из докмуентации Project Reactor, автор (пока что) не обладет достаточнми компетенциями для комментирования тех или иных моментов, так что если вы пришли за какими-либо инсайтами или интересными особенностями, данная статья, возможно, не лучший выбор. В любом случае - буду длагодарен за замечания и комментарии. Итак, поехали.

Введение в реактивное программирование

Reactor - это имплементация парадигмы реактивного программирования, описать которую можно следующим образом:

Реактивное программирование - парадигма асинхронного программирования, основанная на потоках данных и принципе распространения состояния. Это означает, что появляется возможность удобного представления статичных (к примеру массивов) или динамических (к примеру источников событий) потоков данных с помощью используемого языка программирования.

https://en.wikipedia.org/wiki/Reactive_programming

В качестве первого приближения реактивного программирования компанией Microsoft были созданы реактивные расширения (Rx - Reactive Extensions) в экосистеме .NET. Затем, в RxJava было реализовано реактивное программирование и для JVM. По прошествии некоторого времени, стандарты реактивного программирования для Java возникли благодаря Reactive Streams - спецификации, определяющей набор интерфейсов и правил взаимодействия реактивных библиотек для JVM. Данные интерфейсы были введены в Java 9 в лице класса Flow.

Парадигма реактивного программирования в ООП часто демонстрируется как расширение щаблона проектирования Observer (Наблюдатель). Можно также сравнить главный паттерн рекактивных потоков с хорошо знакомым многим шаблоном проектирования Iterator, так как есть некоторое двойственность названий, по типу Iterable - Iterator во всех подобных библиотеках. Однако главное отличие заключается в том, что в то время, как Iterator работает по pull-based (потребитель сам "вытаскивает" необходимые ему данные) модели, реактивные потоки пропагандируют push-based (генерируемые данные "проталкиваются" в потребитель) подход.

Использование Iterator'a - паттерн императивного программирования, несмотря на то, что доступ к данным - обязанность исключительно интерфейса Iterable. На самом деле, решение по получению next() элемента из последовательности лежит на плечах разработчика. В реактивных стримах, эквивалентом пары Iterable - Iterator является Publisher - Subscriber (издатель - подписчик). Но здесь уже сам Publisher уведомляет Subscriber'a о новых данных по мере их поступления, и данный аспект является ключевым на пути к заветной реактивности. Вдобавок, операции, применяемые к "проталкиваемым" данным, описываются декларативно, а не императивно: Разработчик описывает логику вычислений вместо подробного описания процесса управления ими (вычислениями).

Вдобавок к "проталкиванию" значений, аспекты обработки ошибок и завершения потока данных описаны в четко сформулированном стиле. Publisher может не только отправлять новые значения своему(им) подписчику(ам), вызывая onNext(), но и сигнализировать об ошибке, с помощью onError(), либо об окончании отправки данных, с помощью onComplete(). Ошибки, как и события завершения потока, прерывают последовательность. Можно описать это слеюующим образом:

onNext x 0..N [onError | onComplete]

Данный подход - очень гибкий. Он также справляется с ситуациями, когда мы имеем ноль, одно, либо n значений (включая бесконечные последовательности данных, такие как непрерывное тиканье часов).

Но зачем вообще нам может понадобиться асинхронная реактивная библиотека?

1. Блокировки могут быть расточительными

Современные приложения порой достигают огромных потоков одновременных пользователей, и, хотя возможности аппаратные возможности современного оборудования продолжают улучшаться, производительность является ключевой проблемой любого современного ПО.

Есть, по крайней мере, два способа повышения производительности программы:

  • Параллелизация путем использования большего числа как потоков, так и аппаратных ресурсов.

  • Поиск боллее эффективных подходов на основе данных об использовании доступных на данный момент ресурсов.

Обычно, Java разработчики пишут программмы, прибегая к использованию блокирующего кода. Данная практика показывает себя хорошо до тех пор, пока не появится так называемый боттлнек (от англ. bottleneck - бутылочное горлышко) по производительности. Тогда-то и приходит время дополнительных потоков, по сути исполняющих тот же самый блокирующий код. Однако, масштабирование использования ресурсов такого рода быстро приведет к проблемам конкурентности.бычно, Java разработчики пишут программмы, прибегая к использованию блокирующего кода. Данная практика показывает себя хорошо до тех пор, пока не появится так называемый боттлнек (от англ. bottleneck - бутылочное горлышко) по производительности. Тогда-то и приходит время дополнительных потоков, по сути исполняющих тот же самый блокирующий код. Однако, масштабирование использования ресурсов такого рода быстро приведет к проблемам конкурентности.

Хуже того, блокировки расходуют ресурсы. Если присмотреться, в случае, когда программа включает в себя разного рода задержки (особенно IO операции, такие, как запрос в БД, либо же запрос по сети), ресурсы непременно тратятся, так как потоки (вероятно множество потоков) теперь простаивают без дела, в ожидании данных.

Так что параллелизация - не панацея. Она (параллелизация) необходима для использования аппаратных мощностей "на полную", но в то же время сложна для понимания и создает риск растраты ресурсов впустую.

2. Асинхронность спешит на помощь?

Один из подходов, упомянутых раннее - поиск более эффективных подходов, может быть решением проблемы растраты ресурсов. Создавая асинхронный, неблокирующий код, вы позволяете программе переключиться на другую активную задачу, использующую те же ресурсы, и позже вернуться к текущему потоку, когда асинхронная обработка будет завершена.

Но как же всё-таки написать асинхронный код для JVM? Java предлагает нам две модели асинхронного программирования:

  • Коллбеки (от англ. Callbacks): Асинхронные методы, не имеющие возвращаемого значения и принимающие так называемый коллбек параметр (лямбду или анонимный класс), который будет вызван как только результат вычислений станет доступен. Один из известных примеров - иерархия EventListener в библиотеке Swing.

  • Futures (фьючи): Асинхронные методы, немедленно возвращающие объект Future<T>. Асинхронный процесс вычисляет значение T, но за доступ к нему отвечает объект Future. Значение доступно не сразу, мы можем запрашивать его у объекта, пока оно не станет доступно. Для примера, ExecutorService, выполняющий Callable<T>, оперирует объектами Future.

Достаточно ли хороши упомянутые выше подходы? Ответ очевиден: все зависит от конкретной ситуации, ведь каждый из них имеет свои ограничения.

Коллбеки тяжело соединять между собой, что ведёт к трудночитабельному и трудноподдерживаемому кода (это являение известно как "ад обратных вызовов").

Давайте рассмотрим пример: отображение на UI пользователя топ-5 записей из его списка избранных, либо же отображение рекомендаций если список избранного пуст. В итоге нужно сходить в три сервиса (первый возвращает идентификаторы любимых записей, второй - подробности о записях, третий - возвращает рекомендации с их подробностями).

Демонстрация Callback Hell:

Скрытый текст
userService.getFavorites(userId, new Callback<List<String>>() { // 1 
  public void onSuccess(List<String> list) { // 2
    if (list.isEmpty()) { // 3
      suggestionService.getSuggestions(new Callback<List<Favorite>>() {
        public void onSuccess(List<Favorite> list) { // 4
          UiUtils.submitOnUiThread(() -> { // 5
            list.stream()
                .limit(5)
                .forEach(uiList::show); // 6
            });
        }

        public void onError(Throwable error) { // 7
          UiUtils.errorPopup(error);
        }
      });
    } else {
      list.stream() // 8
          .limit(5)
          .forEach(favId -> favoriteService.getDetails(favId, // 9
            new Callback<Favorite>() {
              public void onSuccess(Favorite details) {
                UiUtils.submitOnUiThread(() -> uiList.show(details));
              }

              public void onError(Throwable error) {
                UiUtils.errorPopup(error);
              }
            }
          ));
    }
  }

  public void onError(Throwable error) {
    UiUtils.errorPopup(error);
  }
});
  1. У нас есть сервисы, основанные на коллбеках: интерфейс Callback с методом, вызываемым в случае успеха (onSuccess) и методом, вызываемым в случае возникновения ошибки (onError).

  2. Первый сервис вызывает коллбек, передавая туда список идентификаторов избранных записей

  3. В случае, если список идентификаторов пуст, идем в сервис рекомендаций (suggestionService).

  4. Сервис рекомендаций передает List<Favorite> второму коллбеку.

  5. Так как мы имеем дело с UI, нужно убедиться, что код-потребитель работает в UI потоке

  6. Используем Stream API из Java 8 чтобы ограничить число рекомендаций до 5 и отображаем их на UI.

  7. На всех уровнях мы обрабатываем ошибки единым образом - отображаем их в попапе (popup).

  8. Вернемся к сервису, работающему с идентификаторами избранных записей. Если сервис вернул нам полный список, идем в favouriteService для получения объектов Favorite с подробностями. Они нам нужны в количестве 5 штук, так что ограничиваем их количество до 5.

  9. И снова коллбек. Здесь мы наконец получаем полноценный список объектов Favorite, которые показываем в интерфейсе внутри UI потока.

Довольно много кода, не так ли? К тому же, он тяжеловато читается и имеет много повторяющихся частей. Рассмотрим код, который решает ту же задачу с помощью Reactor.

Пример реактивного кода эквивалентного коду с коллбеками:

Скрытый текст
userService.getFavorites(userId) // 1
           .flatMap(favoriteService::getDetails) // 2
           .switchIfEmpty(suggestionService.getSuggestions()) // 3
           .take(5) // 4
           .publishOn(UiUtils.uiThreadScheduler()) // 5
           .subscribe(uiList::show, UiUtils::errorPopup); // 6
  1. Начинаем с потока ID избранных записей.

  2. Асинхронно превращаем их в детализированные объекты Favorite (с помощью flatMap). Теперь имеем дело с потоком объектов Favorite.

  3. Если поток с избранными записями пуст, происходит fallback к suggestionService.

  4. Нас интересуют максимум 5 элементов итогового потока.

  5. В конце концов, мы хотим обрабатывать каждый элемент в UI потоке.

  6. Запускаем наш конвейер, указывая, что делать с окончательными данными (отображать на UI), а также что делать в случае ошибки (отображать popup).

Что если нам важно, чтобы получение идентификаторов занимало менее 800мс, а в случае, если выходим за временные рамки, следует забирать айдишники из кеша? В случае коллбеков, это нетривиальная задача. В реакторе же это решается добавлением оператора timeout в цепочку обработки.

Пример реактивного кода с таймаутом и фоллбеком:

Скрытый текст
userService.getFavorites(userId)
           .timeout(Duration.ofMillis(800)) // 1
           .onErrorResume(cacheService.cachedFavoritesFor(userId)) // 2
           .flatMap(favoriteService::getDetails) // 3
           .switchIfEmpty(suggestionService.getSuggestions())
           .take(5)
           .publishOn(UiUtils.uiThreadScheduler())
           .subscribe(uiList::show, UiUtils::errorPopup);
  1. Если предыдущий источник работает дольше 800мс, сгенерировать ошибку.

  2. В случае ошибки, заглянуть в кеш с помощью cacheService.

  3. Дальнейшая цепочка идентична описанной в предыдущем примере.

Объекты Future выглядят немного лучше, чем коллбеки, однако они все еще тяжело объединяются друг с другом, даже несмотря на улучшения, привнесенные в Java 8 в лице CompletableFuture. Оркестрировать множество объектов Future между собой возможно, но не легко. Также Future имеет ряд других проблем:

  • Легко оказаться в ситуации блокировки, вызывая метод get().

  • Нет поддержки ленивых вычислений.

  • Им недостает поддержки множества значений, а также продвинутой обработки ошибок.

Давайте рассмотрим еще один пример: Получение списка ID по которым мы будем получать имя и статистику, соединяя их попарно - все это должно происходить асинхронно. Следующий пример демонстрирует решение проблемы с помощью CompletableFuture.

Пример комбинации нескольких CompletableFuture

Скрытый текст
CompletableFuture<List<String>> ids = ifhIds(); // 1

CompletableFuture<List<String>> result = ids.thenComposeAsync(l -> { // 2
	Stream<CompletableFuture<String>> zip =
			l.stream().map(i -> { // 3
				CompletableFuture<String> nameTask = ifhName(i); // 4
				CompletableFuture<Integer> statTask = ifhStat(i); // 5

				return nameTask.thenCombineAsync(statTask, (name, stat) -> "Name " + name + " has stats " + stat); // 6
			});
	List<CompletableFuture<String>> combinationList = zip.collect(Collectors.toList()); // 7
	CompletableFuture<String>[] combinationArray = combinationList.toArray(new CompletableFuture[combinationList.size()]);

	CompletableFuture<Void> allDone = CompletableFuture.allOf(combinationArray); // 8
	return allDone.thenApply(v -> combinationList.stream()
			.map(CompletableFuture::join) // 9
			.collect(Collectors.toList()));
});

List<String> results = result.join(); // 10
assertThat(results).contains(
		"Name NameJoe has stats 103",
		"Name NameBart has stats 104",
		"Name NameHenry has stats 105",
		"Name NameNicole has stats 106",
		"Name NameABSLAJNFOAJNFOANFANSF has stats 121");
  1. Начинаем с объекта Future, который вернет нам список ID для обработки

  2. Как только получим список ID, начинаем его асинхронную обработку.

  3. Для каждого элемента в списке:

  4. Асинхронно получаем соответствующее ему имя

  5. Асинхронно получаем соответствующую ему статистику

  6. Объединяем оба результата.

  7. Теперь мы имеем список Future, включающий в себя все задачи с комбинациями. Чтобы выполнить эти задачи, надо сконвертировать лист в массив.

  8. Передаем получившийся в массив в метод CompletableFuture.allOf(), который вернет Future, который будет завершен, когда выполнятся все таски из массива.

  9. Сложность состоит в том, что allOf() возвращает CompletableFuture<Void>, так что нам приходится еще раз итерироваться по списку с Future, собирая их результаты с помощью join() (в данном случае метод не блокирует поток выполнения, так как allOf() гарантирует нам, что все таски уже завершены).

  10. Когда вся асинхронная цепочка запускается, мы ждем ее завершения и затем возвращаем список рещультатов, валидность которых проверяем.

Так как реактор предлагает большее число операторов для комбинации данных "из коробки", мы можем упростить весь процесс.

Пример реактивного кода, эквивалентного предыдщему:

Скрытый текст
Flux<String> ids = ifhrIds(); // 1

Flux<String> combinations =
		ids.flatMap(id -> { // 2
			Mono<String> nameTask = ifhrName(id); // 3
			Mono<Integer> statTask = ifhrStat(id); // 4

			return nameTask.zipWith(statTask, // 5
					(name, stat) -> "Name " + name + " has stats " + stat);
		});

Mono<List<String>> result = combinations.collectList(); // 6

List<String> results = result.block(); // 7
assertThat(results).containsExactly( // 8
		"Name NameJoe has stats 103",
		"Name NameBart has stats 104",
		"Name NameHenry has stats 105",
		"Name NameNicole has stats 106",
		"Name NameABSLAJNFOAJNFOANFANSF has stats 121"
);
  1. В данном примере все начинается с последовательности ID, представленных в асинхронном стиле (Flux<String>).

  2. Обрабатываем каждый элемент последовательности асинхронно (внутри тела функции flatMap()) дважды - получение имени и статистики.

  3. Получаем соответствующее имя.

  4. Получаем соответствующую статистику.

  5. Асинхронно комбинируем эти значения.

  6. Аггрегируем значения в List по мере их готовности.

  7. В проде мы, скорее всего, продолжили бы асинхронно работать с Flux'ом, например подписавшись на него. Вероятнее всего, мы возвращали бы результат в виде Mono. Так как данный код демонстративный, можем позволить себе заблокироваться, ожидая окончания обработки данных, затем напрямую вернуть список необходимых значений.

  8. Проверяем результат.

Вывод - риски использования коллбеков и Future в целом похожи. Именно от них и старается избавиться реактивное программирование с помощью механизма Publisher-Subscriber.

3. От императивного программирования к реактивному

Реактивные библиотеки, такие как реактор, нацелены на устранение недостатков "классических" подходов асинхронного программирования для JVM, в то же время фокусируясь на некоторых дополнительных аспектах:

  • Композиционность и ясность кода.

  • Данные представлены в виде потока, управляемого с помощью широкого набора операторов.

  • Ничего не происходит до момента подписки (метод subscribe()).

  • Обратное давление или же способность приемника данных сигнализировать источнику, что частота выдачи данных слишком высокая.

  • Высокоуровневая, но в то же время очень ценная абстракция, не зависящая от параллелизма.

3.1. Композиционность и ясность кода

Под "композиционностью" мы понимаем возможность орагнизовывать множество асинхронных задач, где результаты выполнения предыдущих задач передаются следующим. Помимо этого, можно запускать несколько задач в fork-join стиле, а также переиспользовать асинхронные задачи как самостоятельные компоненты в более высокоуровневых системах.

Возможность "дирижировать" задачами тесно связана с читабельностью и поддерживаемостью кода. По мере того, как асинхронные процессы растут в объеме и сложности, поддерживать и понимать код становится в разы сложнее. Как мы уже могли видеть, подход с коллбеками простой, но один из его главных недостатков заключается в том, что в случае сложных процессов, приходится иметь один коллбек, вызываемый из другого, и т.д. по цепочке. Эта хаотичная ситуация называется "Адом обратных вызовов" (Callback Hell). Как вы уже могли догадаться (или знаете по опыту), к такому коду довольно сложно возвращаться и как-либо его комментировать.

Reactor предлагает множество вариантов компоновки, в которых код отражает организацию абстрактного процесса, и все, как правило, поддерживается на одном уровне (вложенность сведена к минимуму).

3.2. Аналогия с линией сборки

Можно сравнить данные, обрабатываемые реактивным приложением, с изделиями, движущимися по линии/конвейеру сборки. Reactor в данной ситуации является и конвейерной лентой, и рабочими станциями. Сырой материал вытекает из источника (первоначальный Publisher) и в итоге превращается в окончательный продукт, готовый к потреблению консьюмером (Subscriber).

Сырье может претерпевать различные преобразования и другие промежуточные шаги, либо же быть частью большЕй сборочной линии, которая соединяет промежуточные результаты воедино. Если происходят неполадки или засорение на одном из этапов (например упаковка изделий стала занимать непозволительно долгое время), проблемная станция может послать сигнал "наверх" о том, что стоит ограничить объем подаваемого сырья.

3.3. Операторы

В Reactor операторы являются теми самыми рабочими станциями из нашей аналогии. Каждый оператор привносит какое-либо новое поведение Publisher'а, а также оборачивает предыдущий Publisher в новый объект. Таким образом, получается цепочка, где данные выходят из первого Publisher'a и двигаются вниз по цепочке, претерпевая изменения в каждом звене. В конце концов, Subscriber завершает процесс. Необходимо помнить, что ничего не будет происходить до того момента, пока Subscriber не подпишется на Publisher, что мы и рассмотрим в скором времени.

Осознание того факта, что операторы создают новые инстансы может помочь избежать распространенную ошибку, когда кажется, что используемый вами оператор не работает. Об этом можно почитать в FAQ.

В то время, как спецификация Reactive Streams не включает в себя операторы, они являются одними из самых значимых вещей в таких реактивных библиотеках, как Reactor, предоставляющий большой набор операторов. Они (операторы) охватывают большой спектр вопросов, начиная с простых преобразований и заканчивая сложной оркестрацией и обработкой ошибок.

3.4. Ничего не происходит до subscribe()

В Reactor, когда мы создаем цепочку из Pusblisher'ов, данные не начинают извлекаться оттуда по умолчанию. На самом деле, мы создаем абстрактное описание нашего асинхронного процесса (это помогает переиспользовать и структурировать код).

Совершая подписку, мы привязываем Publisher к Subscriber, что запускает поток данных во всей цепи. Под капотом это достигается с помощью одиночного сигнала типа request, который выталкивается наверх, влоть до исходного Publisher'a.

3.5. Обратное давление (Backpressure)

Распространение сигналов вверх по цепочке также используется для реализации backpressure механизма, который мы уже описывали, как сигнал, который отправляется вверх по линии сборки, когда текущая рабочая станция работает медленнее, чем вышестоящие.

Реальный механизм, описываемый спецификацией Reactive Streams, довольно близок к приведенной нами аналогии: Подписчик может работать в неограниченном режиме и разрешать источнику посылать все данные настолько быстрго, насколько это возможно, либо же с помощью уже упомянутого механизма сигнализировать источнику, что сейчас он готов обрабатывать максимум n элмемнтов.

Промежуточные операторы также могут изменять запрос "в пути". Представим оператор buffer(), который группирует элементы в батчи (batches) по 10 штук. Если подписчик запрашивает один буфер, источнику разрешено отправить 10 элементов. Некоторые операторы реализуют стратегии предварительной выборки (prefetching strategies), которые позволяют избежать циклов с запросами типа request(1), а также они полезны, если генерация данных перед их запросом стоит не так дорого.

Упомянутые аспекты превращают push модель в гибридную push-pull модель, когда нижестоящие подписчики могут "вытягивать" n элементов "сверху", если они уже готовы. Если же данные еще не готовы, они будут поданы сразу же, как только они будут произведены.

3.6. Горячее против холодного

Семейство Rx - подобных реактивных библиотек различает две больших категории реактивных последоавтельностей: холодные и горячие. Это отличие заключается в основном в том, как реактивный поток реагирует на подписчиков:

  • Холодная последовательность воспроизводится заново для каждого подписчика, включая источник данных. К примеру, если источник представляет собой HTTP запрос, для каждого подписчика будет выполняться новый HTTP - запрос.

  • Горячая последовательность не начинается снова для каждого подписчика. Новые подписчики будут получать сигналы, отправленные после момента подписки. Однако стоит заметить, что некоторые горячие реактивные стримы могут кешировать или воспроизводить заново всю историю полностью или частично. С общей точки зрения, горячая последовательность может работать и без активных подписчиков (исключение из правила "ничего не произойдет пока вы не подпишетесь").

Подробности про горячие и холодные последовательности в контексте Reactor можно посмотреть здесь.


Ну, вроде бы и все. Изначально планировал перевести как можно точнее к оригиналу, но в некоторых местах все же позволил себе немного переиначить смысл текста в пользу большей ясности. Вообщем, буду рад замечаниям.

Комментарии (2)


  1. jbourne
    29.01.2025 08:42

    Хорошая статья.
    Добавлю лишь, что:
    - Подход с Callback'ами в Java - это больше редкость; намного чаще с Future пишется асинхронная логика
    - Реактивный код - это структура с определенным набором элементов и иногда бывает ваша логика в нее не может вписаться никак, а фьючеры более гибки
    - Дебагинг реактивного кода не нативен и сложнее императивного
    - В реактивном коде можно динамически строить пайплайн как конструктор, что иногда спасает
    - И самое главное - в статье не упоминаются Virtual Threads, которые призваны решить упомянутые проблемы фьючеров, уйти от КоллбекХела и сделать императивный вариант лучше; в общем они сокращают число преимуществ реактивного подхода над императивным для определенного круга задач (с интенсивным IO, как у вас).

    И еще: как по мне эти реактивные пайплайны выглядят очень симпатично с эстетической точки зрения.


    1. t3hk0d3
      29.01.2025 08:42

      Имхо главная проблема реактивщины - нужен хороший опыты и понимание фреймворка чтобы написать хорошо работающий код, иначе получается кривое косое.

      В этом плане coroutines/virtual threads намного проще для использования для разработчика средней руки.

      Часто наблюдал где люди вставляли горячие блокирующие вызовы в flatMap, а потом удивлялись почему все работает плохо. Винят они конечно же реактивный фреймворк.