1. Введение в реактивное программирование


Разрабатывая сложное приложение под Android со множеством сетевых соединений, взаимодействием с пользователем и анимацией — означает писать код, который полон вложенных обратных вызовов. И по мере развития проекта такой код становится не только громоздким и трудно понимаемым, но также сложным в развитии, поддержке и подвержен множеством трудноуловимым ошибкам.

ReactiveX или функциональное реактивное программирование предлагает альтернативный подход, который позволяет значительно сократить код приложения и создавать изящные понимаемые приложения для управления асинхронными задачами и событиями. В реактивном программировании потребитель реагирует на данные, как они придут и распространяет изменения события в зарегистрированных наблюдателях.

RxJava — реализация ReactiveX с открытым исходным кодом на Java. Базовыми строительными блоками реактивного кода являются Observables и Subscribers. Подробнее с базовой основой можно ознакомиться в статье Грокаем* RxJava, часть первая: основы.

RxAndroid — расширение к RxJava, которое позволяет планировщику запускать код в основном и дополнительных потоках Android приложения и обеспечивает передачу результатов из созданных дополнительных потоках в основное для агрегации и взаимодействия с интерфейсом пользователя.
С целью более полного понимания основных принципов реактивного программирования рассмотрим практический пример для платформы Android. И начнем с настройки окружения для разработки.

2. Подготовка окружения


Подключаем основные библиотеки и прописываем зависимости в секции dependencies{} конфигурационного файла buil.gradle:
dependencies { 
compile 'io.reactivex:rxandroid:1.2.1'
compile 'io.reactivex:rxjava:1.1.6' 
}

Подключаем поддержку лямбда-выражений — используем новые возможности языка Java 8 на платформе Android N. Чтобы использовать возможности языка Java 8 также необходимо подключить и новый компилятор Jack, для чего добавьте в файл build.gradle:
android {
	...
  	defaultConfig {
    	...
    	jackOptions {
      		enabled true
    	}
  	}
  	compileOptions {
    		sourceCompatibility JavaVersion.VERSION_1_8
    		targetCompatibility JavaVersion.VERSION_1_8
  	}
}

Примечание: Jack поддерживается только в Android Studio 2.1 и также необходимо выполнить обновление до JDK 8.

При внесении изменений в конфигурационном файле gradle появляется предупреждение о необходимости синхронизировать проект и, чтобы применить все изменения нажмите на ссылку Sync Now вверху-справа.

3. Создаем базовый пример


В связи с тем, что применение RxAndroid в большинстве случаев связано с проектами с много-поточной обработкой сетевых соединений — рассмотрим простой пример обработки результатов парсинга сайта.
Для отображения результатов создадим простой layout:
<?xml version="1.0" encoding="utf-8"?>
<RelativeLayout xmlns:android="http://schemas.android.com/apk/res/android"
   ...>
   <ScrollView
       android:layout_width="wrap_content"
       android:layout_height="wrap_content"
       android:id="@+id/scrollView" >
       <TextView
           android:layout_width="wrap_content"
           android:layout_height="wrap_content"
           android:id="@+id/textView" />
   </ScrollView>
</RelativeLayout>

Для парсинга создадим простой класс WebParsing с двумя методами getURLs и getTitle:
public class WebParsing {
public List<String> getURLs(String url) {
   Document doc;
   List<String> stringList = new ArrayList<>();
   try {
       doc = Jsoup.connect(url).get();
       Elements select = doc.select("a");
       for (Element element : select) {
           stringList.add(element.attr("href"));
       }
   } catch (IOException e) {
       e.printStackTrace();
       return null;
   }
   return stringList;
}
}

public String getTitle(String url) {
   String title;
   try {
       Document doc = Jsoup.connect(url).get();
       title = doc.title();
   } catch (MalformedURLException mue) {
       mue.printStackTrace();
       return null;
   } catch (HttpStatusException hse) {
       hse.printStackTrace();
       return null;
   } catch (IOException e) {
       e.printStackTrace();
       return null;
   } catch (IllegalArgumentException iae) {
       iae.printStackTrace();
       return null;
   }
   return title;
}

Метод getURLs просматривает содержимое сайта и возвращает список всех найденных ссылок, а метод getTitle возвращает Title сайта по ссылке.

4. Подключаем реактивность


Для того, чтобы использовать возможности RxAndroid на основе приведенных выше методов создадим два соответствующих Observables:
Observable<List<String>> queryURLs(String url) {
   WebParsing webParsing = new WebParsing();
   return Observable.create(
           new Observable.OnSubscribe<List<String>>() {
               @Override
               public void call(Subscriber<? super List<String>> subscriber) {
                   subscriber.onNext(webParsing.getURLs(url));
                   subscriber.onCompleted();
               }
   }).subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread());
}

Observable<String> queryTitle(String url) {
   WebParsing webParsing = new WebParsing();
   return Observable.create(new Observable.OnSubscribe<String>() {
       @Override
       public void call(Subscriber<? super String> subscriber) {
           subscriber.onNext(webParsing.getTitle(url));
           subscriber.onCompleted();
       }
   }).subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread());
}

Первый Observable будет порождать список URL ссылок, найденных на сайте, второй будет порождать Title. Разберем пример перового метода подробно и построчно:
  1. Observable<List > queryURLs(String url) — строка объявляет Observable метод, который принимает в виде входного параметра ссылку на сайт для парсинга и возвращает результат парсинга в виде списка ссылок <List> с указанного сайта;
    WebParsing webParsing = new WebParsing() — создает переменную для доступа к нашим функциям парсинга;
    return Observable.create — создает Observable, возвращающего список ссылок;
    new Observable.OnSubscribe<List>() — строка объявляет интерфейс OnSubscribe с одним методом (см. ниже), который вызовется при подписке;
    public void call(Subscriber<? super List> subscriber) — перегружает метод call, который будет вызываться после подписки Subscriber;
    subscriber.onNext(webParsing.getURLs(url)) — вызывает метод onNext для передачи данных Subscriber всякий раз, когда порождаются данные. Этот метод принимает в качестве параметра объект, испускаемый Observable;
    subscriber.onCompleted() — Observable вызывает метод onCompleted() после того, как вызывает onNext в последний раз, если не было обнаружено никаких ошибок;
    subscribeOn(Schedulers.io()) — метод subscribeOn подписывает всех Observable выше по цепочке на планировщик Schedulers.io();
    observeOn(AndroidSchedulers.mainThread()) — метод observeOn позволяет получить результат в основном потоке приложения.

5. Запускаем первое реактивное приложение


Итак, Observables созданы, реализуем простейший пример на основе первого выше метода, который будет выводить список ссылок сайта:
public void example0(final TextView textView, String url) {
   queryURLs(url)
           .subscribe(new Action1<List<String>>() {
               @Override
               public void call(List<String> urls) {
                   for (String url: urls) {
                       String string = (String) textView.getText();
                       textView.setText(string + url + "\n\n");
                   }
               }
           });
}

Обернем наш реализуемый пример в класс MainExample и вызовем в MainActivity:
public class MainActivity extends AppCompatActivity {
   TextView textView;
   @Override
   protected void onCreate(Bundle savedInstanceState) {
       super.onCreate(savedInstanceState);
       setContentView(R.layout.activity_main);
       textView = (TextView) findViewById(R.id.textView);
       MainExample mainExample = new MainExample();
       mainExample.example0(textView, "https://yandex.ru/");
   }
}

6. Наращиваем реактивность — использование операторов


Observable может трансформировать выходные данные с помощью операторов и они могут быть использованы в промежутке между Observable и Subscriber для манипуляции с данными. Операторов в RxJava очень много, поэтому для начала рассмотрим наиболее востребованные.
И начнем с того, что избавимся от цикла в подписчике и заставим наблюдателя последовательно испускать данные полученного массива ссылок, и поможет в этом нам оператор from():
public void example1(final TextView textView, String url) {
   queryURLs(url)
           .subscribe(new Action1<List<String>>() {
               @Override
               public void call(List<String> urls) {
                   Observable.from(urls)
                           .subscribe(new Action1<String>() {
                               @Override
                               public void call(String url) {
                                   String string = (String) textView.getText();
                                   textView.setText(string + url + "\n\n");
                               }
                           });
               }
           });
}

Выглядит не совсем красиво и немного запутанно, поэтому применим следующий оператор flatMap(), который принимает на вход данные, излучаемые одним Observable, и возвращает данные, излучаемые другим Observable, подменяя таким образом один Observable на другой:
public void example2(final TextView textView, String url) {
   queryURLs(url)
           .flatMap(new Func1<List<String>, Observable<String>>() {
               @Override
               public Observable<String> call(List<String> urls) {
                   return Observable.from(urls);
               }
           })
           .subscribe(new Action1<String>() {
                               @Override
                               public void call(String url) {
                                   String string = (String) textView.getText();
                                   textView.setText(string + url + "\n\n");
                               }
                           });
}

На следующем шаге еще разгрузим наш Subscriber и воспользуемся оператором map(), через который можно преобразовывать один элемент данных в другой. Оператор map() также может преобразовывать данные и порождать данные необходимого нам типа, отличного от исходного. В нашем случае наблюдатель будет формировать список строк, а подписчик только выведет их на экран:
public void example3(final TextView textView, String url) {
   queryURLs(url)
           .flatMap(new Func1<List<String>, Observable<String>>() {
               @Override
               public Observable<String> call(List<String> urls) {
                   return Observable.from(urls);
               }
           })
           .map(new Func1<String, String>() {
               @Override
               public String call(String url) {
                   return textView.getText() + url + "\n\n";
               }
           })
           .subscribe(new Action1<String>() {
               @Override
               public void call(String url) {
                   textView.setText(url);
               }
           });
}

Основные возможности мы рассмотрели и сейчас пришло время воспользоваться лямбдами, чтобы упростить наш код:
queryURLs(url)
       .flatMap(urls -> Observable.from(urls))
       .map(url1 -> textView.getText() + url1 + "\n\n")
       .subscribe(url1 -> {
           textView.setText(url1);
       });

или еще проще:
queryURLs(url)
       .flatMap(Observable::from)
       .map(url1 -> textView.getText() + url1 + "\n\n")
       .subscribe(textView::setText);

Сравним конструкцию выше с получившимся кодом и ощутим мощь и простоту лямбда-выражений.

7. Увеличиваем мощности


На следующем шаге усложним нашу обработку и воспользуемся оператором flatMap(), чтобы подключить второй подготовленный метод queryTitle(), также возвращающий наблюдателя. Этот метод возвращает Title сайта по ссылке на сайт. Создадим пример, в котором будем формировать и выводить список заголовков сайтов по ссылкам, найденным на веб-странице, т.е. вместо полученного списка ссылок на сайты в предыдущем примере выведем заголовки (Title) этих сайтов:
public void example4(final TextView textView, String url) {
   queryURLs(url)
           .flatMap(new Func1<List<String>, Observable<String>>() {
               @Override
               public Observable<String> call(List<String> urls) {
                   return Observable.from(urls);
               }
           })
           .flatMap(new Func1<String, Observable<String>>() {
               @Override
               public Observable<String> call(String url) {
                   return queryTitle(url);
               }
           })
           .subscribe(new Action1<String>() {
               @Override
               public void call(String title) {
                   textView.setText(title);
               }
           });
}

или в сокращенном виде:
queryURLs(url)
       .flatMap(Observable::from)
       .flatMap(this::queryTitle)
       .subscribe(textView::setText);

добавляем map() для формирования списка заголовков:
queryURLs(url)
       .flatMap(Observable::from)
       .flatMap(this::queryTitle)
       .map(url1 -> textView.getText() + url1 + "\n\n")
       .subscribe(textView::setText);

с помощью оператора filter() отфильтровываем пустые строки со значением null:
queryURLs(url)
       .flatMap(Observable::from)
       .flatMap(this::queryTitle)
       .filter(title -> title != null)
       .map(url1 -> textView.getText() + url1 + "\n\n")
       .subscribe(textView::setText);

с помощью оператора take() возьмем только первые 7 заголовков:
queryURLs(url)
       .flatMap(Observable::from)
       .flatMap(this::queryTitle)
       .filter(title -> title != null)
       .take(7)
       .map(url1 -> textView.getText() + url1 + "\n\n")
       .subscribe(textView::setText);

Последний пример показал, что объединение множества методов плюс использование большого количества доступных операторов плюс лямбда-выражения и мы получаем буквально из нескольких строк мощный обработчик потоков различных данных.

Все примеры, приведенные в статье выложены здесь.

Источники:


  1. Официальная документация
  2. Грокаем* RxJava, часть первая: основы
  3. Getting Started With ReactiveX on Android
  4. RxJava — Tutorial
  5. Getting Started with RxJava and Android
  6. Reactive Programming with RxJava in Android
  7. Party tricks with RxJava, RxAndroid & Retrolambda
Поделиться с друзьями
-->

Комментарии (23)


  1. Alex_ME
    29.07.2016 18:59

    Подключаем поддержку лямбда-выражений — используем новые возможности языка Java 8 на платформе Android N. Чтобы использовать возможности языка Java 8 также необходимо подключить и новый компилятор Jack, для чего добавьте в файл build.gradle

    Доступны ли лямбды на предыдущих версиях? Или придется использовать retrolambda?


    1. vladv75
      29.07.2016 19:29

      В принципе, retrolambda уже можно и не использовать. Ставим поддержку SDK API 24 и в проекте устанавливаем minSdkVersion на необходимую нам версию и все должно работать. У меня, например, все отлично работает на телефоне с API 17.


  1. Dimezis
    29.07.2016 22:31
    +1

    Я бы не рекомендовал новичкам учиться по этой статье, даже если забыть, что есть Retrofit, и рассматривать это как просто как базовый пример.
    Масса недочетов, плохой код-стайл и беспорядок в терминах и понятиях.


    1. vladv75
      29.07.2016 23:58
      +1

      А можно конкретизировать ваши замечания? Доработаю статью.


      1. Suvitruf
        30.07.2016 10:04

        Я не силён в Android, но:
        1) getURLs и getTitles у вас могут null вернуть. Тогда все методы в MainExample.java, которые вызывают queryURLs, при попытке прогнать полученный список String url: urls выбросят исключение.
        2) Не уверен, что Jsoup нормально хендлит повороты экрана и т.п. вещи связанные с жизненным циклом Android приложений.


        1. vladv75
          30.07.2016 23:49

          1) getURLs и getTitles у вас могут null вернуть. Тогда все методы в MainExample.java, которые вызывают queryURLs, при попытке прогнать полученный список String url: urls выбросят исключение.

          Может быть не совсем красиво получилось, но здесь специально сделал возврат null, чтобы потом показать использование .filter(title -> title != null)

          2) Не уверен, что Jsoup нормально хендлит повороты экрана и т.п. вещи связанные с жизненным циклом Android приложений.

          Jsoup никак не связан с поворотом экрана, в данном простом примере при повороте экрана происходит новая загрузка данных.


      1. Dimezis
        30.07.2016 12:41
        +1

        Как уже написали выше, у вас проблемы с методами getURLs и getTitles. Следовало бы поймать exception внутри Observable и вернуть его подписчику в onError, чтобы он решил, что с ним делать.

        — Observable queryURLs(String url) — строка объявляет метод, который порождает строку ссылки на сайт для парсинга и возвращающего список ссылок.
        Если уж решили зачем-то вдаваться в такие мелочи, выражайтесь корректно.

        — new Observable.OnSubscribe() — интерфейс OnSubscribe создает подписчика
        Интерфейс OnSubscribe ничего не создает, это просто интерфейс с 1 методом, который вызовется при подписке.

        — subscribeOn(Schedulers.io()) — метод subscribeOn запускает наш код в дополнительном потоке;
        Не совсем так. Этот метод подписывает всех Observable выше по цепочке на определенный планировщик. Повторный вызов метода ниже по цепочке (с другим планировщиком) не даст никакого результата, например.

        Названия методов Example0...1 и т.д. с заглавной буквы.

        Статьи в списке источников куда полезнее.
        Добавил бы еще это, Rx далеко не заканчивается на сетевых запросах:
        Пагинация 1
        Пагинация 2
        Shake detector
        Доклад Артема Зинатулина

        И просто десятки статей всяких блоггеров, не знаю почему вдруг «хороших материалов мало»


        1. Dimezis
          30.07.2016 12:49
          +1

          И да, так себе идея создавать Observable, который сразу подписан на какие-то планировщики —

          .subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread());
          

          На каком планировщике обработать результат, пусть решает сам подписчик.
          Кроме того, у вас все операторы под queryURLs(...) выполняются на планировщике с Looper'ом, то есть каждый из них постит результат в очередь событий. Хотя можно было бы это сделать только для метода subscribe.


          1. vladv75
            30.07.2016 23:59

            Но здесь ведь работа с сетью, которая без вариантов требует отдельного потока, поэтому сразу и подписываю на отдельный поток.


            1. Dimezis
              31.07.2016 11:01

              Ну, допустим.
              Но observeOn(...) не стоит там вызывать, уже объяснил почему.


              1. vladv75
                01.08.2016 22:34

                Хорошо, подправлю.


        1. vladv75
          30.07.2016 23:58
          +1

          Как уже написали выше, у вас проблемы с методами getURLs и getTitles. Следовало бы поймать exception внутри Observable и вернуть его подписчику в onError, чтобы он решил, что с ним делать.

          Придумал может не совсем корректный пример с возвратом null, чтобы потом показать использование .filter(title -> title != null)
          А exception решил пока не обрабатывать по аналогии статьи “Грокаем* RxJava, часть первая: основы”, чтобы упростить пример.

          — Observable queryURLs(String url) — строка объявляет метод, который порождает строку ссылки на сайт для парсинга и возвращающего список ссылок.
          Если уж решили зачем-то вдаваться в такие мелочи, выражайтесь корректно.

          Переделал:
          — строка объявляет Observable метод, который принимает в виде входного параметра ссылку на сайт для парсинга и возвращает результат парсинга в виде списка ссылок <List> с указанного сайта.

          — new Observable.OnSubscribe() — интерфейс OnSubscribe создает подписчика
          Интерфейс OnSubscribe ничего не создает, это просто интерфейс с 1 методом, который вызовется при подписке.

          Переделал:
          — строка объявляет интерфейс OnSubscribe с одним методом (см. ниже), который вызовется при подписке.

          — subscribeOn(Schedulers.io()) — метод subscribeOn запускает наш код в дополнительном потоке;
          Не совсем так. Этот метод подписывает всех Observable выше по цепочке на определенный планировщик. Повторный вызов метода ниже по цепочке (с другим планировщиком) не даст никакого результата, например.

          Переделал:
          — метод subscribeOn подписывает всех Observable выше по цепочке на планировщик Schedulers.io().

          Названия методов Example0...1 и т.д. с заглавной буквы.

          Сам не понимаю, почему назвал с заглавной буквы и почему потом не бросилось в глаза. Все соответственно исправил.

          Статьи в списке источников куда полезнее.
          Добавил бы еще это, Rx далеко не заканчивается на сетевых запросах:
          Пагинация 1
          Пагинация 2
          Shake detector
          Доклад Артема Зинатулина

          Огромное спасибо за ссылки на статьи, большинство из них мне не попались при поиске.


  1. vladv75
    30.07.2016 09:21

    Было бы не плохо увидеть комментарии профессионалов с конкретными замечаниями и пожеланиями по доработке и развитию статьи. Хочется создать полезный материал для ИТ-сообщества. Тема не простая и хороших материалов мало.


    1. EBCEu4
      31.07.2016 18:43

      Нужно стараться делать все вызовы pure.
      Оператор map модифицирующий внешний контекст или захватывающий textView неприемлем.
      Если где — то кроме subscribe есть closure, значит, что-то пошло не так, нужно пересматривать решение.

      Зачем .flatMap(Observable::from)? Ради того, чтобы взять 7 первых ссылок?
      queryTitle должен принимать список url и возвращать список title и не нужен будет flatMap, не нужно будет проверять на null.

      Вы отсеиваете уже проделанную работу queryTitle, на которую было потрачено процессорное время,
      если вы хотите брать 7 первых ссылок то надо фильтровать их на входе, или передавать ограничение в качестве параметра queryTitle.

      subscribeOn и observeOn лучше оставить на усмотрение вызывающей стороне, так она сможет решить сама где она хочет производить парсинг, а где обновление UI.
      В крайнем случае subscribeOn можно вызвать внутри реализации api, если ваш контракт подразумевает только асинхронное выполнение.

      Что — то вроде того:

      queryURLs(url)
             .map(this::queryTitle)
             .subscribeOn(Schedulers.io())
             .observeOn(AndroidSchedulers.mainThread())
             .subscribe(titles -> textView.setText(Joiner.on("\n\n").join(titles));
      


      У вас же, queryURLs делает работу на background потоке и потом пушит результат в ui, а следом queryTitle опять начинает выполнять загрузку и парсинг в своем background потоке.
      Это приводит к ненужным пушам в looper ui потока, которому и так есть чем заняться.
      Поэтому лучше оставлять такие решения вызывающей стороне.

      Надеюсь не очень скомкано получилось.


      1. vladv75
        01.08.2016 22:40

        Спасибо за хороший и развернутый комментарий!
        Получилось так, потому что увлекся демонстрацией возможностей операторов по аналогии с прочитанными статьями и получилось в ущерб самому примеру.


  1. Artem_zin
    30.07.2016 23:31

    Пожалуйста, хватит рекомендовать использовать Observable.create(). Вы не делаете поддержку backpressure, не проверяете, что подписчик уже отписался и не обрабатываете ошибки (хотя тут RxJava спасет вас сама, но тем не менее).


    Используйте Observable.fromAsync() если вам нужно конвертнуть callback api в реактивный (пока не рекомендую для библиотек, но рекомендую для приложений, api еще не стабильный).


    1. EBCEu4
      31.07.2016 18:03

      Single, не?


      1. Artem_zin
        31.07.2016 18:37

        Callback api, как правило, кидает больше одного события — поэтому Observable, можно и Single, если он подходит.


    1. vladv75
      03.08.2016 11:03

      Если не затруднит, можно ли пример с Observable.fromAsync()?


      1. Artem_zin
        06.08.2016 14:33

        Он есть прямо в javadoc у оператора


  1. EBCEu4
    31.07.2016 17:51

    Как же нелепо выглядит rx на джаве 7.
    На джаве 8 или с retrolambda, rx начинает выглядеть нелепо, когда нужно добавить свой кастомный оператор.
    Когда android разработчики начнут использовать Kotlin?


    1. Zeliret
      01.08.2016 14:19

      А зачем? Ретролямбы/лямбдыJava8 вполне себе справляются с чистотой кода. А переходить на новый язык ради «модного веяния» — это для вейперов.


      1. EBCEu4
        02.08.2016 16:51

        Ну, на мой взгляд, он просто лучше. Если вы переживаете насколько он production ready в его защиту могу сказать — половина андройд студии написана на нем, a в 3 версии gradle будет его использовать в качестве скриптового языка вместо groovy.
        Что касается rx, Java 8 и retro справляются ровно до тех пор пока вы используете стандартные операторы rx — сказывается отсутствие extension методов. Если вы хотите покинуть rx парадигму или наоборот в нее перейти, также возникают проблемы.

        Пока все хорошо:

        observable.map(x->x.getProp()).distinct();
        

        Но, если нам нужны свои кастомные операторы:
        ObservableOperators.anotherOperator(ObservableOperators.custom(observable.map(x->x.getProp()).distinct()), 10)
        

        Вместо:
        observable.map(x->x.getProp()).distinct().custom().anotherOperator(10);
        

        На самом деле это далеко не все. Чего стоят inline лямбды не напрягающие GC. И много чего еще…
        Основная мысль, которую я пытаюсь донести, android — frontend разработка, текущая тенденция — тонкий, легкий клиент. Мы не пишем супернагруженные суперпараллельные вундеркластеры, так почему бы не писать наш тонкий (как правило) клиент лаконично и красиво.