В сети очень много русско- и англоязычных статей по Rx операторам retryWhen и repeatWhen.
Несмотря на это, очень часто встречаю нежелание их использовать (ввиду сложного синтаксиса и непонятных диаграмм).

Приведу несколько как можно с их помощью эффективно перезапускать участки цепи и делегировать обработку перезапусков при ошибках и завершениях потока.

В примерах будет Java код с лямбдами (Retrolamda), но переписать его на Kotlin или чистую Java не составит труда.

Императивный способ перезапуска цепи


Предположим, мы используем Retrofit и загрузку начинаем в методе load(). Repository.getSomething() возвращает Single<Something>().

@NonNull
private Subscription loadingSubscription = Subscriptions.unsubscribed();

private void load() {
    subscription.unsubscribe();
    subscription = repository
             .getSomething()
             .subscribe(result -> {}, err -> {});
}

private void update() {
    load();
}

Из какого-нибудь листенера обновлений (e.g. PullToRefreshView) мы вызываем метод update(), который, в свою очередь, вызовет метод load(), где с нуля будет создана подписка.

Предлагаю ко вниманию вариант использования более реактивного, на мой взгляд, способа с вышеупомянутым оператором repeatWhen().

Реактивный способ перезапуска цепи — repeatWhen


Создадим объект PublishSubject updateSubject и передадим в оператор лямбду
repeatHandler -> repeatHandler.flatMap(nothing -> updateSubject.asObservable())

@NonNull
private final PublishSubject<Void> updateSubject = PublishSubject.create();

private void load() {
    repository
            .getSomething()
            .repeatWhen(repeatHandler ->
                                repeatHandler.flatMap(nothing -> updateSubject.asObservable()))
            .subscribe(result -> {}, err -> {});
}

Теперь для обновления загруженных данных нужно заэмитить null в updateSubject.

private void update() {
    updateSubject.onNext(null);
}

Нужно помнить, что работает такой реактивный способ только с Single, который вызывает onComplete() сразу после эмита единственного элемента (будет работать и с Observable, но только после завершения потока).

Реактивный способ обработки ошибок retryWhen


Подобным образом можно обрабатывать и ошибки. Предположим, у пользователя пропала сеть, что приведет к ошибке и вызову onError() внутри Single, который возвращается методом getUser().

В этот момент можно показать пользователю диалог с текстом «Проверьте соединение», а по нажатию кнопки OK вызвать метод retry().

@NonNull
private final PublishSubject<Void> retrySubject = PublishSubject.create();

private void load() {
    repository
            .getSomething()
            .doOnError(err -> showConnectionDialog())
            .retryWhen(retryHandler -> retryHandler.flatMap(nothing -> retrySubject.asObservable()))
            .subscribe(result -> {}, err -> {});
}

private void retry() {
    retrySubject.onNext(null);
}

По вызову retrySubject.onNext(null) вся цепочка выше retryWhen() переподпишется к источнику getUser(), и повторит запрос.

При таком подходе важно помнить, что doOnError() должен находиться выше в цепочке, чем retryWhen(), поскольку последний «поглощает» ошибки до эмита repeatHandler'а.

В данном конкретном случае выигрыша по производительности не будет, а кода стало даже чуть больше, но эти примеры помогут начать мыслить реактивными паттернами.

В следующем, бессовестно притянутом за уши, примере, в методе load() мы объединяем два источника оператором combineLatest.

Первый источник — repository.getSomething() загружает что-то из сети, второй, localStorage.fetchSomethingReallyHuge(), загружает что-то тяжелое из локального хранилища.

public void load() {
    Observable.combineLatest(repository.getSomething(),
                             localStorage.fetchSomethingReallyHuge(),
                             (something, hugeObject) -> new Stuff(something, hugeObject))
            .subscribe(stuff -> {}, err -> {});
}

При обработке ошибки императивным способом, вызывая load() на каждую ошибку, мы будем заново подписываться на оба источника, что, в данном примере, абсолютно ненужно. При сетевой ошибке, второй источник успешно заэмитит данные, ошибка произойдет только в первом. В этом случае императивный способ будет еще и медленней.

Посмотрим, как будет выглядеть реактивный способ.


public void load() {
    Observable.combineLatest(
            repository.getSomething()
                    .retryWhen(retryHandler ->
                                       retryHandler.flatMap(
                                               err -> retrySubject.asObservable())),
            localStorage.fetchSomethingReallyHuge()
                    .retryWhen(retryHandler ->
                                       retryHandler.flatMap(
                                               nothing -> retrySubject.asObservable())),
            (something, hugeObject) -> new Stuff(something, hugeObject))
            .subscribe(stuff -> {}, err -> {});
}

Прелесть такого подхода в том, что лямбда, переданная в оператор retryWhen() исполняется только после ошибки внутри источника, соответственно, если «ошибется» только один из источников, то и переподписка произойдет только на него, а оставшаяся цепочка ниже будет ожидать переисполнения.

А если ошибка произойдет внутри обоих источников, то один и тот же retryHandler сработает в двух местах.

Делегирование обработки ошибок


Следующим шагом можно делегировать обработку повторов некоему RetryManager. Перед этим еще можно немного подготовиться к переезду на Rx2 и убрать из наших потоков null объекты, которые запрещены в Rx2. Для этого можно создать класс:

public class RetryEvent {
}

Без ничего. Позже туда можно будет добавлять разные флаги, но это другая история. Интерфейс RetryManager может выглядеть как-то так:

interface RetryManager {

    Observable<RetryEvent> observeRetries(@NonNull Throwable error);

}

Реализация может проверять ошибки, показывать диалоги, снэкбар, устанавливать бесшумный таймаут — всё, что душе угодно. И слушать коллбэки от всех этих UI компонентов, чтобы в последствии заэмитить RetryEvent в наш retryHandler.

Предыдущий пример с использованием такого RetryManager будет выглядеть вот так:

//pass this through constructor, DI or use singleton (but please don't)
private final RetryManager retryManager;

public void load() {
    Observable.combineLatest(
            repository.getSomething()
                    .retryWhen(retryHandler ->
                                       retryHandler.flatMap(
                                               err -> retryManager.observeRetries())),
            localStorage.fetchSomethingReallyHuge()
                    .retryWhen(retryHandler ->
                                       retryHandler.flatMap(
                                               nothing -> retryManager.observeRetries())),
            (something, hugeObject) -> new Stuff(something, hugeObject))
            .subscribe(stuff -> {}, err -> {});
}


Таким нехитрым образом обработка повторов при ошибках делегирована сторонней сущности, которую можно передавать как зависимость.

Надеюсь, эти примеры окажутся кому-то полезны и соблазнят попробовать repeatWhen() и retryWhen() в своих проектах.
Поделиться с друзьями
-->

Комментарии (12)


  1. Artem_007
    20.04.2017 00:04

    Какую литературу/статьи можно почитать, чтобы также познать «реактивный подход»? )
    Во многих случаях мое использование Rx скатывается в императивный подход и при чтении подобных статей у меня возникает вопрос: «А что и тут так надо?»


    1. yatsinar
      20.04.2017 00:25
      +1

      Сложно посоветовать какую-то литературу, потому-что большинство материалов, вроде «RxJava for Android App Development» — K. Matt Dupree, покрывает только самый базовый уровень (хотя именно эту 30 страничную брошюру стоит пробежать, там популярная реактивная обёртка на SearchView продемонстрирована).
      Самым эффективным способом будет идти по официальной документации, брать каждый оператор и гуглить отдельно по нему небольшие статьи: «rx debounce usage android», «rx compose transformers usage android».
      Очень много годноты находится на хабре и медиуме.
      Позже, когда документация, диаграммы из неё и десятки прочитанных примеров врежутся в память, мозг сам начнет выдавать тонны (часто нерабочих) способов переделать собственный код и реализовать какую-то привычную штуку через Rx )


    1. xGromMx
      20.04.2017 14:46

      можете глянуть тут http://xgrommx.github.io/rx-book/content/resources/articles/index.html Если у кого-то есть время может сгруппировать по языкам


    1. mairos
      20.04.2017 15:22

      От языка зависит, для Java очень хорошо Reactive Programming with RxJava Creating Asynchronous, Event-Based Applications


    1. Valeroncho
      20.04.2017 15:41

      Могу порекомендовать книгу «Нуркевич, Кристенсен: Реактивное программирование с использованием RxJava». В ней достаточно подробно описаны операторы + приведены примеры по их использованию.


  1. a15199732
    23.04.2017 22:53

    Надо уточнить, что примеры работают только для версии 2.


    1. yatsinar
      23.04.2017 22:55

      Имеете ввиду Rx2? Как раз наоборот, работают на Rx1, для миграции на Rx2 нужно не эмитить нуллы, а создавать класс RetryEvent, о чем я написал в середине статьи. Подозреваю, что-то еще придется менять, так как примеры чисто на первой версии писал.


      1. a15199732
        24.04.2017 16:18

        В версии 1.2.9 (видимо, крайняя на данный момент) у Single нет метода repeatWhen, только retryWhen.


        1. yatsinar
          24.04.2017 17:32

          Для Single можно вызвать .toObservable() — результат будет такой же.


          1. a15199732
            24.04.2017 19:23

            И еще
            repeatWhen(repeatHandler -> repeatHandler.flatMap(nothing -> updateSubject.asObservable()))
            можно сократить до
            repeatWhen(updateSubject)


            1. yatsinar
              24.04.2017 23:56

              Нeльзя, во-пeрвых, там сигнатура Func1<Ovservable, Observable>, поэтому как минимум repeatWhen(error -> repearSubject.asObservable()).
              Во-вторых, спецификация обязывать возвращать подстрим данного notificationHandler'а. Пeрeданный туда сторонний статичный стрим работать нe будeт.


              1. a15199732
                25.04.2017 13:43

                Можно-можно, у меня это работает.
                PublishSubject наследник Observable, asObservable() делает явное приведение типа, но проканает и неявное.
                А спецификация обязывает возвращать Observable. Другое дело, что handler служит как бы сигналом, что очередная последовательность закончилась, и по нему можно синхронизировать эмитирование из возвращаемого Observable. Но в этом случае можно использовать BehaviorSubject, который отдает последний эмит новому подписчику. Тогда даже если вызов update() произойдет пока getSomething() еще не вернул результат, то replayWhen(updateSubject) сразу приведет к переподписке на getSomething.