Применение
RxJava отлично заменяет Streams API из Java 8 на более ранних версиях Java. Так как Android Java 8 поддерживается далеко не с 4.0, Rx будет оптимальным решением. В статье RxJava рассматривается именно с этого ракурса, так как, по-моему, он наиболее понятный и по-настоящему реактивное приложение под Android с помощью чистой Rx реализовать сложно.
Emitter
Всем нам знаком паттерн Iterator.
interface Iterator<T> {
T next();
boolean hasNext();
}
За интерфейсом скрывается какой-нибудь источник данных, причём совершенно не важно, какой. Iterator полностью скрывает все детали реализации, предоставляя всего два метода:
next — получить следующий элемент
hasNext — узнать, есть ли ещё данные в источнике
У этого паттерна есть одна особенность: потребитель запрашивает данные и ждёт («зависает»), пока источник не выдаст их. Поэтому в качестве источника обычно выступает конечная, часто заранее сформированная коллекция.
Проведём небольшой рефакторинг.
interface Iterator<T> {
T getNext();
boolean isComplete();
}
Думаю, вы уже поняли, к чему я. Интерфейс Emitter из RxJava (для потребителей он дублируется в Observer (Subscriber в RxJava 1)):
interface Emitter<T> {
void onNext(T value);
void onComplete();
void onError(Throwable error);
}
Он похож на Iterator, но работает в обратную сторону: источник сообщает потребителю о том, что появились новые данные.
Это позволяет разрешить все проблемы с многопоточностью на стороне источника и, например, если вы проектируете UI, то вы сможете рассчитывать на то, что весь код, отвечающий за графический интерфейс — последовательный. Невероятно удобно. Прощайте, каллбэки! Скучать не буду.
Аналогия с Iterator взята из [1]
Sources
Теперь немного о самих источниках. Они бывают множества типов: Observable, Single, Maybe… И все они похожи на капусту (и монады, но это не так важно).
Потому что создав один источник, можно заворачивать его в другой источник, который можно ещё раз завернуть в ещё один источник и так до OutOfMemory. (Но так как обычный источник весит меньше 100 байт, скорее, пока заряд не кончится.)
Давайте завернём в источник ответ на тот самый вопрос.
Observable.just(42)
Как мы знаем, получение ответа — довольно долгая операция. Поэтому завернём в источник, который выполнит вычисления в специальном потоке:
Observable.just(42)
.subscribeOn(computation())
А ещё мы хотим, чтобы приложение не упало при ответе. Заворачиваем в источник, который вернёт ответ в главном потоке:
Observable.just(42)
.subscribeOn(computation())
.observeOn(mainThread())
И, наконец, запускаем:
Observable.just(42)
.subscribeOn(computation())
.observeOn(mainThread())
.subscribe(new DisposableObserver<Integer>() {
@Override
public void onNext(Integer answer) {
System.out.print(answer);
}
@Override public void onComplete() {}
@Override public void onError(Throwable e) {}
});
В консоль вывелся ответ, но что же произошло?
Метод subscribe определён в Observable. Он делает проверки и подготовку, а затем вызывает метод subscribeActual, который уже по-разному определён для разных источников.
В нашем случае метод subscribe вызвал метод subscribeActual у ObservableObserveOn, который вызывает метод subscribe завёрнутого в него источника, уточнив, в какой поток нужно вернуть результат.
В ObservableObserveOn лежит ObservableSubscribeOn. Его subscribeActual запускает subscribe завёрнутого в заданном потоке.
И, наконец, в ObservableSubscribeOn завёрнут ObservableJust, который просто выдаёт в onNext своё значение.
Естественно, просто с числом не интересно. Поэтому вот источник, который получает список товаров и узнаёт для них цены. Цены можно получать только по 20 штук (у InAppBilling API такое же ограничение).
> github.com/a-dminator/rx-products-and-prices
Этот пример создан для демонстрации принципа работы, а не для использования в реальных проектах.
В RxJava огромное количество разных реализаций источников. Все они работают по одному принципу, а детали отлично описаны в документации. Поэтому не буду останавливаться на них.
Операции
Все операции над источниками делятся на 2 типа:
— Не терминальные возвращают новый источник, который завернул исходный
— Терминальные исполняют цепочку и получают данные (subscribe, map...)
И да, ничего не исполнится, пока не будет выполнена терминальная операция. Цепочка может сколько угодно лежать в памяти, не делая вообще ничего. И это хорошо, потому что если мы не получаем данные, то зачем их производить? (Ленивые вычисления без Haskell в комплекте!).
По аналогии со Streams API из [2]
Dispose (Unsubscribe в RxJava 1)
Исполнение цепочки можно прервать. Делается это вызовом dispose() у DisposableObserver (unsubscribe() у Subscriber в RxJava 1).
После этого RxJava прекратит исполнение цепочек, отпишет всех Observer'ов и вызовет iterrupt() у потоков, которые больше не нужны.
Так же можно узнать, не прервано ли исполнение из источников. Для этого у Emitter есть метод isDispose() (isUnsubscribe() для RxJava 1).
У этого есть логичная, но неприятная особенность: так как Observer отвечает за обработку ошибок, теперь все ошибки крашат приложение. Я пока не нашёл решения, о котором готов написать.
Заключение
RxJava:
— Позволяет легко компоновать запросы к сети, базе данных и т.д; организуя их асинхронное выполнение. Это означает, что ваши пользователи получат более быстрое и отзывчивое приложение.
— Не содержит в себе никакой магии. Только составление и исполнение цепочек источников.
— (Для меня) Решает больше проблем, чем создаёт!
Всем спасибо.
[1] Видео
[2] Видео
Комментарии (32)
Jukobob
19.12.2016 16:01+1Я долго боялся использовать RxJava в production
Скажу честно, мы вот полгода как выпустили продукт, который 100% Kotlin и на «Реактивной тяге».
Почему вы боитесь Rx??? Вы просто не умеете его готовить!!! Он вкусный, надо просто научиться.
PS. Никогда. Повторяюсь Никогда, не используйте конструкции «Зачем мне использовать новую технологию, если я могу писать стандартными методами».xGromMx
19.12.2016 16:23+1А еще он очень простой) Правда могут быть у новичков запарки с Subjects ну и с подогревом или охлаждением Observable) Как кстати Kotlin в бою?
vba
19.12.2016 16:50Kotlin для Android хорош, я долго ждал выхода. До этого пробовал писать под Android на Scala и даже Groovy. В Scala убивало то что компиляция долгая и официальный джар нельзя использовать из-за размера, приходилось обходными путями. Ну а Groovy тоже мороки навалом. Порог вхождения в язык Kotlin почти нулевой, да и генерация из Java помогает очень. Впечатления только положительные, ждем версию 1.1.
KomarovI
20.12.2016 15:27И с Вами также смею не согласиться, rx далеко не простой инструмент, многие по нескольку лет осваивают его. Как минимум понимание проблемы упомянутых Ваши Subject-ов доходит не сразу до человека, начавшего свой «реактивный» путь
Jukobob
20.12.2016 17:18Котлин в бою превосходен. Код более удобочитаем и понятен. Ушли вечные проблемы Java с NPE. Скажу честно. Год как пишу на Kotlin, возвращаться не Java нет желания. Всем советую попробовать.
А у «Новичков» проблемы будут не только с Hot-Cold-Observable (Это лишь малая часть, которую легко выучить), а скорее с правильной архитектурой на RxJava
Bringoff
19.12.2016 20:25Никогда. Повторяюсь Никогда, не используйте конструкции «Зачем мне использовать новую технологию, если я могу писать стандартными методами».
Вы так говорите, будто это что-то плохое. Лучше, конечно, использовать в продакшене первую попавшуюся хайповую библиотечку и фреймворк. В крайности бросаться не надо, многие (да что там, большинство) приложения жили без Rx, живут и вполне себе проживут дальше. И странно их за это осуждать.
Jukobob
20.12.2016 17:23Скорее всего мы с вами просто о разных вещах подумали.
Допустим мы вошли в какой нибудь старый проект интерпрайза, где уже оверхед костылей и есть свой кодстайл. Естественно, внедрять в него новомодные штуки-дрюки не имеет смысла, так как вероятнее всего это приведет к новым костылям.
Однако, если мы начинаем новый проект и у нас есть свобода выбора технического стека, то почему бы сразу не начать строить проект с использованием тех же самых Di, Rx, DataBinding?
Я скорее имел ввиду, что если мы начинаем как раз новый проект, то имеет смысл пробовать новые технологии, о которых все говорят.
Bringoff
20.12.2016 17:55+2Di — это вообще не о библиотеках, а принцип разработки inversion of control. И реализовать этот принцип вполне можно без даггера того же. Rx — опять же вкусовщина. Если кто-то выберет lightweight stream api + bolts, или еще что-то типа async job queue, я его осуждать не буду. Data binding — интересная штука, но в ограниченных случаях, на полноценное mvvm в андроиде не легло у нас.
AlexeyGorovoy
20.12.2016 18:56Не то чтобы я спорил с вами, но вы не раскрыли чем обосновано ваше последнее утверждение.
adev_one
20.12.2016 19:10Всё же, не зря боялся. На тестировании всплыло, например, что на некоторых устройствах приложение падает при ?50 одновременных соединениях. Или то, что где-то InAppBilling API не цепляется к серверу, не выдавая никакой ошибки (даже timeout). А теперь на нём завязаны некоторые запросы.
Осторожность связна не столько с самим Rx, как бы он ни был хорош, сколько с совместимостью со всеми остальными технологиями, которые мы, порой вынужденно, используем
vba
19.12.2016 16:12+4Здесь вы ошибаетесь, проводя параллели со streams api. Не нужно представлять rxJava как библиотеку для java <v8. Между streams api и реактивными расширениями есть одна существенная разница, которая заключается в том что streams по сути построены на pull а observable построены на push. Результат таков что на streams можно "подписаться" только один раз а у observable может быть сколько угодно подписчиков.
По мне так главная мешанина с rxJava начинается с того что люди мешают многопоточный и асинхронные подходы. Просто в Java из за отсутствия машины состояний(state machine) асинхронизм построен поверх многопоточных библиотек.
В Kotlin 1.1, кстати, обещают ко-рутины построенные поверх машины состояний(state machine).
zagayevskiy
19.12.2016 18:20А зачем вы дублируете машину состояний (state machine)? И что это значит?
Просто в Java из за отсутствия машины состояний(state machine) асинхронизм построен поверх многопоточных библиотек.
vba
19.12.2016 18:25Я был не совсем уверен в переводе на русский язык как машина состояний. Проверил в википедии, называется абстрактный автомат. Ну после вашего комментария, мой уже не поправить.
adev_one
20.12.2016 19:52Согласен, что Rx не заканчивается на замене streams, но применение этого, скорее, про сервера и Scala. Я пока не видел кейсов под Android, в которых долгоиграющий Observable или Flowable приносит больше пользы, чем проблем из-за сложного жизненного цикла UI-компонентов (потребителей данных), который нужно обслуживать. Буду рад, если вы приведёте пример
anton9088
19.12.2016 20:16+1А как в RxJava решается проблема, когда запрос к сети завершился, когда Activity было свернуто? В Rx же так же в onResume идет подписка и в onPause отписка? И как сделать так чтобы не было повторного запроса, если он еще выполняется, при пересоздании Activity (речь не про поворот экрана, а про то что Activity можно закрыть кнопкой назад, а потом вернуться обратно).
kuchanov
20.12.2016 14:03Можно по разному. Вот, например, не идеальный вариант, но вполне рабочий. В синглтоне все операции идут, а фрагмент/активити лишь подписываются для получения последней полученной информации:
Используем RxJava и Retrofit на Android, учитывая поворот экрана
Jukobob
20.12.2016 17:15Дорогой мой друг, это решается построением человеческой архитектуры (допустим MVP) где при паузе происходит дроп View. Таким образом даже если Subscription завершит свою работу мы не получит ответ в пустой вью. Ну или отписку можно делать.
Это нужно для того, что если пользователь сворачивает активити, некоторые процессы должны продолжаться. Для повторной переподписки на presenter.
Извиняюсь за офтоп. Ушел чутка от темы)
shakagamii
20.12.2016 18:45Тут больше вопрос не к Rx, а к используемому шаблону проектирования.
Используя MVP, вы делаете любую реализацию какого-нить «кеша» в презентере. А потом просто опрашиваете презентер, если данные есть, берете готовые, если нет загружаете новые. Если загрузка в процессе, то просто ждете конца.
В Rx же так же в onResume идет подписка и в onPause отписка?
Нет, Rx не привязан к контексту активити.
AndroidNoob
20.12.2016 18:45Observable.just(42) .subscribeOn(computation())
Если не ошибаюсь с оператором just subscribeOn бесполезен. Код все равно выполнится на вызывающем потоке. Для того чтобы все работало как Вы хотите необходимо, например, еще обернуть в deferadev_one
20.12.2016 18:55Да, верно, вычисление самого числа произойдёт в основном потоке, но исполнение ObservableJust — в потоке для вычислений. Здесь я использовал just просто чтобы не усложнять пример, а уже на github выложил более сложный кейс
xGromMx
Сколько можно мусолить про Rx?) Хватит глянуть одного видео про это и все http://xgrommx.github.io/rx-book/content/resources/video/index.html (первое по списку)
KomarovI
Смею с Вами не согласиться — RxJava ( а в особенности — вторая ветка ) является очень даже глубокой темой для обсуждения, другое дело, что таких триалов как у автора в сети, равно как и на хабре, тысячи, а вот хороших комплексных юз-кейсов в продашне как раз-таки не хватает.