Привет! Удивительно, но первая часть статьи даже кому-то понравилась.
Отдельное спасибо за ваши отзывы и комментарии. У меня для вас плохая хорошая новость: нам ещё есть о чём поговорить! А если точнее, то о некоторых деталях работы Reactor.


Я отрекаюсь от магии


Для дальнейшего углубления в Reactor не будет лишним описать некоторые принципы его работы. Что же тщательно скрывается от нас за внешним слоем из Flux и Mono?


Напоминание

Flux и Mono реализуют интерфейс Publisher.


public void subscribe(Subscriber<? super T> s);

Официальная документация предлагает сравнивать Reactor с конвейером. Publisher выдаёт какие-то данные (материалы). Данные идут по цепочке из операторов (конвейерной ленте), обрабатываются, в конце получается готовый продукт, который передаётся в нужный Consumer/Subscriber и употребляется уже там.


Как работают операторы Reactor? Рецепт усреднённый, потому что вариаций масса. Попытаемся дать грубое описание.


У каждого оператора есть какая-то тактика реализация в виде объекта. Вызов оператора у Flux/Mono возвращает объект, реализующий этот оператор. Например, вызов flatMap() вернёт объект типа FluxFlatMap (наследник Flux).


Т.е. оператор — это Publisher, который, помимо какой-то своей логики, содержит ссылку на исходный (source) Publisher, к которому применяется. Вызовы операторов создают цепочку из Publisher.


При вызове subscribe() создаётся исходный Subscriber, он передаётся назад по нашей цепочке из паблишеров, каждый Publisher может завернуть Subscriber в другой Subscriber, таким образом создавая ещё одну цепочку, которая и передаётся в исходный Publisher для выполнения.


Логично, что всё это несёт какой-то оверхед, поэтому рекомендуется воздержаться от написания обычного (синхронного) кода через Flux или Mono.


Schedulers | Планировщики


Reactor не заботит модель исполнения вашей программы, но он любезно предоставляет необходимый для управления исполнением инструментарий. Разработчик самурай волен самостоятельно выбирать свою судьбу модель исполнения.


Модель исполнения и её детали определяются имплементацией интерфейса Scheduler (т.е. планировщика). Есть статические методы для ряда случаев жизни, позволяющие указать контекст выполнения:


  • Schedulers.immediate(). Выполнение будет происходить в текущем потоке;
  • Schedulers.single(). Выполнение в выделенном потоке. Осторожно! Он и в самом деле single, обращение не создаст новый scheduler/поток, а вернёт кешированное значение. Для создания выделенного потока/scheduler на каждый вызов используйте Schedulers.newSingle();
  • Schedulers.elastic(). Уже упоминался в прошлой статье. Выполнение задач списывает на workers (работяг, «воркеров»), которых сам же и создаёт. В случае idle (бездействия) worker прибивается. В качестве воркера выступает ExecutorService. Используется для блокирующих задач, например I/O. По умолчанию — unbounded, если нужно ограничение на количество воркеров — используйте Schedulers.newElastic();
  • Schedulers.parallel(). N воркеров, оптимизированных для параллельной работы. По умолчанию N = количеству доступных ядер, т.е. Runtime.getRuntime().availableProcessors(). Осторожно! Внутри Docker этот метод может нагло вам врать.

Стоит отметить, что коробочные Schedulers.single() и Schedulers.parallel() выбрасывают IllegalStateException при попытке запустить в них блокирующий оператор: block(), blockLast(), toIterable(), toStream(). Такое нововведение появилось в релизе 3.1.6.


Если всё-таки хотите заниматься подобными извращениями — используйте Shchedulers.newSingle() и Schedulers.newParallel(). Но лучшей практикой для блокирующих операторов считается использование Schedulers.elastic() или Schedulers.newElastic().


Экземпляры Scheduler так же можно инициализировать из ExecutorService с помощью Schedulers.fromExecutorService(). Из Executor тоже можно, но не рекомендуется.


Некоторые операторы из Flux и Mono запускаются сразу на конкретном Scheduler (но можно передать и свой). К примеру, уже знакомый Flux.interval() по умолчанию запускается на Schedulers.parallel().


Flux.interval(Duration.ofMillis(300), Schedulers.newSingle("test"))

Контекст исполнения


Как же сменить контекст исполнения? Нужно прибегнуть к одному из уже знакомых нам операторов:


  • publishOn();
  • subscribeOn().

Они оба принимают Scheduler в качестве аргумента и позволяют изменить контекст выполнения на указанный Scheduler.
Но почему их два и в чём же разница?


В случае с publishOn этот оператор применяется так же, как и любой другой, посреди цепочки вызовов. Все последующие Subscriber будут выполняться в контексте указанного Scheduler.


В случае с subscribeOn оператор «глобальный», срабатывает сразу на всю цепочку Subscriber. После вызова subscribe() контекстом выполнения будет указанный Scheduler. Далее контекст может изменяться с помощью оператора publishOn. Последующие вызовы subscribeOn игнорируются.


Спасибо stackoverflow за пример. Код вида


Flux.just("a", "b", "c") //this is where subscription triggers data production
      //this is influenced by subscribeOn
      .doOnNext(v -> System.out.println("before publishOn: " + Thread.currentThread().getName()))
      .publishOn(Schedulers.elastic())
      //the rest is influenced by publishOn
      .doOnNext(v -> System.out.println("after publishOn: " + Thread.currentThread().getName()))
      .subscribeOn(Schedulers.parallel())
      .subscribe(v -> System.out.println("received " + v + " on " + Thread.currentThread().getName()));

Thread.sleep(5000);

выведет следующий результат:


before publishOn: parallel-1
before publishOn: parallel-1
after publishOn: elastic-2
before publishOn: parallel-1
received a on elastic-2
after publishOn: elastic-2
received b on elastic-2
after publishOn: elastic-2
received c on elastic-2

Обработка ошибок


В Reactor исключения воспринимаются как terminal event (терминальное событие).
Если где-то произошло исключение, значит, что-то пошло не так, наш конвейер останавливается, а ошибка прокидывается до финального Subscriber и его метода onError.


Любимая картинка


Почему так? Reactor не знает о серьёзности возникшего исключения и понятия не имеет, что с ним делать. Подобные ситуации должны как-то обрабатываться на уровне приложения. Для этого у Subscriber есть прекрасный метод onError(). Reactor вынуждает нас его переопределять и как-то реагировать на исключение, в противном случае мы будем получать UnsupportedOperationException при ошибках.


Уточнение

Если быть честным, то выкидывает он наследника UnsupportedOperationException — ErrorCallbackNotImplemented. Чтобы понять, что это действительно он, существует вспомогательный статический метод Errors.errorCallbackNotImplemented(Throwable t).


Философия try/catch


Что обычно делается внутри catch-блока в Java? Ну, не считая всеми любимых пустых catch-блоков.


  1. Static Fallback Value. Вернуть какое-то статическое значение по умолчанию:
    try {
       return fromRemoteAndUnstableSource();
    } catch(Throwable e) {
       return DEFAULT_VALUE;
    }
  2. Fallback Method. Вызов альтернативного метода в случае ошибки:


    try {
       return fromRemoteAndUnstableSource();
    } catch(Throwable e) {
       return loadValueFromCache();
    }

  3. Dynamic Fallback Value. Вернуть какое-то динамическое значение в зависимости от исключения:


    try {
       return fromRemoteAndUnstableSource();
    } catch(Throwable e) {
       if (e instanceof TimeoutException) {
           return loadValueFromCache();
       }
       return DEFAULT_VALUE;
    }

  4. Catch and Rethrow. Обернуть в какое-то доменное исключение и пробросить исключение дальше:


    try {
       return fromRemoteAndUnstableSource();
    } catch(Throwable e) {
       throw new BusinessException(e);
    }

  5. Log or React on the Side. Залогировать ошибку и пробросить исключение дальше:


    try {
       return fromRemoteAndUnstableSource();
    } catch(Throwable e) {
       logger.error(e.getMessage(), e);
       throw e;
    }

  6. Using Resources and the Finally Block. Освобождение ресурсов в finally-блоке или с помощью try-with-resources.
    try {
       return fromRemoteAndUnstableSource();
    } catch(Throwable e) {
       //do nothing
    } finally {
       cleanAllStuff();
    }

Приятная новость: всё это есть в Reactor в виде эквивалентных операторов.


Менее приятная новость: в случае ошибки ваша прекрасная последовательность данных всё равно завершится (terminal event), несмотря на оператора обработки ошибок.
Подобные операторы используются скорее для создания новой, резервной (fallback) последовательности на замену завершившейся.


Приведём пример:


Flux<String> s = Flux.range(1, 10)
   .map(v -> doSomethingDangerous(v))
   .map(v -> doSecondTransform(v));
s.subscribe(value -> System.out.println("RECEIVED " + value), error -> System.err.println("CAUGHT " + error));

Можно сравнить это с похожим блоком try / catch:


try {
   for (int i = 1; i < 11; i++) {
       String v1 = doSomethingDangerous(i);
       String v2 = doSecondTransform(v1);
       System.out.println("RECEIVED " + v2);
   }
} catch (Throwable t) {
   System.err.println("CAUGHT " + t);
}

Обратите внимание: for прерывается!


Ещё пример завершения последовательности в случае ошибки:


Flux<String> flux = Flux.interval(Duration.ofMillis(250))
   .map(input -> {
       if (input < 3) return "tick " + input;
       throw new RuntimeException("boom");
   })
   .onErrorReturn("Uh oh");

flux.subscribe(System.out::println);
Thread.sleep(2100);

На экране получим:


tick 0
tick 1
tick 2
Uh oh

Реализация try/catch


Static Fallback Value


Используя оператор onErrorReturn:


Flux.just(10)
   .map(this::doSomethingDangerous)
   .onErrorReturn("RECOVERED");

Можно добавить предикат, чтобы оператор выполнялся не для всех исключений:


Flux.just(10)
   .map(this::doSomethingDangerous)
   .onErrorReturn(e -> e.getMessage().equals("boom10"), "recovered10");

Fallback Method


Используя оператор onErrorResume,


Flux.just("key1", "key2")
   .flatMap(k -> callExternalService(k)) //загружаем данные извне
   .onErrorResume(e -> getFromCache(k)); //в случае ошибки берём из кеша

можно добавить предикат, чтобы оператор выполнялся не для всех исключений:


Flux.just("timeout1", "unknown", "key2")
   .flatMap(k -> callExternalService(k))
   .onErrorResume(TimeoutException.class, getFromCache(k))
   .onErrorResume((Predicate<Throwable>) error -> error instanceof UnknownKeyException, registerNewEntry(k, "DEFAULT"));

Аналогично:


Flux.just("timeout1", "unknown", "key2")
   .flatMap(k -> callExternalService(k))
   .onErrorResume(error -> {
       if (error instanceof TimeoutException)
           return getFromCache(k);
       else if (error instanceof UnknownKeyException) 
           return registerNewEntry(k, "DEFAULT");
       else
           return Flux.error(error);
   });

Dynamic Fallback Value


Всё тот же onErrorResume:


erroringFlux.onErrorResume(error -> Mono.just(
   myWrapper.fromError(error); //тут, в зависимости от ошибки, будут совершены разные действия
));

Catch and Rethrow


Можно сделать двумя способами. Первый — с оператором onErrorResume:


Flux.just("timeout1")
   .flatMap(k -> callExternalService(k))
   .onErrorResume(original -> Flux.error(
       new BusinessException("oops, SLA exceeded", original)
   );

И более лаконично — с помощью onErrorMap:


Flux.just("timeout1")
   .flatMap(k -> callExternalService(k))
   .onErrorMap(original -> new BusinessException("oops, SLA exceeded", original));

Log or React on the Side


Добавить какой-то side effect (метрики, логирование) можно с помощью оператора doOnError


LongAdder failureStat = new LongAdder();
Flux<String> flux = Flux.just("unknown")
   .flatMap(k -> callExternalService(k))
   .doOnError(e -> {
       failureStat.increment();
       log("uh oh, falling back, service failed for key " + k);
   })
   .onErrorResume(e -> getFromCache(k));

Using Resources and the Finally Block


Итак, как же получить аналог try-with-resources или блок finally? На выручку нам приходит оператор Flux.using().


Для начала нужно ознакомиться с интерфейсом Disposable. Он заставляет нас реализовать метод dispose(). Вызов этого метода должен отменять или завершать какую-то задачу или последовательность. Вызовы метода должны быть идемпотентными. Использованные ресурсы должны быть освобождены.


AtomicBoolean isDisposed = new AtomicBoolean();
Disposable disposableInstance = new Disposable() {
   @Override
   public void dispose() {
       isDisposed.set(true);
   }

   @Override
   public String toString() {
       return "DISPOSABLE";
   }
};

Flux<String> flux = Flux.using(
       () -> disposableInstance,  // генерация данных
       disposable -> Flux.just(disposable.toString()), //обработка
       Disposable::dispose  //освобождение ресурсов
);

Повторение | Retrying


При повторе (retry) наблюдается похожее поведение, оригинальная последовательность завершается (terminate event), мы повторно подписываемся (re-subscribing) на Flux.


Разберём на примере. Код


Flux.interval(Duration.ofMillis(250))
   .map(input -> {
       if (input < 3) return "tick " + input;
       throw new RuntimeException("boom");
   })
   .elapsed()
   .retry(1)
   .subscribe(System.out::println, System.err::println);

Thread.sleep(2100);

выведет


259,tick 0
249,tick 1
251,tick 2
506,tick 0
248,tick 1
253,tick 2
java.lang.RuntimeException: boom

Более сложная логика повторов доступна с использованием оператора retryWhen().


Заключение


Надеюсь, этой небольшой заметке удалось пролить свет на некоторые особенности работы Reactor.


Подведём итоги:


  • контекстом выполнения можно манипулировать с помощью операторов publishOn, subscribeOn и Schedulers;
  • для обработки исключительных ситуаций есть множество операторов на все случаи жизни;
  • посылание terminate signal приводит к завершению оригинальной «последовательности»;
  • для освобождения ресурсов используется интерфейс Dispose.

Спасибо за внимание!


По мотивам документации Reactor


Copies of this document may be made for your own use and for distribution to others, provided that you do not charge any fee for such copies and further provided that each copy contains this Copyright Notice, whether distributed in print or electronically.

Меня тут нет, но есть более достойные мужи, в т.ч. и контрибьюторы / мейнтейнеры.

Комментарии (4)


  1. cepro
    25.05.2018 00:30

    Огромное спасибо за статью! Замечательная подача тех. материала и литературный дар.
    Если есть возможность, пожалуйста, продолжайте!


  1. vkolotov
    26.05.2018 11:53

    Спасибо за статью. Было бы интересно почитать о том как отрефакторить существующий код под новый реактивный манер. Например вот интересно различные подходы конвертации итеративного метода (возвращающего список объектов) так чтобы он возвращал Flux. Должен ли этот новый метод создавать поток чтобы сабмитать новые объекты (hot stream), или же этот метод должен только оборачивать список объектов в Flux (и вызывать метод complete)?


    1. zealot_and_frenzy Автор
      26.05.2018 22:29

      Не до конца понял вопрос, но попробую ответить. Какого-то универсального рецепта нет, все упирается в данные. Если метод возвращает весь необходимый список объектов единовременно (не потоком) — вероятно это не Flux, а Mono. Hot Stream это скорее про какие-нибудь Event Listener на UI или системные ивенты, вебсокеты. Что-то, вызов чего мы не контролируем.


      1. vkolotov
        27.05.2018 11:53

        Вот конкретный пример. Есть некий итеративный метод который возвращает список найденных блютус устройств: List<DiscoveredDevice> getDiscoveredDevices(). Каждый вызов этого метода может возвратить разные устройства. Необходимо отрефакторить данный метод так чтобы он стал реактивным, а именно, чтоб он возвращал нескончаемый поток, так чтобы клиенты могли бы подписаться на этот поток, и соответственно, получать новые устройства. Первое что приходит в голову это — Flux<DiscoveredDevice> getDiscoveredDevicesStream(), где внутри этого метода создается поток (thread), и по мере обнаружения новых устройств, вызывался бы emitter.next(...). Однако кажется мне что создание потока не совсем верное решение, ибо в reactor'е есть столько различный способов работы с потоками, что может быть это можно как-то решить с помощью reactor'а? Кроме того, может быть лучше работу с потоком возложить на клиента этого метода? Спасибо.