Доброго времени суток! Работая над одним проектом, обнаружил, что через связку retrofit2 и retrofit2 adapter-rxjava нельзя реализовать batch loading в одном потоке.

Приведу пример. Имеем описание retrofit-сервиса:

interface Api {
    @GET("query.json")
    Observable<List<SomeEntityServerView>> getAll(
        @Query("first") int first, 
        @Query("max") int batchSize);
}

Загрузка одного пакета:

service.getAll(0,20).map(list -> ...).observeOn(...).subscribe(...);

Для загрузки всех сущностей с сервера, нам придется каждый раз создавать новый observable для загрузки очередного пакета. Кроме того, как запихнуть все эти observable'ы в один поток, представляется крайне сложным.

Для решения это проблемы, предлагаю достаточно простой подход.

public class BatchLoadingUtils {

    /**
     * @param batchLoaderFactory - метод создания обсервабла по номеру первого элемента
     * @param batchSize - размер пакета
     */
    public static <T> Observable<List<T>> create(Func1<Integer, Observable<List<T>>> batchLoaderFactory, int batchSize) {
        //Здесь храним номер первого элемента в пакете
        AtomicInteger first = new AtomicInteger(0);
        //Сюда будем этот номер отправлять. Соответственно, первый номер - 0. При желании, можно вынести его в параметры метода
        BehaviorSubject<Integer> subject = BehaviorSubject.create(0);

        return subject
                //Превращаем смешение в observable
                .flatMap(batchLoaderFactory::call)
                .doOnNext(ts -> {
                    if (ts.size() == batchSize) {
                        //Если загрузили ожидаемое количество элементов, грузим дальше
                        subject.onNext(first.addAndGet(batchSize));
                    } else {
                        //В противном - завершаем работу
                        subject.onCompleted();
                    }
                });
    }
}

Пример использования:

final int batchSize = 10;
BatchLoader
    .create(
        first -> retrofitService.getAll(first, batchSize),
        batchSize
    )
    .observeOn(Schedulers.computation())
    .flatMapIterable(list -> list)
    .map(TimeEntryServerView::buildTimeEntry)
    .buffer(batchSize)
    .subscribe(...);

Таким образом, мы получили достаточно простой и эффективный способ грузить пакеты через ретрофит в связке с RxJava. Гуру RxJava, наверняка, смогут предложить более правильный подход, в соответствии со всеми концепциями реактивного программирования и RxJava, однако, такой способ, однозначно, будет понятен всем, кто хоть чуть-чуть знаком с rx.

P.S. Пинайте сильно, мой первый пост.

UPD А вот так это делается без костылей, как предложил werktone.

Observable.just(0)
        .repeat()
        .scan((accumulator, item) -> accumulator + batchSize)
        .concatMap(first -> service.getAll(first, batchSize))
        .takeWhile(response -> response.size() == batchSize)
        .subscribe(...);
Поделиться с друзьями
-->

Комментарии (5)


  1. punksta
    22.10.2016 08:19

    Сомневаюсь, что в общем случае запросы будут идти именно в одном потоке. Последовательно — да, а выбор потока тут за scheduller-м, который используется в call-адаптере.


    1. alexander-shustanov
      22.10.2016 11:21

      Здесь некоторая двусмысленность у слова «поток». Имеется ввиду поток данных, а не поток java. А так, конечно, где будут выполняться запросы решит scheduler.


  1. astr
    22.10.2016 11:21

    Observable.range(1, 20).concatMap(i -> service.getAll(i, 20));
    


    1. alexander-shustanov
      22.10.2016 11:24

      Такой способ подойдет, если нам нужно загрузить известное количество пачек, и, в этом случае, врядли можно придумать что-то более подходящее. Однако, если нужно грузить все что есть, подойдет только мой вариант.


      1. werktone
        22.10.2016 15:10

        Observable.just(0)
                .repeat()
                .scan((accumulator, item) -> accumulator + batchSize)
                .concatMap(first -> service.getAll(first, batchSize))
                .takeWhile(response -> response.size() == batchSize)