Реактивное программирование — очень модный тренд в мобильной разработке на данный момент. Если говорить про android разработку, то реактивность представлена в основном библиотекой RxJava.
В сети все больше статей, обучающих видео, презентаций, записей с конференций, в которых рассказывается про данную тематику. Но подавляющее большинство представленного материала содержит теоретические аспекты и весьма тривиальные примеры, на которых разработчик не может оценить реальный профит от использования Rx. А ведь так хочется увидеть реальный и сложный пример из жизни, который очень наглядно покажет всю мощь, красоту и простоту реактивного программирования.
Поэтому данная статья будет посвящена подробному разбору реального и довольно таки сложного примера. И я очень надеюсь, что после прочтения, вы по-настоящему откроете для себя мир реактивного программирования.


Немного о теории


В начале я все-таки хотел бы уделить время для теории, так как при рассмотрении примера вы уже должны более-менее четко себе представлять, что такое Observable, Subscriber, Subscription, операторы Rx и что значат диаграммы для каждого оператора. Как я писал выше, материалов сейчас хватает. Поэтому приведу список ссылок на самые важные и интересные из них.

Самый главный ресурс — это wiki по RX
Особое бы внимание обратил я на этот пункт
В нем перечислены все ссылки на довольно хорошие и качественные статьи.
Лично мне для вступления нравится статья
В ней автор, по-моему мнению, пытается донести до слушателя основную мысль — нужно мыслить не объектами, а Потоками (Stream, не Thread! Многопоточность в данном случае не при чем). Понимание всего процесса Rx, именно как взаимодействие потоков, существенно облегчает дальнейшую работу.
Также для вступления есть хорошая презентация
Блог компании Futurice. Одна из статей
Блог, содержащий много полезных статей, посвященных Rx и не только.
Некоторые из статей — Part 1, Part 2, Part 3, Part 4, Don't break the chain, Loading data from multiple sources with RxJava
Что-то уже есть и на русском:
1. Переводы статей Grokking RxJava Part 1, Part 2, Part 3, Part 4
2. Самостоятельная статья
Если вы используете в своем проекте RxJava, то без RetroLambda дальнейшее ваше существование не имеет смысла :)
Хорошая статья по лямбде

И немало практики


Ну а теперь рассмотрим реальный случай из нашей не простой разработческой жизни.
Нам нужно было получить список выписок (это модель StatementRUR) за конкретный период времени. Скажем за месяц.
Что у нас есть? Запрос (метод statementRURList) с параметрами offset и limit, а также параметром StatementListParameters, в котором мы задаем фильтр (с такой-то даты по такую-то) и порядок сортировки. То есть получение списка выписок за месяц становится уже несколько нетривиальной задачкой в реализации.
Давайте немножко упростим и представим, что метод statementRURList возвращает сразу List<StatementRUR>.
Тогда стандартный код будет выглядеть приблизительно так:
int i = 0;
List<StatementRUR> commonList = new ArrayList<>();
while (true) {
    List<StatementRUR> list = statementRURList(int i * limit, int limit, String accountId, int periodDaysCount, Date docInfoDocDate);
    commonList.addAll(list);
    if (list.size() < limit) {
        break;  
    }
    i++;
}

В результате получим полный список выписок за месяц.
Но имеем то мы дело с запросами, которые нужно выполнять не UI потоке, и которые нужно слать строго друг за другом, и обрабатывать ответы с сервера в таком же порядке, дабы не сбить сортировку. А еще нужно корректно сложить все результаты, ведь могут же возникнуть проблемы с многопоточностью. И еще помним о корректной обработке ошибок.
С помощью асинхронности про реализацию подобного алгоритма «просто, красиво и надежно» можно забыть. А вот RxJava — это то, что нам нужно!

Уделим время описанию способа связи с сервером и моделей, с которыми будем мы работать.
Вообще для связи с сервером мы используем замечательную библиотеку RetroFit. Еще более замечательное в ней то, что она взаимодействует с Rx. Таким образом наш запрос в сеть на получение списка выписок выглядит следующим образом:
Observable<ResponseApiModel<List<StatementRUR>>> statementRURList
(int offset, int limit, String accountId, int periodDaysCount, Date docInfoDocDate);

, где ResponseApiModel — это единая модель, по которой сервер возвращает ответ на любой наш запрос.
Ниже привожу полный код данной модели:
public class ResponseApiModel<T> {

    @SerializedName("result")
    private T result;
    @SerializedName("errors")
    private List<ErrorResponseApiModel> errors;
    @SerializedName("state")
    private Object state;

    public ResponseApiModel(T result, List<ErrorResponseApiModel> errors) {
        this.result = result;
        this.errors = errors;
    }

    public ResponseApiModel(T result, List<ErrorResponseApiModel> errors, Object state) {
        this.result = result;
        this.errors = errors;
        this.state = state;
    }
    public ResponseApiModel(List<ErrorResponseApiModel> errors, Object state) {
        this.errors = errors;
        this.state = state;
    }

    /**
     * if result == null && errors != null -> throw new ResponseAPIException!
     * @return result field
     */
    public T getResult() {
        if (result == null && errors != null) {
            throw new ResponseAPIException(errors);
        }
        return result;
    }

    public List<ErrorResponseApiModel> getErrors() {
        return errors;
    }

    public Object getState() {
        return state;
    }
}

Для нас наибольший интерес представляет поле T result, в котором содержатся запрашиваемые данные в случае успешного ответа сервера. Если же сервер возвращает ошибку в поле errors, то при попытке получения результата в методе T getResult() выкидывается исключение ResponseAPIException, которое потом перехватывается Subscriber.onError(Throwable e)
Параметры String accountId, int periodDaysCount, Date docInfoDocDate потом преобразуются в упомянутый выше StatementListParameters, но это уже детали.
В результате сервер в случае успеха возвращает нам List<StatementRUR> (поле T result).

Ну а теперь самое интересное! Как это реализовать в Rx?
Ниже приводится подробная диаграмма и алгоритм работы.
Для диаграммы следующие допущения: на сервере 160 выписок, LIMIT = 50



  1. Для начала нужно запустить бесконечный цикл, который после каждой итерации будет инкрементировать свой счетчик (переменная i в коде выше).
    Для этой цели существует оператор range. В программном виде это будет так:
    Observable.range(0, Integer.MAX_VALUE - 1)
    

    Данный Observable «выпускает» инкрементирующееся каждый раз значение. То есть на выходе получается последовательность чисел: 0, 1, 2, 3...Integer.MAX_VALUE - 1.
  2. Используя значение счетчика, нужно делать запросы в сеть — метод statementRURList. Но при этом каждый следующий запрос должен слаться после получения ответа на предыдущий. Это необходимо для корректной работы RetroFit (чтобы не завалить его большим количество одновременных запросов) и для соблюдения сортировки.
    На помощь приходит оператор concatMap. Как видно из диаграммы, данный оператор создает новый Observable, на вход которому поступают элементы с предыдущего Observable, и который «излучает» новые элементы. Но главное отличие concatMap от похожего с виду flatMap заключается в том, что новый Observable соблюдает такую же последовательность элементов, что и предыдущий. То есть, если на вход оператору поступают нулевой, первый и второй элементы, новый Observable выпустит переработанные значения этих элементов в точно таком же порядке.
    Но лучше один раз увидеть :) Внимание на диаграмму, а именно на первые два Потока («горизонтальные стрелки», Stream)

    И программный код теперь:
    Observable
            // get All statements from current date for periodDaysCount (with offset, limit)
            .range(0, Integer.MAX_VALUE - 1)
            .concatMap(new Func1<Integer, Observable<ResponseApiModel<List<StatementRUR>>>>() {
                @Override
                public Observable<ResponseApiModel<List<StatementRUR>>> call(Integer increment) {
                    return statementRURList(LIMIT * increment, LIMIT, accountId, periodDaysCount, docInfoDocDate);
                }
            }
    );
    

    Как-то многовато кода, не находите? Может используем lambda? И тогда получим:
    Observable
            .range(0, Integer.MAX_VALUE - 1)
            .concatMap(increment -> statementRURList(LIMIT * increment, LIMIT, accountId, periodDaysCount, docInfoDocDate));
    

    Вот так гораздо лучше :)
  3. Новый Observable «излучает» элементы ResponseApiModel<List<StatementRUR>>. Но нам то нужен список выписок или, в данном контексте, поле T result модели ResponseApiModel. Для получения поля result в модели есть метод T getResult(). Таким образом необходимо преобразовать элементы, выпущенные последним Observable. Для этого существует оператор map.
    Диаграмма. Третий поток.

    Программный код:
    Observable
            .range(0, Integer.MAX_VALUE - 1)
            .concatMap(increment -> statementRURList(LIMIT * increment, LIMIT, accountId, periodDaysCount, docInfoDocDate))
            .map(new Func1<ResponseApiModel<List<StatementRUR>>, List<StatementRUR>>() {
                @Override
                public List<StatementRUR> call(ResponseApiModel<List<StatementRUR>> listResponseApiModel) {
                    return listResponseApiModel.getResult();
                }
            }
    );
    

    И с RetroLambda
    Observable
            .range(0, Integer.MAX_VALUE - 1)
            .concatMap(increment -> statementRURList(LIMIT * increment, LIMIT, accountId, periodDaysCount, docInfoDocDate))
            .map(ResponseApiModel::getResult);
    

  4. Теперь у нас есть бесконечный цикл отправки запросов на сервер (с последовательно увеличивающимся offset) и приема в точно такой же последовательности ответов. И наша задача на данном этапе — остановить Землю цикл.
    Как уже писалось выше, если в ответе количество выписок меньше значения limit, значит слать больше запросы не имеет смысла.
    Воспользуемся оператором takeUntil, который задает условие прекращения получения новых элементов. При достижении данного условия, все вышестоящие observable также прекращают свою работу.
    Что под этим подразумевается? Взглянем на диаграмму. Четвертый поток.

    Код:
    Observable
            .range(0, Integer.MAX_VALUE - 1)
            .concatMap(increment -> statementRURList(LIMIT * increment, LIMIT, accountId, periodDaysCount, docInfoDocDate))
            .map(ResponseApiModel::getResult)
            .takeUntil(new Func1<List<StatementRUR>, Boolean>() {
                @Override
                public Boolean call(List<StatementRUR> list) {
                    return list.size() < LIMIT;
                }
           }
    );
    

    Код с RetroLambda:
    Observable
            .range(0, Integer.MAX_VALUE - 1)
            .concatMap(increment -> statementRURList(LIMIT * increment, LIMIT, accountId, periodDaysCount, docInfoDocDate))
            .map(ResponseApiModel::getResult)
            .takeUntil(list -> list.size() < LIMIT);
    

  5. На данном этапе необходимо объединить полученные результаты и выдать полноценный список выписок за месяц с сортировкой по дате. Текущий наш observable «выпускает» элементы, которые представляют собой списки выписок (каждый список — это ответ на запрос на сервер). Эти списки нужно объединить. В этом помогут оператор toList, который объединит все «выпущенные» элементы в единый список, то есть в итоге мы получим список списков выписок, и оператор map, который преобразует список списков выписок в необходимый нам список выписок.
    Диаграмма. Пятый и шестой поток.

    Код. Позвольте сразу писать с RetroLambda:
    Observable
            .range(0, Integer.MAX_VALUE - 1)
            .concatMap(increment -> statementRURList(LIMIT * increment, LIMIT, accountId, periodDaysCount, docInfoDocDate))
            .map(ResponseApiModel::getResult)
            .takeUntil(list -> list.size() < LIMIT)
            .toList()
            .map(this::safeMerge);
    
    private <T> List<T> safeMerge(List<List<T>> lists) {
        List<T> list = new ArrayList<>();
        for (List<T> statementOperationRURList : lists) {
            list.addAll(statementOperationRURList);
        }
        return list;
    }
    

    Те, кто уже с Rx «не на Вы» могут возразить мне, что для аккумулирования «выпущенных» элементов существует специальный оператор scan. Зачем же городить огород из toList и map? Дело в том, что, если первый же запрос вернет нам пустой список (и тогда этот запрос будет единственным), scan выдаст ошибку. И обойти ее можно только таким, несколько окольным, путем.


Скажите, ну, как вам код? Ну ведь правда просто, красиво и надежно!
А еще представьте такую ситуацию. Получили мы список выписок. А теперь еще дополнительно надо:
  1. Пройтись по каждой выписке
  2. По каждой выписке получить список Операций (StatementOperationRUR). Причем получаем мы с помощью запроса
    Observable<ResponseApiModel<List<StatementOperationRUR>>> statementOperationRURList(String id, int offset, int limit);
    

    , где параметр String id — это id Выписки, по которой запрашивается список операций.
    Таким образом, как и для выписок, чтобы получить список операций, нам заранее неизвестно, сколько нужно слать запросов на сервер из-за параметров offset, limit
  3. Сложить полученные операции в единый список

То есть, если нам нужен список операций за месяц, причем отсортированный, необходимо выполнить минимум 31 запрос (минимум один запрос на получение выписки и минимум 30 запросов на получение списка операций по каждой выписке), а точнее 31 цикл отправки запросов, прерывающихся по определенному условию. И ведь это все надо как-то соединить.
Но с RxJava… ну вы понимаете :)
Внимание на код.

private Observable<List<StatementRUR>> getAllRURStatements(String accountId, Date docInfoDocDate, int periodDaysCount) {
    return Observable
            // get All statements from current date for periodDaysCount (with offset, limit)
            .range(0, Integer.MAX_VALUE - 1)
            .concatMap(increment -> statementRURList(LIMIT * increment, LIMIT, accountId, periodDaysCount, docInfoDocDate))
            .map(ResponseApiModel::getResult)
            .takeUntil(list -> list.size() < LIMIT)
            // merge all results from statement requests in one List
            .toList()
            .map(this::safeMerge);
}

// get All Operations from concrete statement (with offset, limit)
private Observable<List<StatementOperationRUR>> getOperationsFromRURStatement(StatementRUR statementRUR) {
    return Observable.range(0, Integer.MAX_VALUE - 1)
            .concatMap(increment -> statementOperationRURList(statementRUR.getId(), LIMIT * increment, LIMIT))
            .map(ResponseApiModel::getResult)
            .takeUntil(list -> list.size() < LIMIT)
            // merge all results from operations requests in one List
            .toList()
            .map(this::safeMerge);
}

public <List<StatementOperationRUR>> getAllRUROperations(String accountId, Date docInfoDocDate, int periodDaysCount) {
   return
        // get all statements
        getAllRURStatements(accountId, docInfoDocDate, daysLimit)
        // from Statement list to each item emitting
        .concatMap(Observable::from)
        // get All Operations from all statement
        .concatMap(this::getOperationsFromRURStatement)
        // merge all results from operation requests in one List
        .toList()
        .map(this::safeMerge);
}


Вроде тоже ничего сложного уже нет. В первых двух методах наблюдается дублирование кода, но я оставил его для большей понятности.

Я постарался максимально подробно объяснить первый пример, так как он, по-моему мнению, самый сложный в построении. Я потратил немало сил и времени, чтобы написать такой простой с виду код. По крайней мере в сети я не видел подобных решений.
Второй пример, по сути, есть результат использования первого примера плюс пару простых операторов. Но зато он демонстрирует, насколько удобна RX в плане конструирования сложных алгоритмов. Rx и есть конструктор.

С удовольствием жду комментариев, замечаний и предложений!

Комментарии (8)


  1. 7voprosov
    22.09.2015 23:55

    Те, кто уже с Rx «не на Вы» могут возразить мне, что для аккумулирования «выпущенных» элементов существует специальный оператор scan.

    Не очень понятно почему вы пишите про scan, т.к по описанию вашей задаче кажется, что частичные суммы вам не нужны. Возможно, здесь уместнее говорить про reduce (который конечно является частным случаем для scan)
    Зачем же городить огород из toList и map? Дело в том, что, если первый же запрос вернет нам пустой список (и тогда этот запрос будет единственным), scan выдаст ошибку

    То же не совсем понятно, почему должна возникать ошибка, т.к здравая логика подсказывает (доки данный случай не обговаривают), что аккумулятор примет просто значение первого элемента (в данном случаи вашего пустого списка) и ни чего особенного не произойдет. Проверочный код в «лоб» работает:

    List<String> empty = new ArrayList<String>();
    
    Observable.just(empty).scan((accum, nextList) -> {
      accum.addAll(nextList);
      return accum;
    }).subscribe(list -> {
      System.out.println("Should be empty list: " + list.size());
    });
    

    Я потратил немало сил и времени, чтобы написать такой простой с виду код.

    В данном случае так же не понятно, почему не взять и просто заиспользовать ваш код из самого первого примера (характер ваших запросов говорит о их связанности, а значит все они все равно должны будут исполняться в одном потоке), как то так:

    Future<List<StatementRUR>> future = executor.submit(
      (Callable<List<StatementRUR>>) () -> {
        int i = 0;
        List<StatementRUR> commonList = new ArrayList<>();
        while (true) {
          // Retrofit должен иметь простой сихронный доступ
            List<StatementRUR> list = 
              statementRURList(int i * limit, int limit, String accountId,
                               int periodDaysCount, Date docInfoDocDate);
            commonList.addAll(list);
            if (list.size() < limit) {
              break;
            }
            i++;
        }
        return commonList;
      }
    );
    


    1. xoxol_89
      23.09.2015 11:10

      Добрый день! Спасибо за замечания.
      Про reduce согласен. Он лучше подходит.
      Про scan, я точно не помню уже, при каких конкретно условиях он падал (или выдавал исключение). По-моему, при условии, если в вышестоящем observable, только один «выпущенный» элемент. Нужно вспомнить будет)
      Про «потратил силы» я скорее имел в виду, как это реализовать в Rx, потому что подобного не нашел.
      Замечание про то, что можно использовать мой первый пример и отталкиваться от него, так как все необходимо будет выполнять только в одном фоновом потоке, отчасти согласен. В принципе можно. Но тут встает вопрос, что по циклу вы вот так запросы не сможете посылать. RetroFit ругнется и выкинет нас. Если использовать свой кастомный клиент, то все зависит от реализации очереди. И без пробрасывания колбэков не обойтись. А вот если использовать библиотеку Volley, которая возвращает RequestFuture, уже можно будет поиграться.
      Но не стоит забывать про то, что код то в Rx выглядит проще. Уж Вы то со мной согласитесь) Плюс мы довольно легко сможем подключить, если необходимо, еще дополнительные операции и не переживать, что что-то отвалиться. Ну и обработка ошибок. В Rx она тоже удобнее, нежели в асинхронном коде.


  1. 7voprosov
    23.09.2015 13:09

    Но тут встает вопрос, что по циклу вы вот так запросы не сможете посылать. RetroFit ругнется и выкинет нас.

    Опять таки непонятно почему, на главной странице retrofit-а, написано:

    Each Call from the created GitHubService can make a synchronous or asynchronous HTTP request to the remote webserver.

    Так, что кажется, что проблем вообще ни каких не будет (исполнение идет в фиче) и колбэки так же не понадобится.


    1. xoxol_89
      23.09.2015 13:35

      Да, действительно, у RetroFit есть такое возвращаемое значение, как Call.
      RetroFit ругнется, если вы вместо concatMap подставите flatMap, но это несколько другая история.
      Но можно к примеру добавить параллельные вызовы в БД для записи или чтения, и уже добавляется настоящая асинхронность, и цикл синхронных вызовов уже не сработает так, как хотелось бы.


  1. atetc
    25.09.2015 07:40
    +1

    Кстати существует отдельный чат для обсуждения Rx в Android gitter.im/rus-speaking/android-rx


  1. lNevermore
    25.09.2015 11:22
    +1

    Хорошая статья, спасибо.

    Согласен с 7voprosov про reduce, еще хотел отметить, что в примере у вас после concatMap идет map, и по сути map не нужен, можно вставить этот геттер прямо в concatMap, убрав лишний оверхэд и сделав код более коротким и более читабельным (на вкус и цвет, разумеется).


    1. xoxol_89
      25.09.2015 14:28

      Согласен по map. Спасибо)


  1. xoxol_89
    01.10.2015 14:46

    По поводу reduce, scan.
    Вот какая ошибка может вылетать:

    java.util.NoSuchElementException: Sequence contains no elements at rx.internal.operators.OperatorSingle$ParentSubscriber.onCompleted(OperatorSingle.java:131) at rx.internal.operators.OperatorTakeLastOne$ParentSubscriber.onCompleted(OperatorTakeLastOne.java:106) at rx.internal.operators.OperatorScan$2.onCompleted(OperatorScan.java:123) at rx.observers.SerializedObserver.onCompleted(SerializedObserver.java:99) at rx.observers.SerializedSubscriber.onCompleted(SerializedSubscriber.java:65) at rx.internal.operators.OperatorConcat$ConcatSubscriber.subscribeNext(OperatorConcat.java:171) at rx.internal.operators.OperatorConcat$ConcatSubscriber.onCompleted(OperatorConcat.java:154) at rx.internal.operators.OperatorMap$1.onCompleted(OperatorMap.java:44) at rx.observers.SerializedObserver.onCompleted(SerializedObserver.java:99) at rx.observers.SerializedSubscriber.onCompleted(SerializedSubscriber.java:65) at rx.internal.operators.OperatorConcat$ConcatSubscriber.subscribeNext(OperatorConcat.java:171) at rx.internal.operators.OperatorConcat$ConcatSubscriber.onCompleted(OperatorConcat.java:154) at rx.internal.operators.OperatorMap$1.onCompleted(OperatorMap.java:44) at rx.internal.operators.OperatorMap$1.onCompleted(OperatorMap.java:44) at rx.internal.operators.OperatorSingle$ParentSubscriber.onCompleted(OperatorSingle.java:125) at rx.internal.operators.OperatorTakeLastOne$ParentSubscriber.emit(OperatorTakeLastOne.java:158) at rx.internal.operators.OperatorTakeLastOne$ParentSubscriber.onCompleted(OperatorTakeLastOne.java:124) at rx.internal.operators.OperatorScan$2.onCompleted(OperatorScan.java:123) at rx.internal.operators.OperatorTakeUntilPredicate$ParentSubscriber.onNext(OperatorTakeUntilPredicate.java:57) at rx.internal.operators.OperatorMap$1.onNext(OperatorMap.java:55) at rx.observers.SerializedObserver.onNext(SerializedObserver.java:159) at rx.observers.SerializedSubscriber.onNext(SerializedSubscriber.java:95) at rx.internal.operators.OperatorConcat$ConcatInnerSubscriber.onNext(OperatorConcat.java:206) at retrofit.RxSupport$2.run(RxSupport.java:56) at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:442) at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:305) at java.util.concurrent.FutureTask.run(FutureTask.java:137) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1076) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:569) at java.lang.Thread.run(Thread.java:856)

    Собственно поэтому используются комбинация операторов toList и map