Всем привет, меня зовут Алексей Агапитов и сегодня я хочу рассказать, как с
помощью такой библиотеки как RxJava можно легко обрабатывать множество
асинхронных процессов в вашем Android приложении.
Мы разберём, как создавать свои холодные и горячие последовательности, обратим
внимание на некоторые нюансы при использовании RxJava, а также рассмотрим
насколько мощными инструментами являются предоставляемые данной библиотекой
операторы.
Рассказывать обо всём я буду на примере приложения Яндекс.Недвижимость и его
главного экрана с картой.
Для начала посмотрим на экран и разберёмся, что на нём происходит и что нам
предстоит реализовать.
Прежде всего возникает взаимодействие с картой: человек может двигать карту
и на ней будут появляться точки с объявлениями, подходящими под его фильтры.
Точки могут быть единичными объявлениями, новостройками, домами и кластерами,
объединяющими множество объявлений. Заметим, что единичные объявления могут быть
помечены как просмотренные (этот флаг хранится локально на устройстве).
Сами фильтры меняются на другом экране, но их необходимо использовать при запросе
интересующих нас точек на карте.
Ещё одна составляющая запроса это географический объект, для которого мы ищем
объявления.
Данный элемент необходим, чтобы быстро включать/выключать поиск в этом объекте или в той области, которая сейчас открыта на карте.
Таким образом, точки на карте необходимо обновлять по каждому из перечисленных
действий (изменение карты, фильтров или геообъекта). Для краткости мы не будем
рассматривать возможность рисования объектов на карте, считаем что такие объекты
являются частным случаем геообъекта.
Помимо этого, у нас есть два процесса, происходящих в других потоках: получение точек от веб-API и проверка, какие из этих точек уже были просмотрены на данном устройстве (для этого мы обращаемся к базе данных).
Учитывая, что карта, фильтры и геообъект меняются быстрее и чаще, чем приходят ответы с точками от сервера, то необходимо использовать только самые свежие результаты и отбрасывать предыдущие.
Таким образом, нам необходимо реализовать экран, содержащий немалое количество
асинхронных процессов, зависящих друг от друга.
Сравнение RxJava с традиционным Android подходом
В традиционном для Android подходе, для наблюдения за каждым из рассмотренных
процессов мы бы использовали коллбэки. Когда происходит событие изменения
составляющих элементов на карте (например, карту подвинули), мы читаем остальные
составляющие и соединяем их в один запрос, который и выполняем.
При реализации данного подхода возникают некоторые сложности.
- Коллбэки плохо комбинируются друг с другом:
- сложнее читать код – сложнее понять связи коллбэков друг с другом, определить,
кто от кого зависит, коллбэки разнесены по коду и в нём сложнее ориентироваться; - теряется гибкость кода – имеется меньше возможностей для его
переиспользования, сложнее вносить изменения в существующие решения.
- сложнее читать код – сложнее понять связи коллбэков друг с другом, определить,
- Необходимо явно хранить дополнительное состояние, связанное с выполняемой
асинхронной операцией и её коллбэком. Чем больше таких переменных состояния,
тем больше вероятность допустить ошибку (например, при работе с несколькими потоками).
Мы выбрали библиотеку RxJava по следующим причинам:
- Наличие универсальной абстракции над асинхронными процессами любой природы
(событийная модель, многопоточная обработка) под названием Observable –
наблюдаемая последовательность; - Возможность изменять последовательности за счёт применения операторов и
большое количество полезных операторов; - Возможность комбинировать последовательности друг с другом;
- Уменьшение количества переменных состояния за счёт использования
последовательностей и операторов; - Стабильность и качество реализации библиотеки;
Эту библиотеку мы используем в приложении для самых разных целей – начиная с
фоновой загрузки и обработки данных и заканчивая обработкой множества
событий, происходящих в пользовательском интерфейсе.
Реализация
Посмотрим на примерах, как используется библиотека в нашем приложении.
Наблюдаем за изменениями карты
Сначала возьмём наблюдение за изменением состояния карты. Для этого мы используем следующую последовательность, которая сообщает о том, что координатные границы карты изменились:
public static Observable<BoundingBox> observeMapBoundingBox(final MapController mapController) {
return Observable.create(new Observable.OnSubscribe<BoundingBox>() {
@Override
public void call(final Subscriber<? super BoundingBox> subscriber) {
final OnMapListener listener = new OnMapListener() {
@Override
public void onMapActionEvent(MapEvent mapEvent) {
switch (mapEvent.getMsg()) {
case MapEvent.MSG_SCALE_END:
case MapEvent.MSG_SCROLL_END:
case MapEvent.MSG_ZOOM_END:
if (!subscriber.isUnsubscribed()) {
subscriber.onNext(getViewportBoundingBox(mapController));
}
break;
}
}
};
mapController.addMapListener(listener);
//если от последовательности отписались - удалить этого слушателя
subscriber.add(Subscriptions.create(() -> {
mapController.removeMapListener(listener);
}));
}
});
}
В данной реализации две основные части:
- создание и добавление слушателя событий карты
- удаление данного слушателя, когда от последовательности отписались.
Обратите внимание, что мы создаём и регистрируем слушателя внутри OnSubscribe, то есть когда последовательность стала активна (на неё кто-то подписался).
Здесь мы имеем дело с классическим примером холодной последовательности – такой, которая выпускает новые элементы, пока на неё подписаны. Отличным примером реализации подобных последовательностей является библиотека RxBinding, позволяющая наблюдать за событиями в виджетах, присутствующих в стандартном API, а также support-библиотеке.
Наблюдаем за изменением фильтров
Теперь рассмотрим вторую составляющую запроса на точки – фильтры. Предположим, у нас есть класс, который хранит в себе текущие фильтры и предоставляет методы, для их обновления. И мы хотим наблюдать за изменением значения этого поля. Мы можем пойти тем же путём, что и в случае карты, добавив поле с наблюдателем изменений этого поля и уведомлением наблюдателя, когда поле изменилось. Но, наблюдателей у поля может быть много, а значит, либо надо хранить их массив, либо при создании последовательности использовать операторы share, publish + autoConnect
для того, чтобы отправлять событие сразу нескольким наблюдателям последовательности. Однако хочется сделать это прозрачно для потребителя, и здесь нам поможет такой класс из библиотеки RxJava, как Subject, на который мы переложим все перечисленные
обязанности.
Subject представляет собой последовательность, у которой одновременно может быть множество подписчиков, которые получают все данные и уведомления о её завершении или ошибке. При этом, работа с Subject происходит с помощью тех же методов, которыми обладают его подписчики: onNext
, onCompleted
, onError
. То есть, Subject сам является подписчиком, а значит при необходимости он может подписаться на другую последовательность и ретранслировать её всем своим подписчикам.
Посмотрим на примере, что это нам даёт:
public class FilterHolder {
private final PublishSubject<Filter> subject = PublishSubject.create();
private Filter current;
public Observable<Filter> observeChanges(boolean emitCurrentValue) {
return emitCurrentValue ? subject.startWith(current) : subject;
}
public void set(Filter filter) {
this.current = filter;
subject.onNext(filter);
}
}
Как видите, при установке нового значения мы отправляем его всем подписчикам. В данном случае мы используем PublishSubject, который отправляет свежеполученные данные, всем своим подписчикам. В принципе, можно было бы использовать ReplaySubject, который умеет хранить последние полученные данные и повторять их для тех подписчиков, которые подписались уже после получения этих данных. Но в таком случае нам пришлось бы поменять реализацию метода observeChanges
– вместо отправки текущего значения, мы бы его пропускали.
Подобным образом можно расширить уже существующие классы и добавить им
реактивные возможности.
Subject является примером горячей последовательности, то есть он остаётся активным и будет принимать/отправлять элементы, даже если на него никто не подписан. Главное – помнить, что Subject может принимать новые элементы последовательности в onNext
и рассылать их своим подписчикам до тех пор, пока у него не будет вызван onCompleted
или onError
.
Это важно, в тех ситуациях когда источник данных/событий бесконечный и не предполагается вызовов onCompleted
и onError
, поэтому вызов этих методов у Subject, рассылающего эти данные своим подписчикам, может привести к неожиданным эффектам.
Наблюдение за третьей составляющей запроса к API на точки – геообъектом аналогично
фильтрам и реализуется с помощью Subject.
Наконец, надо собрать эти три элемента вместе и отправить их в сетевом запросе.
Обращение к API
Для обращения к API мы используем известную библиотеку Retrofit, а все результаты сетевых вызовов представляются в виде Observable.
В итоге метод в сетевом слое будет выглядеть следующим образом:
public Observable<ClustersData> getClusters(MapBoundingBox box, Filter filter, GeoObject geo) {
//обращаемся к адаптеру API
}
Объединяем всё вместе
Вот так, мы объединяем все перечисленные асинхронные процессы:
Observable.combineLatest(
observeMapBoundingBox(mapController).debounce(300L, TimeUnit.MILLISECONDS, AndroidSchedulers.mainThread()),
filterHolder.observeChanges(true),
observeGeoObject(true),
SearchRequest::clusters//собрать все составляющие в промежуточный объект
)
.switchMap(request -> networkHelper
.getClusters(request.boundingBox, request.filter, request.geoObject)
.observeOn(AndroidSchedulers.mainThread())
.doOnError(handleErrorAction())//обрабатываем ошибку
.onErrorResumeNext(Observable.empty())//подавляем ошибку
)
.observeOn(Schedulers.computation())
//смотрим в БД, какие точки отметить как просмотренные
.map(this::processViewedClusters)
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Observer<ClustersData>() {
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable e) {
//в нормальной ситуации ошибок не происходит, поэтому просто логируем
}
@Override
public void onNext(ClustersData clustersData) {
//показываем точки на карте
}
});
Мы используем оператор combineLatest чтобы следить за изменением каждого из трёх значений, и при изменении одного из них создаём объект описывающий
сетевой запрос.
Данный оператор работает следующим образом: он ждёт, пока каждая из переданных ему последовательностей предоставит по одному элементу и вызывает функцию, которая преобразует все эти элементы в какой-то другой объект. Далее при каждом появлении нового элемента в любой из последовательностей оператор снова вызывает эту функцию передавая ей это новое значение и последние значения остальных.
Таким образом, он очень похож, на zip, с той лишь разницей, что использует последние известные значения элементов, если последовательности не предоставили новые. Это удобно когда сочетаешь последовательности, выпускающие элементы с разной частотой, например изменение состояния карты, которое случается чаще, чем фильтры.
После того как объект сетевого запроса построен, мы переходим непосредственно к запросу. Для этого мы обращаемся к объекту, отвечающему за взаимодействие с API, и передаём ему собранные нами параметры. Здесь есть два интересных момента.
Но сначала рассмотрим оператор flatMap, который для каждого элемента исходной последовательности возвращает новую последовательность, а потом объединяет их все в одну результирующую последовательность.
Оператор switchMap работает так же, с той лишь разницей, что он отписывается от последовательности, полученной от предыдущего элемента, переключается на новую и ждёт результатов от неё.
Это необходимо потому, что сетевые запросы выполняются медленно, так что если, например, человек передвинул карту, то предыдущий запрос теряет актуальность и мы должны запросить новые точки.
Вторым моментом является подавление сетевых ошибок при помощи операторов doOnError
и onErrorResumeNext
(с помощью которого мы возвращаем пустую последовательность).
Это делается для того, чтобы последовательность карта / фильтры / геообъект -> сетевой запрос -> точки на карте не разрывалась, если один из запросов завершился (сетевой) ошибкой – ведь, в таком случае новые изменения карты не дадут никакого результата, а сетевые ошибки вполне могут происходить, и нам их надо обрабатывать.
Следующим шагом после получения точек на карте является определение тех из них, которые пользователь уже просматривал. Для этого делается запрос к базе данных, после которого у всех просмотренных точек проставляется соответствующий флаг. Поскольку это долгая операция, мы освобождаем сетевой планировщик и переключаем вычисления на computation: observeOn(Schedulers.computation())
. Для запроса к базе данных мы используем Cupboard и свои Rx обёртки поверх него, но в данном случае мы обошлись обычным синхронным методом, хотя можно было использовать метод, возвращающий Observable
.
Вы, наверное, уже заметили, что в последовательности наблюдения за изменением положения карты появился оператор debounce, который позволяет отбросить лишние элементы, если они все пришли в указанный интервал времени. Это нужно, чтобы не делать слишком часто запросы на сервер, пока пользователь рассматривает карту. По умолчанию этот оператор использует computation-планировщик, но, поскольку мы знаем, что наши события происходят в главном потоке, то можно переопределить его на планировщик для главного потока. Это позволяет избежать ненужного в данном месте переключения потоков, а также избавляет computation-планировщик от лишних задач (так как количество потоков в нём по умолчанию ограничено количеством ядер).
А теперь подведём итоги.
Небольшое количество кода.
Вся логика уместилась в одну последовательность, по которой понятно движение данных и логика их обработки.
Субъективно такой код кажется проще, чем если бы мы использовали коллбэки. Но тут надо оговориться, что для этого необходимо как минимум знание основ RxJava.
Более простая реализация
Всю работу по синхронизации и хранению промежуточного состояния асинхронных операций мы переложили на плечи библиотеки. Как следствие, мы храним минимум промежуточного состояния. Это снижает вероятность ошибок при работе с несколькими потоками и асинхронными процессами.
Кроме того, вместо того чтобы реализовывать обработку множества асинхронных процессов, мы оперируем потоками данных и реализуем непосредственно наши бизнес-задачи. При этом в нашу последовательность легко добавлять новые шаги обработки и менять существующие.
Ещё хочется упомянуть, что код с последовательностями проще тестировать, потому что
последовательность можно заменить на ту, которая необходима для тестирования (в случае с коллбэками это будет сложнее).
Например, можно заменить все последовательности, связанные с интерфейсом, на
предустановленные значения, используя оператор just.
Комментарии (22)
kashey
29.09.2016 21:20человек передвинул карту, то предыдущий запрос теряет актуальность и мы должны запросить новые точки.
Конечно же это не так :)shakagamii
29.09.2016 22:06Почему, можно узнать?
kashey
29.09.2016 22:38Потому что это карты, и совершенно нормальные две ситуации — человек сдвинул карту не очень сильно(и большая часть старых данных валидна), и человек сдвинул карту обратно (и надо бы использовать те данные что есть).
Это приводит к тому, что при сдвиге карты надо загрузить только дельту по сдвигу, дополнив, а не заменив, данные.
В общем в JS API этот момент немного самим API покрыт, под андроидом судя по всему — нет.
Так уж получилось — но Яндекс.Недвижка ну совсем не умеет работать с данными карты. А принцип там очень простой — при сдвиге карты данные (в уже видимой области) НЕ должны меняться.agapitov
30.09.2016 00:41Вы пишете про разные вещи, цитата относится к запросу, который ещё не успел выполниться на момент, когда подвинули карту и начали запрашивать точки из новой области.
Далее, чтобы избежать моргания пинов на карте, берётся пересечение множества уже загруженных точек с теми, которые только что получили. Таким образом, пины, которые вышли из области видимости, удаляются, которые появились — добавляются на карту.
А идея с загрузкой дельты вводит дополнительную сложность, поскольку надо считать эту дельту, объединять её результаты с текущими, это что касается клиента. С точки зрения сервера, не факт, что поиск в дельте будет быстрее, чем во всей области + с кластеризацией сложнее.kashey
30.09.2016 08:28+1Вот в том и дело — что дополнительную сложность не вводит, новые пины у вас появляются рядом со старыми, да и кластеризация при таких условиях может полностью менять картинку при малейших изменений данных.
Вот смотрели вы на краешек Москвы, и все данные были там. Потому что их много.
А потом подвинули на пиксель — и картинка полностью изменилась. Потому что данные обычно запрашивают с никим лимитом, и раньше этот лимит «тратился» на центр города.
У меня (к счастью? к сожалению?) нет возможности проверить как работает андроид приложение, но в вебе сейчас все с этим плохо, и всегда так и было – информация приходит не равномерно.
Сейчас есть два «хороших» решения проблемы — или загрузка данных дискретными «тайлами». В том числе там нет понятия «запрос, который еще не успел выполнится» — тайл в любом случае может оставаться актуальным, и пускай продолжает загружаться. Либо ограничения лимита через серверную кластеризацию, по тем же Z-кодам.
PS: Z-code, он же Morton code, может быть заменен Geocode или hilbert code. В общем 1D spatial index.
Опять же — если хранить данные в Z кодах, а запрашивать в тайлах — запрос превращается в поиск в 1D интервале и начинает работать чуть чуть быстрее.
У меня нет ссылки на «не матан», а на понятное обьяснение задачи, но есть видео.
Ну и конечно же все эти мозги без особых проблем спрячутся за фасадом функции getClusters, а сам пример с RxJS очень понравился.agapitov
30.09.2016 12:17В таком случае, я советую вам сначала посмотреть Андроид приложение, потому что карта там отличается от десктопной. И пины там ведут себя как я описал выше.
agapitov
30.09.2016 12:25Гифка кратко, но показывает как это выглядит.
kashey
30.09.2016 12:37При горизонтальном драге перестроения быть не должно. На гифке есть «хорошая» догрузка при драге направо, и «плохая» при драге обратно. В том числе кластеризация не «устойчивая».
Когда я похожую проблему решал много лет назад для схожего по тематике сайта gdeetotdom – мне первое решение завернули сразу именно из-за «раздражающих анимашек» неустойчивой кластеризации.agapitov
30.09.2016 12:56Да, увидел о чём вы. Мне кажется это единичный случай, потому что я не могу это воспроизвести.
kashey
30.09.2016 13:01На более не равномерных данных, например на обзорных масштабах, когда виден и город (густо) и область(пусто), или когда количество обьявлений упрется в лимит — эффект будет более заметен.
Evgenij_Popovich
30.09.2016 10:17+3В обучающем материале, думаю, стоит упомянуть, что использование Observable.create считается продвинутой и нерекомендованной практикой, из-за возможности легко наделать себе кучу проблем. Для случая, описанного в статье, лучше использовать fromEmitter
Цитата <a https://github.com/ReactiveX/RxJava/wiki/Creating-Observables">отсюда
create(?) — advanced use only! create an Observable from scratch by means of a function, consider fromEmitter instead
fromEmitter() — create safe, backpressure-enabled, unsubscription-supporting Observable via a function and push events.
Буквально на неделе в чате полдня выясняли, почему у человека после возникновения ошибки половина цепочки вызовов продолжала работать (до первого flatMap, как потом выяснилось), а другая половина нет. Также не работал retry. Причиной оказалось некорректное использование Observable.create, сначала был пропущена а потом не туда вставлена проверка subscriber.isUnsubscribed.agapitov
30.09.2016 12:34Согласен,
fromEmitter
выглядит проще и надёжнее.
Сам его до этого не использовал, потому что в тех редких местах где использовалObservable.create
проблем с backpressure не было. А в других использовалObservable.SyncOnSubscribe
иObservable.AsyncOnSubscribe
.
genoxide
30.09.2016 19:58Большое спасибо за то, что обратили на это внимание, это и правда постоянно упускают из виду в уроках. Вернее, не обращают на это достаточного внимания.
Ого, еще совсем недавно он назывался .fromAsync() xD И все еще Experimental, почему-то.
А как насчет AsyncOnSubscribe? Юзали?agapitov
30.09.2016 20:08ИМХО,
fromEmitter
для более простых случаев.AsynOnSubscribe
упрощает написание собственного источника, поддерживающего Backpressure и умеющего выдавать запрашиваемое количество элементов.
Evgenij_Popovich
03.10.2016 18:50А как насчет AsyncOnSubscribe? Юзали?
К сожалению, не доводилось попробовать.
k0ber
30.09.2016 12:35Можно ли было внутри
FilterHolder
'a использоватьBehaviorSubject
вместоPublishSubject
'а и сохранения последнего элемента последовательности "вручную"?agapitov
30.09.2016 12:41В принципе можно,
поменяв как работает observeChanges
. У нас ещё есть синхронное получение значения, поэтому сделали так по привычке (хотя сейчас посмотрел, чтоBehaviorSubject
имеет методgetValue
).
nikis
02.10.2016 19:13+1AndroidSchedulers.mainThread() использует Message.sendDelayed внутри, что может привести к NPE, когда получателя события уже нет, о чем говорит в своем докладе инженер из Square. Вы с такой проблемой не сталкивались? Если да, то как решали?
Artem_zin
07.10.2016 02:43+1Данный доклад
абсолютноочень misleading.
В чем проблема
Message.sendDelayed()
? Как будто у нас есть другие способы передать управление в main поток из других потоков. От того, что они передвинули переход в main поток чуть повыше в бизнес логике, смысл не поменялся, в каком-то месте идет передача управления из какого-то потока черезHandler
и отрисовка данных. Не хотите NPE — не делайте рейс кондишенов :)
К NPE вы можете прийти только если вы сами обнулите ссылки, с которыми работаете в коллбеках, до того как отпишитесь от
Subscription
, при таком коде у вас будет рейс кондишен. То есть это чисто логическая ошибка при написании кода, а не "подарок" от Android Framework.
Магии никакой нет, GC/VM за вас ссылки не обнуляет, если хотите какие-то ссылки обнулить руками (в 99% это ошибка в дизайне, тк у нас рантайм с GC), то делайте это вместе с
subscription.unsubscribe()
(порядок не важен, тк как правило, вы делаете это в том же потоке, в котором потом будет работать коллбек, но если сложно про это думать, то чуть ниже написано более общее правило).
Железобетонное правило: делайте
subscription.unsubscribe()
как можно раньше (до обнулления ссылок, etc), тогда проблем не будет.
nikitosgleb
Стандартные Loaders проигрывают лишь в том, что не стали мэйнстримом и их не хвалит Jake Warton ;)
agapitov
Стандартные Loaders проигрывают тем, что у них нет такого большого количества операторов/фич, как у RxJava :-)