Доброго времени суток! Работая над одним проектом, обнаружил, что через связку retrofit2 и retrofit2 adapter-rxjava нельзя реализовать batch loading в одном потоке.
Приведу пример. Имеем описание retrofit-сервиса:
Загрузка одного пакета:
Для загрузки всех сущностей с сервера, нам придется каждый раз создавать новый observable для загрузки очередного пакета. Кроме того, как запихнуть все эти observable'ы в один поток, представляется крайне сложным.
Для решения это проблемы, предлагаю достаточно простой подход.
Пример использования:
Таким образом, мы получили достаточно простой и эффективный способ грузить пакеты через ретрофит в связке с RxJava. Гуру RxJava, наверняка, смогут предложить более правильный подход, в соответствии со всеми концепциями реактивного программирования и RxJava, однако, такой способ, однозначно, будет понятен всем, кто хоть чуть-чуть знаком с rx.
P.S. Пинайте сильно, мой первый пост.
UPD А вот так это делается без костылей, как предложил werktone.
Приведу пример. Имеем описание retrofit-сервиса:
interface Api {
@GET("query.json")
Observable<List<SomeEntityServerView>> getAll(
@Query("first") int first,
@Query("max") int batchSize);
}
Загрузка одного пакета:
service.getAll(0,20).map(list -> ...).observeOn(...).subscribe(...);
Для загрузки всех сущностей с сервера, нам придется каждый раз создавать новый observable для загрузки очередного пакета. Кроме того, как запихнуть все эти observable'ы в один поток, представляется крайне сложным.
Для решения это проблемы, предлагаю достаточно простой подход.
public class BatchLoadingUtils {
/**
* @param batchLoaderFactory - метод создания обсервабла по номеру первого элемента
* @param batchSize - размер пакета
*/
public static <T> Observable<List<T>> create(Func1<Integer, Observable<List<T>>> batchLoaderFactory, int batchSize) {
//Здесь храним номер первого элемента в пакете
AtomicInteger first = new AtomicInteger(0);
//Сюда будем этот номер отправлять. Соответственно, первый номер - 0. При желании, можно вынести его в параметры метода
BehaviorSubject<Integer> subject = BehaviorSubject.create(0);
return subject
//Превращаем смешение в observable
.flatMap(batchLoaderFactory::call)
.doOnNext(ts -> {
if (ts.size() == batchSize) {
//Если загрузили ожидаемое количество элементов, грузим дальше
subject.onNext(first.addAndGet(batchSize));
} else {
//В противном - завершаем работу
subject.onCompleted();
}
});
}
}
Пример использования:
final int batchSize = 10;
BatchLoader
.create(
first -> retrofitService.getAll(first, batchSize),
batchSize
)
.observeOn(Schedulers.computation())
.flatMapIterable(list -> list)
.map(TimeEntryServerView::buildTimeEntry)
.buffer(batchSize)
.subscribe(...);
Таким образом, мы получили достаточно простой и эффективный способ грузить пакеты через ретрофит в связке с RxJava. Гуру RxJava, наверняка, смогут предложить более правильный подход, в соответствии со всеми концепциями реактивного программирования и RxJava, однако, такой способ, однозначно, будет понятен всем, кто хоть чуть-чуть знаком с rx.
P.S. Пинайте сильно, мой первый пост.
UPD А вот так это делается без костылей, как предложил werktone.
Observable.just(0)
.repeat()
.scan((accumulator, item) -> accumulator + batchSize)
.concatMap(first -> service.getAll(first, batchSize))
.takeWhile(response -> response.size() == batchSize)
.subscribe(...);
Поделиться с друзьями
Комментарии (5)
astr
22.10.2016 11:21Observable.range(1, 20).concatMap(i -> service.getAll(i, 20));
alexander-shustanov
22.10.2016 11:24Такой способ подойдет, если нам нужно загрузить известное количество пачек, и, в этом случае, врядли можно придумать что-то более подходящее. Однако, если нужно грузить все что есть, подойдет только мой вариант.
werktone
22.10.2016 15:10Observable.just(0) .repeat() .scan((accumulator, item) -> accumulator + batchSize) .concatMap(first -> service.getAll(first, batchSize)) .takeWhile(response -> response.size() == batchSize)
punksta
Сомневаюсь, что в общем случае запросы будут идти именно в одном потоке. Последовательно — да, а выбор потока тут за scheduller-м, который используется в call-адаптере.
alexander-shustanov
Здесь некоторая двусмысленность у слова «поток». Имеется ввиду поток данных, а не поток java. А так, конечно, где будут выполняться запросы решит scheduler.