Всем привет, меня зовут Руслан, я Head of mobile development в одной международной компании. В нашей производственной практике достаточно много проектов используют для упрощенной работы с асинхронщиной фреймворк RxJava.

Обычно изучение RxJava в большинстве статей или онлайн-школ начинается со слов «Жил был Observable/Single/Flowable и мы решили на него подписаться».

После всего этого, как правило идёт пару слов про операторы, усиленный разбор отличий map от flatMap, concatMap, switchMap (мне сразу вспоминается среднестатистическое собеседование в какой-нибудь компании). Дальше идет что-то не очень внятное и совсем теоретическое про горячие источники и на этом всё. 

В реальности, начинающий Android разработчик либо начал с coroutines и flow, либо шлёпает RxJava цепочки по одному и тому же алгоритму:

auth(credentials)
    .subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe({ response ->
        Log.d("RESPONSE", response.toString())           
    }, { throwable ->
        Log.d("ERROR", throwable.localizedMessage)
    })

Красота да? У нас есть цепочка, которая что-то получает от бэкэнда, даже работает! Но, в действительности мы даже не представляем как она работает.

Начитавшись умных статей о том, что RxJava построена на основе паттерну Observer мы думаем - Ну вот метод auth(), это издатель, а subscribe это подписчик, subscribeOn - устанавливает стратегию на каком пуле потоков будет работать издатель, а observeOn - определяет на каком пуле потоков будет получать данные наш подписчик, которого мы бережно поместили внутрь метода subscribe.

На этом можно было бы заканчивать статью, но увы, не всё так, как кажется на самом деле. Нет, метод auth(), это действительно издатель, а subscribe - подписчик, с одной лишь оговоркой, ПОДПИСЧИК ЗАМЫКАЮЩИЙ ЦЕПОЧКУ (ну т.е. Вызов метода subscribe вернет некий Disposable). Отсюда назревает резонный вопрос, а что бывают какие-то ещё подписчики? Представляете, бывают!

Вот, век живи, век учись, каждый раз работая на проектах компании, где есть RxJava, я открываю для себя её по новому, бесконечный ящик пандоры. Окей, давайте ближе к сути.

Из курсов нам говорят, каждый оператор возвращает нам новый экземпляр источника с видоизмененными данными (если мы применяем какие-то операторы трансформации, комбинации, сортировки и т.д.), но нам забыли упомянуть одну важную вещь…

Каждый оператор это источник, внутри которого есть свой подписчик! Прикиньте? Чтоб в этом убедиться, давайте рассмотрим реализацию функции take под капотом:

//Original source from RxJava3 library
public final class ObservableTake<T> extends AbstractObservableWithUpstream<T, T> {
    final long limit;
    public ObservableTake(ObservableSource<T> source, long limit) {
        super(source);
        this.limit = limit;
    }

    @Override
    protected void subscribeActual(Observer<? super T> observer) {
        source.subscribe(new TakeObserver<>(observer, limit));
    }

    static final class TakeObserver<T> implements Observer<T>, Disposable {
        final Observer<? super T> downstream;

        boolean done;

        Disposable upstream;

        long remaining;
        TakeObserver(Observer<? super T> actual, long limit) {
            this.downstream = actual;
            this.remaining = limit;
        }

        @Override
        public void onSubscribe(Disposable d) {
            if (DisposableHelper.validate(this.upstream, d)) {
                upstream = d;
                if (remaining == 0) {
                    done = true;
                    d.dispose();
                    EmptyDisposable.complete(downstream);
                } else {
                    downstream.onSubscribe(this);
                }
            }
        }

        @Override
        public void onNext(T t) {
            if (!done && remaining-- > 0) {
                boolean stop = remaining == 0;
                downstream.onNext(t);
                if (stop) {
                    onComplete();
                }
            }
        }

        @Override
        public void onError(Throwable t) {
            if (done) {
                RxJavaPlugins.onError(t);
                return;
            }

            done = true;
            upstream.dispose();
            downstream.onError(t);
        }

        @Override
        public void onComplete() {
            if (!done) {
                done = true;
                upstream.dispose();
                downstream.onComplete();
            }
        }

        @Override
        public void dispose() {
            upstream.dispose();
        }

        @Override
        public boolean isDisposed() {
            return upstream.isDisposed();
        }
    }
}

Шок, правда? Т.е. У нас каждый оператор подписывается друг на друга в цепочке и к примеру наличие doOnTerminate{  exitProcess(0) }  будет давать разный результат в зависимости от его местоположения в цепочке:

Single.just(1)
    .subscribeOn(Schedulers.newThread())
    .doOnSuccess { logger.warning("First Single on: "+Thread.currentThread().name) }
    .observeOn(Schedulers.io())
    .doOnTerminate { exitProcess(0) }
    .doOnError { throwable -> logger.warning(throwable.localizedMessage) }
    .subscribe(
        { logger.warning("Root subscribe(): "+Thread.currentThread().name) },
        { throwable -> logger.warning(throwable.localizedMessage) }
    )

OUTPUT:

WARNING: Current thread: RxNewThreadScheduler-1

WARNING: Current thread after observeOn: RxCachedThreadScheduler-1

Вопрос - а где лог с Root subscribe(): "+Thread.currentThread().name - Это нормальное поведение, у нас ведь выполняется метод

doOnTerminate { exitProcess(0) }

который завершает программу, просто он выполняется по очереди со всеми операторами, а не когда корневая цепочка завершит своё выполнение. Убедиться в этом можно, если переставить его в самое начало Rx - цепочки, после выполнения такого алгоритма вы не увидите никаких логов, программа завершится до их появления.

Теперь, держите эту информацию в уме, потому что дальше начнутся странные странности, которые без понимания вот этого материала не объяснить.

Всё было бы так просто, если бы не было так сложно. Я приведу пример:

Single.just(1)
    .subscribeOn(Schedulers.newThread())
    .doOnSuccess { logger.warning("Current thread: "+Thread.currentThread().name) }
    .observeOn(Schedulers.computation())
    .doOnSuccess { logger.warning("Current thread after observeOn: "+Thread.currentThread().name) }
    .subscribeOn(Schedulers.io())
    .doOnError { throwable -> logger.warning(throwable.localizedMessage) }
    .subscribe(
        { logger.warning("Root subscribe(): "+Thread.currentThread().name) },
        { throwable -> logger.warning(throwable.localizedMessage) }
    )

OUTPUT:

WARNING: Current thread: RxNewThreadScheduler-1

WARNING: Current thread after observeOn: RxComputationThreadPool-1

WARNING: Root subscribe(): RxComputationThreadPool-1

Вполне реальная ситуация, которая может вызвать ступор после радужного subscribeOn.observeOn. Благо, в документации на гите RxJava об этом написано. Пишут, что нельзя больше одного раза в корневой цепочке вызвать subscribeOn, а вот observeOn можно вызывать сколько угодно. Правило да правило, вот и живи теперь с этим. Ладно, на самом деле subscribeOn можно вызвать сколько угодно раз, но во второстепенных цепочках, которые к примеру вызываются внутри оператора flatMap, но поведение в корневой цепочке будет максимально неожиданным:

Single.just(1)
    .subscribeOn(Schedulers.newThread())
    .doOnSuccess { logger.warning("Current thread: "+Thread.currentThread().name) }
    .observeOn(Schedulers.computation())
    .doOnSuccess { logger.warning("Current thread after observeOn: "+Thread.currentThread().name) }
    .flatMap {
        Single.just(2).subscribeOn(Schedulers.io())
    }
    .doOnError { throwable -> logger.warning(throwable.localizedMessage) }
    .subscribe(
        { logger.warning("Root subscribe(): "+Thread.currentThread().name) },
        { throwable -> logger.warning(throwable.localizedMessage) }
    )

OUTPUT:

WARNING: Current thread: RxNewThreadScheduler-1

WARNING: Current thread after observeOn: RxComputationThreadPool-1

WARNING: Root subscribe(): RxCachedThreadScheduler-1

Оказывается, в RxJava есть два ключевых понятия, характеризующих порядок работы цепочки - upstream и downstream.

Ниже на скрине я нарисую что такое upstream и downstream:

Смысл этих двух терминов в том, что подписка происходит вверх по течению upstream, а выброс данных вниз по течению downstream. Давайте заглянем под капот функции subscribeOn, интересно же, почему в случае без flatMap у нас поток не переключился второй раз через subscribeOn на IO пулл потоков:

Вот это поворот! Оказывается внутри функции subscribeOn мы делаем replace передаваемого экземпляра пула потоков и этот replace работает снизу вверх, проходя по КОРНЕВОЙ цепочке, тот вызов subscribeOn который будет самым первым сверху, тот и установит реальный последний примененный пул потоков выполнения, не зря же он называется subscribeOn, при подписке, upstream. Интересно, а что же тогда происходит с observeOn, почему его можно вызвать много раз? Всё просто, у observeOn под капотом тот же replace, но только сверху вниз (downstream), именно по этому он сменится столько раз, сколько мы захотим.

Вы ещё держите в уме, что операторы друг на друга подписываются? Теперь сможете ответить на вопрос, почему subscribeOn во второстепенной цепи меняет поведение корневой? Думаю очевидно.

Если вам понравилась моя статья, подписывайтесь на мой телеграм-канал.

Комментарии (9)


  1. Rusrst
    09.04.2023 17:16
    +1

    Я недавно работал с rx3 внутри java lib signar от Microsoft. Coroutines все же удобнее...

    Да и такое глубокое понимание мне кажется нужно уже на позиции повыше чем просто писать асинхронный код. И кстати - про upstream и downstream во всех более менее подробных гайдах рассказывают, но да, почему работает именно так никто не вдавался в подробности.


  1. IknowThatIknowNothing
    09.04.2023 17:16
    +2

    Про upstream и downstream под капотом было интересно, автору - спасибо за статью


    1. KritApplication Автор
      09.04.2023 17:16

      Рад, что вам понравилось! Буду стараться писать чаще о каких-либо глубоких процессах, не только сторонних либ, но и Android SDK


  1. domix32
    09.04.2023 17:16
    +3

    Не очень понимаю зачем вы чуть ле не в каждом абзаце вы восклицаете "Вот это поворот". Никогда такого не было и вот опять?


    1. Foxek
      09.04.2023 17:16

      Я вот тоже прочитал заголовок и решил, что что-то не очевидное. Оказалось, что почти все знают то, о чем, судя по заголовку кто-то "не подозревает". Это же довольно базовые вещи, если работаешь с Rx


      1. KritApplication Автор
        09.04.2023 17:16

        Для кого базовые, а для кого нет, думаю не стоит равнять по уровню компетенций аудиторию под себя


        1. Foxek
          09.04.2023 17:16
          +2

          С этим полностью согласен. Уровень разный, просто название не соответствует содержанию. Описанное не является тайной или чем то необычным, либо неожиданным. Не хотел никого обидеть)


    1. KritApplication Автор
      09.04.2023 17:16

      У вас конкретно к написанию претензия? Вот такой вот у меня стиль


      1. Foxek
        09.04.2023 17:16

        Ответил, к предыдущему комментарию. Обидеть не хотел, лишь констатирую факт)