Я долго боялся использовать RxJava в production. Её назначение и принцип работы оставались для меня загадкой. Чтение исходного кода не добавляло ясности, а статьи только путали. Под катом попытка ответить на вопросы: «Какие задачи эта технология решает лучше аналогов?» и «Как это работает?» с помощью аналогий с классической Java и простых метафор.

image

Применение


RxJava отлично заменяет Streams API из Java 8 на более ранних версиях Java. Так как Android Java 8 поддерживается далеко не с 4.0, Rx будет оптимальным решением. В статье RxJava рассматривается именно с этого ракурса, так как, по-моему, он наиболее понятный и по-настоящему реактивное приложение под Android с помощью чистой Rx реализовать сложно.

Emitter


Всем нам знаком паттерн Iterator.

interface Iterator<T> {
    T next();
    boolean hasNext();
}

За интерфейсом скрывается какой-нибудь источник данных, причём совершенно не важно, какой. Iterator полностью скрывает все детали реализации, предоставляя всего два метода:

next — получить следующий элемент
hasNext — узнать, есть ли ещё данные в источнике

У этого паттерна есть одна особенность: потребитель запрашивает данные и ждёт («зависает»), пока источник не выдаст их. Поэтому в качестве источника обычно выступает конечная, часто заранее сформированная коллекция.

Проведём небольшой рефакторинг.

interface Iterator<T> {
    T getNext();
    boolean isComplete();
}

Думаю, вы уже поняли, к чему я. Интерфейс Emitter из RxJava (для потребителей он дублируется в Observer (Subscriber в RxJava 1)):

interface Emitter<T> {
    void onNext(T value);
    void onComplete();
    void onError(Throwable error);
}

Он похож на Iterator, но работает в обратную сторону: источник сообщает потребителю о том, что появились новые данные.

Это позволяет разрешить все проблемы с многопоточностью на стороне источника и, например, если вы проектируете UI, то вы сможете рассчитывать на то, что весь код, отвечающий за графический интерфейс — последовательный. Невероятно удобно. Прощайте, каллбэки! Скучать не буду.

Аналогия с Iterator взята из [1]

Sources


Теперь немного о самих источниках. Они бывают множества типов: Observable, Single, Maybe… И все они похожи на капусту (и монады, но это не так важно).

image

Потому что создав один источник, можно заворачивать его в другой источник, который можно ещё раз завернуть в ещё один источник и так до OutOfMemory. (Но так как обычный источник весит меньше 100 байт, скорее, пока заряд не кончится.)

Давайте завернём в источник ответ на тот самый вопрос.

Observable.just(42)

Как мы знаем, получение ответа — довольно долгая операция. Поэтому завернём в источник, который выполнит вычисления в специальном потоке:

Observable.just(42)
                .subscribeOn(computation())

А ещё мы хотим, чтобы приложение не упало при ответе. Заворачиваем в источник, который вернёт ответ в главном потоке:

Observable.just(42)
                .subscribeOn(computation())
                .observeOn(mainThread())

И, наконец, запускаем:

Observable.just(42)
                .subscribeOn(computation())
                .observeOn(mainThread())
                .subscribe(new DisposableObserver<Integer>() {
                    @Override
                    public void onNext(Integer answer) {
                        System.out.print(answer);
                    }
                    @Override public void onComplete() {}
                    @Override public void onError(Throwable e) {}
                });

В консоль вывелся ответ, но что же произошло?

Метод subscribe определён в Observable. Он делает проверки и подготовку, а затем вызывает метод subscribeActual, который уже по-разному определён для разных источников.

В нашем случае метод subscribe вызвал метод subscribeActual у ObservableObserveOn, который вызывает метод subscribe завёрнутого в него источника, уточнив, в какой поток нужно вернуть результат.

В ObservableObserveOn лежит ObservableSubscribeOn. Его subscribeActual запускает subscribe завёрнутого в заданном потоке.

И, наконец, в ObservableSubscribeOn завёрнут ObservableJust, который просто выдаёт в onNext своё значение.

Естественно, просто с числом не интересно. Поэтому вот источник, который получает список товаров и узнаёт для них цены. Цены можно получать только по 20 штук (у InAppBilling API такое же ограничение).

> github.com/a-dminator/rx-products-and-prices

Этот пример создан для демонстрации принципа работы, а не для использования в реальных проектах.

В RxJava огромное количество разных реализаций источников. Все они работают по одному принципу, а детали отлично описаны в документации. Поэтому не буду останавливаться на них.

Операции


Все операции над источниками делятся на 2 типа:

Не терминальные возвращают новый источник, который завернул исходный
Терминальные исполняют цепочку и получают данные (subscribe, map...)

И да, ничего не исполнится, пока не будет выполнена терминальная операция. Цепочка может сколько угодно лежать в памяти, не делая вообще ничего. И это хорошо, потому что если мы не получаем данные, то зачем их производить? (Ленивые вычисления без Haskell в комплекте!).

По аналогии со Streams API из [2]

Dispose (Unsubscribe в RxJava 1)


Исполнение цепочки можно прервать. Делается это вызовом dispose() у DisposableObserver (unsubscribe() у Subscriber в RxJava 1).

После этого RxJava прекратит исполнение цепочек, отпишет всех Observer'ов и вызовет iterrupt() у потоков, которые больше не нужны.

Так же можно узнать, не прервано ли исполнение из источников. Для этого у Emitter есть метод isDispose() (isUnsubscribe() для RxJava 1).

У этого есть логичная, но неприятная особенность: так как Observer отвечает за обработку ошибок, теперь все ошибки крашат приложение. Я пока не нашёл решения, о котором готов написать.

Заключение


RxJava:

— Позволяет легко компоновать запросы к сети, базе данных и т.д; организуя их асинхронное выполнение. Это означает, что ваши пользователи получат более быстрое и отзывчивое приложение.

— Не содержит в себе никакой магии. Только составление и исполнение цепочек источников.

— (Для меня) Решает больше проблем, чем создаёт!

Всем спасибо.

[1] Видео
[2] Видео
Поделиться с друзьями
-->

Комментарии (32)


  1. xGromMx
    19.12.2016 15:28
    +6

    Сколько можно мусолить про Rx?) Хватит глянуть одного видео про это и все http://xgrommx.github.io/rx-book/content/resources/video/index.html (первое по списку)


    1. KomarovI
      20.12.2016 15:26
      +2

      Смею с Вами не согласиться — RxJava ( а в особенности — вторая ветка ) является очень даже глубокой темой для обсуждения, другое дело, что таких триалов как у автора в сети, равно как и на хабре, тысячи, а вот хороших комплексных юз-кейсов в продашне как раз-таки не хватает.


  1. Jukobob
    19.12.2016 16:01
    +1

    Я долго боялся использовать RxJava в production


    Скажу честно, мы вот полгода как выпустили продукт, который 100% Kotlin и на «Реактивной тяге».

    Почему вы боитесь Rx??? Вы просто не умеете его готовить!!! Он вкусный, надо просто научиться.

    PS. Никогда. Повторяюсь Никогда, не используйте конструкции «Зачем мне использовать новую технологию, если я могу писать стандартными методами».


    1. xGromMx
      19.12.2016 16:23
      +1

      А еще он очень простой) Правда могут быть у новичков запарки с Subjects ну и с подогревом или охлаждением Observable) Как кстати Kotlin в бою?


      1. vba
        19.12.2016 16:50

        Kotlin для Android хорош, я долго ждал выхода. До этого пробовал писать под Android на Scala и даже Groovy. В Scala убивало то что компиляция долгая и официальный джар нельзя использовать из-за размера, приходилось обходными путями. Ну а Groovy тоже мороки навалом. Порог вхождения в язык Kotlin почти нулевой, да и генерация из Java помогает очень. Впечатления только положительные, ждем версию 1.1.


        1. xGromMx
          19.12.2016 16:57

          ну у скалы рантайм под андроид очень большой и иногда прогвард не справляется. А вот у Kotlin есть еще прикольная штука Anko


          1. vba
            19.12.2016 18:17

            Да Anko буквально сегодня для себя открыл, вечерком начну колупать на очередном проекте.


          1. Optik
            19.12.2016 19:29

            Начиная с 2.12 у скалы размеры сильно сократились.


            1. yarulan
              20.12.2016 18:56

              И минимальная версия джавы стала 8.


          1. Jukobob
            20.12.2016 17:25

            Попробуйте Kotlin Android Extensions


        1. xGromMx
          19.12.2016 16:58
          -1

          А вообще тут много чего) https://kotlin.link/


      1. KomarovI
        20.12.2016 15:27

        И с Вами также смею не согласиться, rx далеко не простой инструмент, многие по нескольку лет осваивают его. Как минимум понимание проблемы упомянутых Ваши Subject-ов доходит не сразу до человека, начавшего свой «реактивный» путь


      1. Jukobob
        20.12.2016 17:18

        Котлин в бою превосходен. Код более удобочитаем и понятен. Ушли вечные проблемы Java с NPE. Скажу честно. Год как пишу на Kotlin, возвращаться не Java нет желания. Всем советую попробовать.

        А у «Новичков» проблемы будут не только с Hot-Cold-Observable (Это лишь малая часть, которую легко выучить), а скорее с правильной архитектурой на RxJava


    1. Bringoff
      19.12.2016 20:25

      Никогда. Повторяюсь Никогда, не используйте конструкции «Зачем мне использовать новую технологию, если я могу писать стандартными методами».

      Вы так говорите, будто это что-то плохое. Лучше, конечно, использовать в продакшене первую попавшуюся хайповую библиотечку и фреймворк. В крайности бросаться не надо, многие (да что там, большинство) приложения жили без Rx, живут и вполне себе проживут дальше. И странно их за это осуждать.


      1. Jukobob
        20.12.2016 17:23

        Скорее всего мы с вами просто о разных вещах подумали.
        Допустим мы вошли в какой нибудь старый проект интерпрайза, где уже оверхед костылей и есть свой кодстайл. Естественно, внедрять в него новомодные штуки-дрюки не имеет смысла, так как вероятнее всего это приведет к новым костылям.
        Однако, если мы начинаем новый проект и у нас есть свобода выбора технического стека, то почему бы сразу не начать строить проект с использованием тех же самых Di, Rx, DataBinding?

        Я скорее имел ввиду, что если мы начинаем как раз новый проект, то имеет смысл пробовать новые технологии, о которых все говорят.


        1. Bringoff
          20.12.2016 17:55
          +2

          Di — это вообще не о библиотеках, а принцип разработки inversion of control. И реализовать этот принцип вполне можно без даггера того же. Rx — опять же вкусовщина. Если кто-то выберет lightweight stream api + bolts, или еще что-то типа async job queue, я его осуждать не буду. Data binding — интересная штука, но в ограниченных случаях, на полноценное mvvm в андроиде не легло у нас.


    1. AlexeyGorovoy
      20.12.2016 18:56

      Не то чтобы я спорил с вами, но вы не раскрыли чем обосновано ваше последнее утверждение.


    1. adev_one
      20.12.2016 19:10

      Всё же, не зря боялся. На тестировании всплыло, например, что на некоторых устройствах приложение падает при ?50 одновременных соединениях. Или то, что где-то InAppBilling API не цепляется к серверу, не выдавая никакой ошибки (даже timeout). А теперь на нём завязаны некоторые запросы.

      Осторожность связна не столько с самим Rx, как бы он ни был хорош, сколько с совместимостью со всеми остальными технологиями, которые мы, порой вынужденно, используем


  1. vba
    19.12.2016 16:12
    +4

    Здесь вы ошибаетесь, проводя параллели со streams api. Не нужно представлять rxJava как библиотеку для java <v8. Между streams api и реактивными расширениями есть одна существенная разница, которая заключается в том что streams по сути построены на pull а observable построены на push. Результат таков что на streams можно "подписаться" только один раз а у observable может быть сколько угодно подписчиков.


    По мне так главная мешанина с rxJava начинается с того что люди мешают многопоточный и асинхронные подходы. Просто в Java из за отсутствия машины состояний(state machine) асинхронизм построен поверх многопоточных библиотек.


    В Kotlin 1.1, кстати, обещают ко-рутины построенные поверх машины состояний(state machine).


    1. zagayevskiy
      19.12.2016 18:20

      А зачем вы дублируете машину состояний (state machine)? И что это значит?


      Просто в Java из за отсутствия машины состояний(state machine) асинхронизм построен поверх многопоточных библиотек.


      1. vba
        19.12.2016 18:25

        Я был не совсем уверен в переводе на русский язык как машина состояний. Проверил в википедии, называется абстрактный автомат. Ну после вашего комментария, мой уже не поправить.


    1. adev_one
      20.12.2016 19:52

      Согласен, что Rx не заканчивается на замене streams, но применение этого, скорее, про сервера и Scala. Я пока не видел кейсов под Android, в которых долгоиграющий Observable или Flowable приносит больше пользы, чем проблем из-за сложного жизненного цикла UI-компонентов (потребителей данных), который нужно обслуживать. Буду рад, если вы приведёте пример


      1. vba
        20.12.2016 20:03

        Я например был вдохновлен вот этим примером от команды Яндекса.


        1. adev_one
          20.12.2016 21:15

          Спасибо, действительно интересный кейс


  1. anton9088
    19.12.2016 20:16
    +1

    А как в RxJava решается проблема, когда запрос к сети завершился, когда Activity было свернуто? В Rx же так же в onResume идет подписка и в onPause отписка? И как сделать так чтобы не было повторного запроса, если он еще выполняется, при пересоздании Activity (речь не про поворот экрана, а про то что Activity можно закрыть кнопкой назад, а потом вернуться обратно).


    1. kuchanov
      20.12.2016 14:03

      Можно по разному. Вот, например, не идеальный вариант, но вполне рабочий. В синглтоне все операции идут, а фрагмент/активити лишь подписываются для получения последней полученной информации:


      Используем RxJava и Retrofit на Android, учитывая поворот экрана


    1. Jukobob
      20.12.2016 17:15

      Дорогой мой друг, это решается построением человеческой архитектуры (допустим MVP) где при паузе происходит дроп View. Таким образом даже если Subscription завершит свою работу мы не получит ответ в пустой вью. Ну или отписку можно делать.

      Это нужно для того, что если пользователь сворачивает активити, некоторые процессы должны продолжаться. Для повторной переподписки на presenter.

      Извиняюсь за офтоп. Ушел чутка от темы)


    1. shakagamii
      20.12.2016 18:45

      Тут больше вопрос не к Rx, а к используемому шаблону проектирования.
      Используя MVP, вы делаете любую реализацию какого-нить «кеша» в презентере. А потом просто опрашиваете презентер, если данные есть, берете готовые, если нет загружаете новые. Если загрузка в процессе, то просто ждете конца.

      В Rx же так же в onResume идет подписка и в onPause отписка?

      Нет, Rx не привязан к контексту активити.


      1. Jukobob
        26.12.2016 12:47

        Тут bind-unbind самого View имел ввиду


    1. Gustik
      20.12.2016 18:45

      Можно использовать паттерн MVP и делать запросы в presenter. Ну а чтобы повторно запрос не выполнялся, можно хранить состояние представления и перерисовывать при необходимости. Например как в moxy.


  1. AndroidNoob
    20.12.2016 18:45

    Observable.just(42)
                    .subscribeOn(computation())
    

    Если не ошибаюсь с оператором just subscribeOn бесполезен. Код все равно выполнится на вызывающем потоке. Для того чтобы все работало как Вы хотите необходимо, например, еще обернуть в defer


    1. adev_one
      20.12.2016 18:55

      Да, верно, вычисление самого числа произойдёт в основном потоке, но исполнение ObservableJust — в потоке для вычислений. Здесь я использовал just просто чтобы не усложнять пример, а уже на github выложил более сложный кейс