Здравствуйте все.

Мы продолжаем знакомить вас с нашим издательским поиском, и хотели прозондировать общественное мнение на тему RxJava.



В ближайшее время собираемся опубликовать более общий материал по реактивному программированию, которое нас также интересует не первый год, а сегодня предлагаем почитать о применении RxJava в Android, так как именно на этой платформе особенно важна динамичность и быстрота реагирования. Добро пожаловать под кат

В большинстве приложений Android мы реагируем на действия пользователя (щелчки, смахивание, т.д.), а тем временем в фоновом режиме идет какая-то другая работа (сетевая).

Оркестровка всех этих процессов – сложная задача, любой код рискует быстро превратиться в бесформенную кашу.
Например, не так просто послать по сети запрос к базе данных, а после его выполнения сразу начать одновременно выбирать и пользовательские сообщения, и настройки, а после завершения всей этой работы вывести приветственное сообщение.

Именно в таких случаях как нельзя кстати будет RxJava (ReactiveX) – библиотека, позволяющая соорганизовать множество действий, обусловленных определенными событиями в системе.

Работая с RxJava, можно будет забыть об обратных вызовах и адском управлении глобальным состоянием.

Почему?

Вернемся к нашему примеру:

послать по сети запрос к базе данных, а после его выполнения сразу начать одновременно выбирать и пользовательские сообщения, и настройки, а после завершения всей этой работы вывести приветственное сообщение.


Если разобрать эту ситуацию подробнее, найдем в ней три основных этапа, причем все три происходят в фоновом режиме:

  1. Выбрать пользователя из базы данных
  2. Одновременно выбрать пользовательские настройки и сообщения
  3. Скомбинировать результаты обоих запросов в один


Чтобы сделать то же самое в Java SE и Android, нам бы потребовалось:

  1. Сделать 3-4 различные AsyncTasks
  2. Создать семафор, который дождется завершения обоих запросов (по настройкам и по сообщениям)
  3. Реализовать поля на уровне объектов для хранения результатов


Уже понятно, что для этого требуется управлять состоянием, а также задействовать некоторые механизмы блокировки, существующие в Java.

Всего этого можно избежать, работая с RxJava (см. примеры ниже) – весь код выглядит как поток, расположенный в одном месте и строится на базе функциональной парадигмы (см. здесь).

Быстрый запуск в Android

Чтобы получить библиотеки, которые, скорее всего, понадобятся вам для проекта, вставьте в ваш файл build.gradle следующие строки:

compile 'io.reactivex:rxjava:1.1.0'
compile 'io.reactivex:rxjava-async-util:0.21.0'

compile 'io.reactivex:rxandroid:1.1.0'

compile 'com.jakewharton.rxbinding:rxbinding:0.3.0'

compile 'com.trello:rxlifecycle:0.4.0'
compile 'com.trello:rxlifecycle-components:0.4.0'


Таким образом будут включены:

  • RxJava – основная библиотека ReactiveX для Java.
  • RxAndroid — расширения RxJava для Android, которые помогут работать с потоками в Android и с Loopers.
  • RxBinding – привязки между RxJava и элементами пользовательского интерфейса Android, в частности, кнопками Buttons и текстовыми представлениями TextViews
  • RxJavaAsyncUtil – помогает склеивать код Callable и Future.


Пример

Начнем с примера:

Observable.just("1", "2")
        .subscribe(new Action1<String>() {
            @Override
            public void call(String s) {
                System.out.println(s);
            }
        });


Здесь мы создали Observable, который сгенерирует два элемента — 1 и 2.
Мы подписались на observable, и теперь, как только элемент будет получен, мы выведем его на экран.

Некоторые детали

Объект Observable – такая сущность, на которую можно подписаться, а затем принимать генерируемые Observable элементы. Они могут создаваться самыми разными способами. Однако Observable обычно не начинает генерировать элементы, пока вы на них не подпишетесь.

После того, как вы подпишетесь на observable, вы получите Subscription (подписку). Подписка будет принимать объекты, поступающие от observable, пока он сам не просигнализирует, что завершил работу (не поставит такую отметку), либо (в очень редких случаях) прием будет продолжаться бесконечно.

Более того, все эти действия будут выполняться в главном потоке.

Расширенный пример

Observable.from(fetchHttpNetworkContentFuture())
        .subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(new Action1<String>() {
            @Override
            public void call(String s) {
                System.out.println(s);
            }
        }, new Action1<Throwable>() {
            @Override
            public void call(Throwable throwable) {
                throwable.printStackTrace();
            }
        });


Здесь наблюдаем кое-что новое:
  1. subscribeOn(Schedulers.io()) – благодаря этому методу Observable будет выполнять ожидание и вычисления в пуле потоков ThreadPool, предназначенном для ввода/вывода (Schedulers.io()).
  2. observeOn(AndroidSchedulers.mainThread()) – благодаря этому методу, результат действия подписчика будет выполнен в главном потоке Android. Это требуется в случаях, когда вам нужно что-то изменить в пользовательском интерфейсе Android.
  3. Во втором аргументе к .subscribe() появляется обработчик ошибок для операций с подпиской на случай, если что-то пойдет не так. Такая штука должна присутствовать почти всегда.


Управление сложным потоком

Помните сложный поток, описанный нами в самом начале?

Вот как он будет выглядеть с RxJava:

Observable.fromCallable(createNewUser())
        .subscribeOn(Schedulers.io())
        .flatMap(new Func1<User, Observable<Pair<Settings, List<Message>>>>() {
            @Override
            public Observable<Pair<Settings, List<Message>>> call(User user) {
                return Observable.zip(
                        Observable.from(fetchUserSettings(user)),
                        Observable.from(fetchUserMessages(user))
                        , new Func2<Settings, List<Message>, Pair<Settings, List<Message>>>() {
                            @Override
                            public Pair<Settings, List<Message>> call(Settings settings, List<Message> messages) {
                                return Pair.create(settings, messages);
                            }
                        });
            }
        })
        .doOnNext(new Action1<Pair<Settings, List<Message>>>() {
            @Override
            public void call(Pair<Settings, List<Message>> pair) {
                System.out.println("Received settings" + pair.first);
            }
        })
        .flatMap(new Func1<Pair<Settings, List<Message>>, Observable<Message>>() {
            @Override
            public Observable<Message> call(Pair<Settings, List<Message>> settingsListPair) {
                return Observable.from(settingsListPair.second);
            }
        })
        .subscribe(new Action1<Message>() {
            @Override
            public void call(Message message) {
                System.out.println("New message " + message);
            }
        });


В таком случае будет создан новый пользователь (createNewUser()), и на этапе его создания и возвращения результата в то же самое время продолжится выбор пользовательских сообщений (fetchUserMessages()) и пользовательских настроек (fetchUserSettings). Мы дождемся завершения обоих действий и возвратим скомбинированный результат (Pair.create()).

Не забывайте – все это происходит в отдельном потоке (в фоновом режиме).

Затем программа выведет на экран полученные результаты. Наконец, список сообщений будет приспособлен еще в один observable, который будет выводить сообщения поодиночке, а не целым списком, причем каждое сообщение будет появляться в окне терминала.

Функциональный подход

Работать с RxJava будет гораздо проще, если вы знакомы с функциональным программированием, в частности, с концепциями map и zip. Кроме того, в RxJava и ФП очень похоже выстраивается обобщенная логика.

Как создать собственный observable?

Если код становится в значительной степени завязан на RxJava (например, здесь ), то зачастую вам придется писать собственные observable, так, чтобы они укладывались в логику вашей программы.

Рассмотрим пример:

public Observable<String> customObservable() {
    return rx.Observable.create(new rx.Observable.OnSubscribe<String>() {
        @Override
        public void call(final Subscriber<? super String> subscriber) {
            // Выполняется в фоновом режиме
            Scheduler.Worker inner = Schedulers.io().createWorker();
            subscriber.add(inner);

            inner.schedule(new Action0() {

                @Override
                public void call() {
                    try {
                        String fancyText = getJson();
                        subscriber.onNext(fancyText);
                    } catch (Exception e) {
                        subscriber.onError(e);
                    } finally {
                      subscriber.onCompleted();
                    }
                }

            });
        }
    });
}


А вот похожий вариант, не требующий выполнять действие строго в конкретном потоке:

Observable<String> observable = Observable.create(
    new Observable.OnSubscribe<String>() {
        @Override
        public void call(Subscriber<? super String> subscriber) {
            subscriber.onNext("Hi");
            subscriber.onCompleted();
        }
    }
);



Здесь важно отметить три метода:

  1. onNext(v) – отправляет подписчику новое значение
  2. onError(e) – уведомляет наблюдателя о произошедшей ошибке
  3. onCompleted() – уведомляет подписчика о том, что следует отписаться, поскольку от данного observable больше не поступит никакого контента


Кроме того, вероятно, будет удобно пользоваться RxJavaAsyncUtil.

Интеграция с другими библиотеками

По мере того, как RxJava становится все популярнее и де-факто превращается в стандарт асинхронного программирования в Android, все больше библиотек все в большей мере интегрируются с ней.

Всего несколько примеров:

Retrofit — «Типобезопасный HTTP-клиент для Android и Java»
SqlBrite — «Легкая обертка для SQLiteOpenHelper, обогащающая SQL-операции семантикой реактивных потоков.»
StorIO — «Красивый API для SQLiteDatabase и ContentResolver»

Все эти библиотеки значительно упрощают работу с HTTP-запросами и базами данных.

Интерактивность с Android UI

Это введение было бы неполным, если бы мы не рассмотрели, как использовать нативные UI-элементы в Android.

TextView finalText;
EditText editText;
Button button;
...

    RxView.clicks(button)
            .subscribe(new Action1<Void>() {
                @Override
                public void call(Void aVoid) {
                    System.out.println("Click");
                }
            });

    RxTextView.textChanges(editText)
            .subscribe(new Action1<CharSequence>() {
                @Override
                public void call(CharSequence charSequence) {
                    finalText.setText(charSequence);
                }
            });
...


Очевидно, можно просто положиться на setOnClickListener, но в долгосрочной перспективе RxBinding может подойти вам лучше, поскольку позволяет подключить UI к общему потоку RxJava.

Советы

Практика показывает, что при работе с RxJava следует придерживаться некоторых правил.

Всегда использовать обработчик ошибок

Пропускать обработчик ошибок таким образом

.subscribe(new Action1<Void>() {
    @Override
    public void call(Void aVoid) {
        System.out.println("Click");
    }
});


обычно не следует. Исключение, выбрасываемое в наблюдателе или в одном из действий будет выброшено исключение, которое, скорее всего, убьет вам все приложение.

Еще лучше было бы сделать обобщенный обработчик:

.subscribe(..., myErrorHandler);


Извлекать методы действий

Если у вас будет много внутренних классов, то через некоторое время удобочитаемость кода может испортиться (особенно если вы не работаете с RetroLambda).

Поэтому такой код:

.doOnNext(new Action1<Pair<Settings, List<Message>>>() {
    @Override
    public void call(Pair<Settings, List<Message>> pair) {
        System.out.println("Received settings" + pair.first);
    }
})


выглядел бы лучше после такого рефакторинга:

.doOnNext(logSettings())

@NonNull
private Action1<Pair<Settings, List<Message>>> logSettings() {
    return new Action1<Pair<Settings, List<Message>>>() {
        @Override
        public void call(Pair<Settings, List<Message>> pair) {
            System.out.println("Received settings" + pair.first);
        }
    };
}


Использовать собственные классы или кортежи

Бывают случаи, в которых некое значение определяется другим значением (например, пользователь и пользовательские настройки), и вы хотели бы получить оба этих значения при помощи двух асинхронных запросов.
В таких случаях рекомендую использовать JavaTuples.

Пример:

Observable.fromCallable(createNewUser())
        .subscribeOn(Schedulers.io())
        .flatMap(new Func1<User, Observable<Pair<User, Settings>>>() {
            @Override
            public Observable<Pair<User, Settings>> call(final User user) {
                return Observable.from(fetchUserSettings(user))
                        .map(new Func1<Settings, Pair<User, Settings>>() {
                            @Override
                            public Pair<User, Settings> call(Settings o) {
                                return Pair.create(user, o);
                            }
                        });

            }
        });



Управление жизненным циклом

Зачастую бывает так, что фоновый процесс (подписка) должен просуществовать дольше, чем активность или фрагмент, в котором (которой) он содержится. Но что если результат вас уже не интересует, как только пользователь покинет активность?

В таких случаях вам поможет проект RxLifecycle.

Оберните ваш observable вот так (взято из документации) и сразу после его разрушения выполнится отписка:

public class MyActivity extends RxActivity {
    @Override
    public void onResume() {
        super.onResume();
        myObservable
            .compose(bindToLifecycle())
            .subscribe();
    }
}



Заключение

Конечно, это далеко не полное руководство об использовании RxJava в Android, но, надеюсь, смог вас убедить, что в некоторых отношениях RxJava лучше обычных AsyncTask.
Актуальность темы

Проголосовало 152 человека. Воздержалось 32 человека.

Только зарегистрированные пользователи могут участвовать в опросе. Войдите, пожалуйста.

Поделиться с друзьями
-->

Комментарии (16)


  1. Fen1kz
    04.08.2016 12:58
    +1

    Имхо, в примерах стоит использовать лямбды с типами. Вместо


    return new Action1<Pair<Settings, List<Message>>>() {
            @Override
            public void call(Pair<Settings, List<Message>> pair) {
                System.out.println("Received settings" + pair.first);
            }
        };

    писать


    return ((Pair<Settings, List<Message>>) pair) -> {
                System.out.println("Received settings" + pair.first);
            }


    1. Andrew51130
      04.08.2016 16:57
      +1

      .doOnNext{ System.out.println(«Received settings» + it.first) }
      kotlin forever


      1. Fen1kz
        04.08.2016 17:25

        Введение же, важно указать, какие там типы. Так-то можно и
        .doOnNext(pair -> System.out.println("Received settings" + pair.first))


    1. vagran
      04.08.2016 17:49

      Да, без лямбд Java код в функциональном стиле вообще нечитаем. Для Андроида можно использовать retrolambda.


      1. arturdumchev
        07.08.2016 16:40

        Уже без retrolambda можно. Просто в gradle поставить jackOptions:

        android {
            compileSdkVersion 24
            buildToolsVersion "24.0.0"
        
            defaultConfig {
                ...
                jackOptions {
                    enabled true
                }
            }
            ...
        }
        


        1. kolipass
          08.08.2016 07:00

          И лишиться бесплатного Instant Run? Кстати, а Mokito уже как-нибудь завели с Jack?


  1. Tiberal
    04.08.2016 13:32
    +9

    Сколько можно этих кратких введений? Каждый месяц по паре статей на протяжении года. Думаю все кому надо уже «ввелись», и можно вещать о продвинутом использовании либы.


  1. Zeliret
    04.08.2016 15:07
    +1

    Уже год пишу под Андроид с помощью этой либы, а все одни введения. Когда уже напишут что-то, чего я не знаю? :))


    1. ph_piter
      04.08.2016 15:16

      Ну вот видите, Орейли вняли!

      Ждите, будем стараться заполучить и предоставить в классном переводе


  1. rude
    04.08.2016 16:11
    +4

    не холивара ради… мне одному кажется что все введения в RxJava стараются придумать такие задачи, чтобы обосновать необходимость использовать RxJava вместо стандартных механизмов?

    мне очень сложно представить реальную пользу от получения всей информации из примера в одном месте (в одной Activity) и одновременно

    возможно было бы более уместно рассказывать о стандартных задачах и насколько легче они решаются с RxJava


    1. oldcastor
      04.08.2016 19:40

      Есть приложение, работающее по 3g, часть данных меняется не чаще раза в сутки, эти данные кешируются при первом запуске за сутки в qlite базу. Другая часть данных нуждается в проверке на момент использования, соответственно при использовании и тех и других имеем простыню из двух asyncTask то что имеем. Добавим сюда транспортировку данных из асинктаск-а в основной поток (класс не вложен в класс активити) и получим

      пользу от получения всей информации из примера в одном месте (в одной Activity)


      На самом деле, конечно, случаются такие ситуации не на каждом шагу, их можно решить и стандартными средствами, но если можно сделать проще и короче, то почему нет?

      Пригляжусь к RxJava повнимательнее.


      1. rude
        04.08.2016 20:16

        то что Вы описали делается через один сервис (для синхронизации раз в день) и один AsyncTaskLoader<нужный тип>. Именно использование AsyncTask для этой задачи и усложняет Вам жизнь IMO.

        Ваш пример из практики, я полагаю. Однако не совсем понятно, как RxJava сделает жизнь проще в Вашем случае?

        P. S. я интересуюсь с целью понять в чем преимущество RxJava на практике. меньше кода?


        1. oldcastor
          04.08.2016 20:48

          Да, пример из текущего проекта, и работает всё ровно, но нет легкости в структуре. Хотя, может просто нужно больше опыта…

          И я понятия не имею, сделает RxJava жизнь проще или нет) Просто альтернативная моей реализация может оказаться компактнее/легче в расширении/понимании или ещё в чем-то… а может и не оказаться — сам то я ещё не пробовал, а посему и написал, что пригляжусь к реактивным библиотекам.


    1. Dimezis
      07.08.2016 16:29

      Да много преимуществ. Как минимум Api поприятнее, чем у того же Loader'a, хоть это и не замена ему.

      Меньше кода (не во вред читаемости), простая и удобная обработка ошибок.
      Удобная работа с сетью, базой, отсутствие тонны вложенных коллбеков.
      Возможность красиво распределять операции по различным потокам с разными приоритетами.

      Комбинирование операций над данными (как одновременно, так и последовательно). Это, в принципе, основная фишка. Бывают случаи, что просто не представляю как соорудить что-то адекватное без Rx, чтоб без кучи каких-то тредов, атомиков, мютексов и кэшей.


  1. MamOn
    04.08.2016 22:49
    +1

    > В таких случаях рекомендую использовать JavaTuples.

    Мне кажется, что это одно из великих зол, изобретённых в мире java. Поддерживать добро, читая по коду разбросанные a.getValue1(), b.getValue2() и т.д. по мне так доставляет неимоверной боли и неочевидных багов.


  1. nikitosgleb
    08.08.2016 08:39

    Такое активное желание сообщества «Объяснить зачем это всем обязательно нужно» — настораживает.

    3-th part библиотечек — полный github — RXJava — ОДНА ИЗ — кому надо — найдут/изучат/заюзают.
    Java — интерпрайзы на продакшнах — досих пор у людей без RX-ов… и котлинов.

    Расскажите про «несовершенсто java» — серверным разработчикам — оставьте уже Андроид в покое.

    Увы никакие RX-ы, лямбды, и ретро-лябды — не уберегут вас от утечек в инер классах, и тп-
    как ни старайтесь — «много писать (в тч с WeakReferences) все равно придется».

    Да и AOSP вроде как пока не Warton-у принадлежит и даже не Square.
    … мода — приходит и уходит…