enter image description here


Меня зовут Аркадий, я Android-разработчик в Badoo. В последнее время в нашем блоге много постов про Go, PHP, JS, QA, и я решил разбавить их темами по мобильной разработке. Как раз занимался портированием одного Android-проекта с RxJava 1 на RxJava 2 и читал всё, что можно найти на эту тему в интернете. В частности, доклад Джейка Вортона с конференции GOTO Copenhagen 2016. Мне показалось, что это достойный кандидат на перевод – думаю, многие Android-разработчики задумываются о переходе на RxJava 2, и им интересно, что изменилось по сравнению с первой версией.


Джейк сделал достаточно объёмное введение о реактивном программировании, так что знание RxJava 1 не требуется для понимания статьи. Доклад был подготовлен, когда RxJava2 ещё только готовилась к выпуску (на текущий момент уже выпущена версия 2.1.0).



Почему Reactive?


Почему все вокруг вдруг стали говорить о реактивном программировании? Если вы не можете сделать приложение полностью синхронным, то наличие единственного асинхронного ресурса полностью ломает традиционный императивный стиль программирования, к которому мы привыкли. «Ломает» не в смысле «всё перестаёт работать», а в смысле «приводит к увеличению сложности», и в результате вы начинаете терять все преимущества императивного программирования.


Чтобы пояснить, почему я считаю это серьёзной проблемой, приведу пример.


Начнём с простого класса, который может получить для нас объект User с какими-то модификаторами.


interface UserManager {
    User getUser();
    void setName(String name);
    void setAge(int age);
}

UserManager um = new UserManager();
System.out.println(um.getUser());

um.setName("Jane Doe");
System.out.println(um.getUser());

Если бы мы жили в синхронном, однопоточном мире, то этот код делал бы именно то, что и ожидается: создание экземпляра, вывод пользователя, изменение каких-то свойств, вывод пользователя.


Проблема возникает тогда, когда мы начинаем прибегать к асинхронности. Допустим, нам нужно отразить изменения свойств на стороне сервера. Для этого последние два метода должны быть асинхронными. Как бы мы тогда изменили код?


Одно из решений — вообще ничего не делать: вы можете предположить, что асинхронный вызов обновления сервера будет успешным, так что можно внести изменения локально. Они будут отражены мгновенно. Как вы понимаете, это не лучшая идея. Сети непредсказуемы, сервер может вернуть ошибку, и тогда придётся как-то откатывать локальное состояние.


Простое решение — использовать Runnable, который будет выполнен, когда асинхронный вызов успешно завершится. Это реактивное поведение: мы обновляем отображаемые данные только тогда, когда уверены, что запрос на внесение изменений был успешен.


interface UserManager {
    User getUser();
    void setName(String name, Runnable callback);
    void setAge(int age, Runnable callback);
}
UserManager um = new UserManager();
System.out.println(um.getUser());

um.setName("Jane Doe", new Runnable() {
    @Override public void run() {
        System.out.println(um.getUser());
    }
});

Однако мы не обрабатываем проблемы, которые могут возникнуть (например, проблемы сети). Так может быть, стоит создать специальный Listener, чтобы мы могли что-нибудь сделать в случае возникновения ошибки?


UserManager um = new UserManager();
System.out.println(um.getUser());

um.setName("Jane Doe", new UserManager.Listener() {
    @Override public void success() {
        System.out.println(um.getUser());
    }

    @Override public void failure(IOException e) {
        // TODO show the error...
    }
});

Можем сообщить пользователю о проблеме. Можем автоматически попытаться снова. Подобные решения работают, и в этом направлении идут, когда нужно совместить асинхронный код с кодом, выполняющимся в одиночном потоке (в случае Android это UI-поток).


Чем больше вам нужно сделать асинхронных вызовов, тем больше возникает проблем. Например, когда пользователь заполняет форму, он изменяет несколько свойств. Или у вас есть последовательность асинхронных вызовов, когда успешное выполнение одного вызова должно запустить другой асинхронный вызов, которому тоже может сопутствовать успех или неудача.


UserManager um = new UserManager();
System.out.println(um.getUser());

um. setName(“Jane Doe”, new UserManager.Listener() {
    @Override public void success() {
        System.out.println(um.getUser());
    }  
    @Override public void failure(IOException e) {
        // TODO show the error…
    }
});

um.setAge(40, new UserManager.Listener() {
    @Override public void success() {
        System.out.println(um.getUser());
    }
    @Override public void failure(IOException e) {
        // TODO show the error…
    }
});

Не забывайте также, что всё это происходит в контексте Android. Поэтому необходимо учитывать много других факторов. К примеру, мы можем в колбэке success пытаться передавать информацию напрямую в UI, но проблема в том, что Activity в Android эфемерны. Они могут быть уничтожены в любой момент. Скажем, пользователю поступает входящий звонок – и ваше приложение сворачивается системой. А может, пользователь нажал на Home или Back. Если асинхронный вызов вернётся после уничтожения UI, то у вас возникнут сложности.


public final class UserActivity extends Activity {
    private final UserManager um = new UserManager();

    @Override protected void onCreate(Bundle savedInstanceState) {
        super.onCreate(savedInstanceState);

        setContentView(R.layout.user);
        TextView tv = (TextView) findViewById(R.id.user_name);
        tv.setText(um.getUser().toString());

        um.setName("Jane Doe", new UserManager.Listener() {
            @Override public void success() {
                tv.setText(um.getUser().toString());
            }
            @Override public void failure(IOException e) {
                // TODO show the error...
            }
        });
    }
}

Есть императивные подходы к решению проблемы. Мы можем проверить состояние перед обращением к методам UI.


public final class UserActivity extends Activity {
    private final UserManager um = new UserManager();

    @Override protected void onCreate(Bundle savedInstanceState) {
        super.onCreate(savedInstanceState);

        setContentView(R.layout.user);
        TextView tv = (TextView) findViewById(R.id.user_name);
        tv.setText(um.getUser().toString());

        um.setName(“Jane Doe”, new UserManager.Listener() {
            @Override public void success() {
                if (!isDestroyed()) {
                    tv.setText(um.getUser().toString());
                }
            }
            @Override public void failure(IOException e) {
                // TODO show the error…
            }
        });
    }
}

В этом примере мы создаём анонимный тип, который однозначно приведёт к краткосрочной утечке памяти, потому что он будет хранить ссылку на нашу Activity до тех пор, пока асинхронный вызов не завершится.


Проблема заключается ещё и в том, что мы не знаем, в каких потоках вызываются эти колбэки. Возможно, они вызываются в фоновом потоке, так что требуется передавать события в основной поток выполнения (main/UI thread).


public final class UserActivity extends Activity {
    private final UserManager um = new UserManager();

    @Override protected void onCreate(Bundle savedInstanceState) {
        super.onCreate(savedInstanceState);

        setContentView(R.layout.user);
        TextView tv = (TextView) findViewById(R.id.user_name);
        tv.setText(um.getUser().toString());

        um.setName("Jane Doe", new UserManager.Listener() {
            @Override public void success() {
                runOnUiThread(new Runnable() {
                    @Override public void run() {
                        if (!isDestroyed()) {
                            tv.setText(um.getUser().toString());
                        }
                    }
                });
            }
            @Override public void failure(IOException e) {
                // TODO show the error...
            }
        });
    }
}

Мы захламили Activity кучей вещей, которые не имеют отношения к основной задаче, решаемой нашим кодом. И всё это лишь для того, чтобы начать работать асинхронно и обрабатывать асинхронные результаты. Мы лишь реализовали вызов асинхронного запроса; мы не блокируем ввод данных пользователем, не обрабатываем нажатия кнопок и не работаем со множественными полями.


Даже когда ваш код решает только одну простую задачу, и вы начинаете превращать его в реальное приложение, то сразу возникают проблемы, и вы остаётесь один на один с необходимостью управлять кучей состояний и проверок в ваших Activity.


Реактивное мышление


В реальном приложении всё работает асинхронно. У нас есть сеть, в которую мы отправляем запросы и из которой через длительное время получаем ответы. Мы не можем блокировать основной поток выполнения, так что работа с сетью должна выполняться в фоновом потоке. Мы не можем блокировать основной поток при работе с файловой системой, базой данных, записи в хранилище или даже в shared preferences, так что приходится выполнять эти операции в фоновых потоках.


Пользователи — тоже нечто вроде асинхронных источников данных. Мы даём им информацию через UI, и они реагируют на неё нажатиями кнопок и внесением данных в поля.


enter image description here


Пользователь может возвращаться к приложению в разное время. И приложение должно быть готово к приёму данных, должно быть реактивным, чтобы не возникло состояния, при котором блокируется основной поток выполнения; чтобы не было ситуации, когда часть данных поступает асинхронно, а приложение этого не ожидает и в результате не учитывает полученные данные или вообще падает. В этом и заключается сложность: вы должны поддерживать все эти состояния в ваших Activity / Fragment. Нужно примириться с тем, что многочисленные асинхронные источники генерируют и потребляют данные, возможно, с разной скоростью. И это мы ещё не учитываем работу самой Android, которая является асинхронной платформой. У нас есть push-уведомления, широковещательные сообщения и изменения конфигурации. Пользователи могут в любой момент поворачивать устройства из портретной ориентации в альбомную и обратно, и если ваш код к этому не готов, то приложение будет крашиться или вести себя неправильно.


Пока вы не можете обеспечить синхронность всей архитектуры вашего приложения, наличие единственного асинхронного ресурса будет ломать традиционный императивный стиль программирования.


Трудно найти приложение, не использующее сетевые запросы, а они по своей природе асинхронны. У вас есть диск, база данных — асинхронные источники. UI тоже должен рассматриваться исключительно как асинхронный источник. Так что по умолчанию всё в Android функционирует асинхронно. Если будете цепляться за традиционное императивное программирование и методики управления состояниями, то будете вредить сами себе.


enter image description here


Вместо того чтобы пытаться координировать все асинхронные элементы архитектуры, мы можем снять с себя эту обязанность, соединив их напрямую. Можно напрямую подписать UI на базу данных, чтобы он реагировал на изменения данных. Можно так изменить базу данных и сетевые вызовы, чтобы они реагировали на нажатие кнопки, а не пытаться получить клик и затем его отправить.


Было бы прекрасно, если бы получаемый нами сетевой ответ обновлял данные. Ведь когда обновляются данные, автоматически обновляется и UI. Таким образом мы снимаем с себя ответственность за это. Если Android делает что-то асинхронно (например, поворот экрана или рассылка broadcast), то было бы замечательно, если бы это автоматически отразилось на интерфейсе или при этом автоматически запускалась какая-нибудь фоновая задача.


enter image description here


В целом такой подход позволяет нам не писать кучу кода, необходимого для поддержки состояний: вместо того чтобы управлять состояниями, мы просто соединяем компоненты друг с другом.


RxJava


Переходим к RxJava. Эта реактивная библиотека стала наиболее популярной при разработке под Android по большей части потому, что была первым полноценным [реактивным] инструментом для Java. RxJava 2 сохраняет поддержку старой версии Java, что важно для разработки под Android.


RxJava предоставляет:


  • набор классов для представления источников данных;
  • набор классов для прослушивания источников данных;
  • набор методов для преобразования и комбинирования данных (операторы).

Источники


Источник данных делает некую работу, когда вы начинаете или заканчиваете его прослушивать. Представьте сетевой запрос, который не будет отправлен, пока вы не начнёте ожидать ответа. И если вы отписываетесь от источника данных до его завершения, то теоретически он может отменить сетевой запрос.


Источник может работать как синхронно, так и асинхронно. Например, блокирующий сетевой запрос, выполняющийся в фоновом потоке, либо что-то чисто асинхронное вроде обращения к Android и ожидания onActivityResult. Источник может выдать один элемент или несколько элементов. Сетевой запрос вернёт один ответ. Но пока работает ваш UI, поток нажатий на кнопки потенциально бесконечен, даже если вы подписаны на единственную кнопку.


Ещё источники могут быть пустыми. Это концепция источника данных, который не содержит никаких элементов и работа которого либо успешно выполняется, либо завершается сбоем. Чтобы было понятно: представьте, что вы записываете данные в базу данных или в файл. Они не возвращают вам элементы. Запись либо успешна, либо нет. В RxJava источники моделируют этот подход «выполнения или отказа» с помощью так называемых терминальных событий onComplete()/ onError(). Это аналогично методу, который либо возвращает ответ, либо бросает исключение.


Завершения может и не быть. К примеру, мы моделируем нажатие кнопки в качестве источника данных, который работает до тех пор, пока работает UI. И когда UI исчезает, то вы, вероятно, отписываетесь от этого источника нажатий кнопки, но он не завершает свою работу.


Всё это соответствует паттерну Observer. У нас есть нечто, что может генерировать данные; есть соглашение, как должны выглядеть эти данные. И мы хотим наблюдать за ними. Мы хотим добавить слушателя и получать уведомления, когда что-то происходит.


Flowable vs. Observable


В RxJava 2 источники представлены двумя основными типами — Flowable и Observable. Они устроены очень похоже. Оба генерируют от нуля до n элементов. Оба могут завершаться успешно или с ошибкой. Так зачем нам два разных типа для представления одной и той же структуры данных?


Всё сводится к такой штуке, как backpressure. Не углубляясь в подробности, скажу лишь, что backpressure позволяет замедлить работу источника данных. Существующие системы имеют ограниченные ресурсы. И с помощью backpressure мы можем сказать всем, кто шлёт нам данные, чтобы они притормозили, потому что мы не можем обрабатывать информацию с той скоростью, с которой она к нам поступает.


В RxJava 1 была поддержка backpressure, но она была добавлена довольно поздно в процессе развития API. В RxJava 1 каждый тип в системе имеет механизм backpressure. И хотя концепцию backpressure поддерживают все типы, далеко не все источники её реализуют, так что использование этого механизма может привести к падению приложения. Применение backpressure должно проектироваться и учитываться заранее. Именно поэтому в RxJava 2 два разных типа источников. Поэтому теперь вы можете указывать с помощью типа источника, должна ли осуществляться поддержка backpressure.


Допустим, у нас есть источник данных — события касания экрана. Мы не можем его замедлить. Нельзя же сказать пользователю: «Нарисуй-ка половину символа, остановись и подожди, пока я обработаю, а затем дорисуй оставшееся». Мы можем замедлить ввод данных иначе, например, отключив кнопки или отобразив другой UI, но сам источник замедлить не получится.


Возьмём другой пример: у нас есть база данных, содержащая большой набор строк, из которого нам нужно извлекать по несколько за раз. БД может очень эффективно решить эту задачу благодаря такому инструменту, как курсоры. Но для потока событий касания это реализовать невозможно, потому что нельзя замедлить палец пользователя.


В RxJava 1 оба вышеупомянутых типа реализованы как Observable, так что может случиться, что вы попробуете применить backpressure в рантайме, а в результате получите исключение MissingBackpressureException. Это стало причиной, по которой в RxJava 2 источники представлены в виде разных типов: один поддерживает backpressure, второй — нет. Оба типа Observable и Flowable ведут себя похоже с точки зрения передачи данных в колбэки. Для этого есть два соответствующих им интерфейса:


Observer:


interface Observer<T> {
    void onNext(T t);
    void onComplete();
    void onError(Throwable t);
    void onSubscribe(Disposable d);
}

И Subscriber:


interface Subscriber<T> {
    void onNext(T t);
    void onComplete();
    void onError(Throwable t);
    void onSubscribe(Subscription s);
}

Первый метод называется onNext, сюда будут доставляться элементы. Этот метод вызывается каждый раз, когда Observable или Flowable генерирует элемент, позволяя его обрабатывать произвольным образом. Это может происходить бесконечно. Если вы прослушиваете нажатие кнопки, то метод onNext будет вызываться при каждом нажатии. Для небесконечных источников есть два терминальных события:


  • onComplete — означает успешное завершение;
  • onError — означает, что либо исполнение onNext привело к исключению, либо возникла проблема на стороне источника.

onComplete и onError — это терминальные события, то есть вы уже не получите никаких событий от источника после получения одного из них.
Различие между интерфейсами Observer и Subscriber заключается в последнем методе – onSubscribe. Это новый метод по сравнению с RxJava 1. Когда вы подписываетесь на Observable или Flowable, то тем самым создаёте ресурс, а ресурсы часто приходится очищать после окончания работы с ними. Колбэк onSubscribe вызывается сразу же, как только вы начинаете прослушивать Observable или Flowable, и он передаст вам объект одного из двух типов: Disposable.


interface Observer<T> {
    void onNext(T t);
    void onComplete();
    void onError(Throwable t);
    void onSubscribe(Disposable d);
}

interface Disposable {
    void dispose();
}

Или Subscription:


interface Subscriber<T> {
    void onNext(T t);
    void onComplete();
    void onError(Throwable t);
    void onSubscribe(Subscription s);
}

interface Subscription {
    void cancel();
    void request(long r);
}

Применительно к Observable тип Disposable позволяет вызывать метод dispose, означающий «Я закончил работать с этим ресурсом, мне больше не нужны данные». Если у вас есть сетевой запрос, то он может быть отменён. Если вы прослушивали бесконечный поток нажатий кнопок, то это будет означать, что вы больше не хотите получать эти события, в таком случае можно удалить OnClickListener у View.


Всё это справедливо и для интерфейса Subscription. Хоть он и называется иначе, но используется точно так же: у него есть метод cancel(), аналогичный dispose(). Отличается он лишь наличием второго метода request(long r), посредством которого backpressure проявляется в API. C помощью этого метода мы говорим Flowable, что нам нужно больше элементов.


С поддержкой backpressure Без поддержки backpressure
0–n элементов, complete | error Flowable Observable

Итак, единственная разница между двумя этими типами заключается в том, что один поддерживает backpressure, а другой — нет.


Реактивные потоки


Хочу коснуться вопроса, почему типы Disposable и Subscription имеют разные названия, как и их методы — dispose() и cancel(). Почему нельзя было просто расширить один другим, добавив метод request()? Всё дело в спецификации реактивных потоков. Она является результатом инициативы ряда компаний, которые собрались вместе и решили выработать стандартный набор интерфейсов для реактивных библиотек Java. В спецификацию вошло четыре интерфейса.


interface Publisher<T> {
    void subscribe(Subscriber<? super T> s);
}

interface Subscriber<T> {
    void onNext(T t);
    void onComplete();
    void onError(Throwable t);
    void onSubscribe(Subscription s);
}

interface Subscription {
    void request(long n);
    void cancel();
}

interface Processor<T, R> extends Subscriber<T>, Publisher<R> {
}

В представленном коде вы видите типы Subscriber и Subscription. Они являются частью спецификации, и поэтому именно эти названия были использованы в RxJava 2. Поскольку они являются частью стандарта, мы ничего не можем с этим поделать. Но у этой ситуации есть и хорошая сторона. Допустим, вам нужно использовать для потоков две разные библиотеки. Если их авторы реализовали вышеупомянутый стандарт, то вы сможете безболезненно переключаться между ними.


Реактивные потоки (С поддержкой backpressure) Без поддержки backpressure
0…n элементов, complete | error Flowable Observable

Тип Flowable реализует спецификацию реактивных потоков, что подразумевает поддержку backpressure.


Вернёмся к нашему UserManager. Раньше мы извлекали из этого класса пользователей, отображая их тогда, когда считали это уместным. Теперь мы можем воспользоваться Observable:


interface UserManager {
    Observable<User> getUser();
    void setName(String name);
    void setAge(int age);
}

Observable<User> – это источник объектов User. Он генерирует элемент при каждом изменении, и мы можем реагировать на это, отображая данные на экране. Теперь не надо пытаться определить наиболее подходящее для этого время на основе других событий, происходящих в системе.


Специализированные источники


В RxJava 2 есть три специализированных источника, представляющих собой подмножество Observable. Первый называется Single. Он либо содержит один элемент, либо выдаёт ошибку, так что это не столько последовательность элементов, сколько потенциально асинхронный источник одиночного элемента. Он не поддерживает backpressure. Можете представлять его себе как обычный метод. Вы вызываете метод и получаете возвращаемое значение; либо метод бросает исключение. Именно эту схему реализует Single. Вы на него подписываетесь и получаете либо элемент, либо ошибку. Но при этом Single является реактивным.


Второй тип называется Completable. Он похож на void-метод. Он либо успешно завершает свою работу без каких-либо данных, либо бросает исключение. То есть это некий кусок кода, который можно запустить, и он либо успешно выполнится, либо завершится сбоем.


Третий тип — Maybe. Это новый тип по сравнению с RxJava 1. Он может либо содержать элемент, либо выдать ошибку, либо не содержать данных — этакий реактивный Optional. Этот тип тоже не поддерживает backpressure.


В RxJava 2 нет типов, подобных Single/ Completable/ Maybe, но с поддержкой backpressure (совместимых с Reactive Streams Specification).


Реактивные потоки (С поддержкой backpressure) Без поддержки backpressure
0…n элементов, complete | error Flowable Observable
item | complete | error Maybe
item | error Single
complete | error Completable

interface UserManager {
    Observable<User> getUser();
    void setName(String name);
    void setAge(int age);
}

Если вызовы setName и setAge асинхронны, то они будут завершены либо успешно, либо с ошибкой, но не вернут данные. Для такого поведения подходит тип Completable.


interface UserManager {
    Observable<User> getUser();
    Completable setName(String name);
    Completable setAge(int age);
}

Создание источников


На этом примере показано, как создаются источники и как можно оборачивать то, что вы уже используете, в реактивные источники. Все эти типы имеют статические методы, позволяющие создавать их на основе одиночных значений.


Flowable.just("Hello");
Flowable.just("Hello", "World");

Observable.just("Hello");
Observable.just("Hello", "World");

Maybe.just("Hello");
Single.just("Hello");

Также можно создавать их из массивов или Iterable.


String[] array = { “Hello”, “World” };
List<String> list = Arrays.asList(array);

Flowable.fromArray(array);
Flowable.fromIterable(list);

Observable.fromArray(array);
Observable.fromIterable(list);

Но наиболее полезны два метода, которые, как мне кажется, больше всего будут использоваться для адаптации методов и действий, которые вы уже выполняете (как синхронно, так и асинхронно).


Первый называется fromCallable.


Observable.fromCallable(new Callable<String>() {
    @Override public String call() {
        return getName();
    }
});

Этот метод позволяет сделать реактивным некое синхронное поведение, при котором возвращается одиночное значение. fromCallable использует стандартный Java-интерфейс Callable, а это означает, что мы можем бросать исключения. Например, HTTP-запрос может бросать исключение ввода-вывода.


OkHttpClient client = // …
Request request = // …

Observable.fromCallable(new Callable<String>() {
    @Override public String call() throws Exception{
        return client.newCall(request).execute();
    }
});

Возвращаемый Observable (когда на него подпишутся) выполнит запрос, и если тот бросит исключение, то мы получим ошибку в onError. Если же запрос будет успешно завершён, то мы получим ответ в onNext.


fromCallable доступен для всех пяти типов:


Flowable.fromCallable(() -> "Hello");

Observable.fromCallable(() -> "Hello");

Maybe.fromCallable(() -> "Hello");

Single.fromCallable(() -> "Hello");

Completable.fromCallable(() -> "Ignored!");

Так можно моделировать синхронные источники одиночных данных. При императивном подходе мы использовали бы метод, возвращающий значение.


Для Maybe и Completable есть ещё по два дополнительных метода. Они позволяют моделировать источники, не возвращающие значения, – просто исполняемые куски кода, но зато реактивные.


Maybe.fromAction(()-> System.out.println(“Hello”));
Maybe.fromRunnable(()-> System.out.println(“Hello”));

Completable.fromAction(()-> System.out.println(“Hello”));
Completable.fromRunnable(()-> System.out.println(“Hello”));

Второй, самый мощный, метод для создания Observable метко назван create.


Observable.create(new ObservableOnSubscribe<String>() {
    @Override
    public void subscribe(ObservableEmitter<String> e) throws Exception {
        e.onNext("Hello");
        e.onComplete();
    }
});

Если вы знакомы с RxJava 1, то вы помните его – этот тот самый ужасный метод, который никогда не следует использовать в RxJava 1. Однако это было исправлено в RxJava 2, и теперь create – один из самых полезных методов, и его следует использовать для оборачивания источников данных. Мы создаём его, определяя метод subscribe, вызываемый каждый раз при появлении нового подписчика. В этот колбэк передаётся ObservableEmitter, который занимается прослушиванием. Можно взять данные и отправить их в ObservableEmitter. В вышеприведённом примере я синхронно отправляю данные, а затем успешно завершаю работу потока.


Преобразуем в лямбду.


Observable.create(e -> {
    e.onNext("Hello");
    e.onComplete();
});

Можно также отправить больше одной порции данных.


Observable.create(e -> {
    e.onNext("Hello");
    e.onNext("World");
    e.onComplete();
});

Вызывать onNext можно многократно.


Ещё одно преимущество — теперь мы можем моделировать асинхронные данные. К примеру, если нужно асинхронно выполнить HTTP-запросы, то можно вызывать onNext из колбэка HTTP-запроса.


OkHttpClient client = // …
Request request = // …

Observable.create(e -> {
    Call call = client.newCall(request);

    call.enqueue(new Callback() {
        @Override public void onResponse(Response r) throws IOException {
            e.onNext(r.body().string());
            e.onComplete();
        }
        @Override public void onFailure(IOException e) {
            e.onError(e);
        }
    });
});

Также, создавая Observable с помощью метода сreate, можно совершать действия, когда от источника отписываются. Если кто-то перестал прослушивать HTTP-запрос, то нам больше не нужно его выполнять. Можно добавить действие отмены HTTP-запроса и очистить ресурсы.


Observable.create(e -> {
    Call call = client.newCall(request);
    e.setCancelation(() -> call.cancel());
    call.enqueue(new Callback() {
        @Override public void onResponse(Response r) throws IOException {
            e.onNext(r.body().string());
            e.onComplete();
        }A
        @Override public void onFailure(IOException e) {
            e.onError(e);
        }
    });
});

Для Android это очень полезная возможность. Например, подписываясь на Observable, мы хотим начать прослушивать нажатия кнопок, а при отписке хотим удалить этот Listener, чтобы не возникло утечки.


View view = // …

Observable.create(e -> {
    e.setCancellation(() -> view.setOnClickListener(null));
    view.setOnClickListener(v -> e.onNext(v));
});

Создание с помощью метода create работает для всех пяти типов:


Flowable.create(e -> { … });

Observable.create(e -> { … });

Maybe.create(e -> { … });

Single.create(e -> { … });

Completable.create(e -> { … });

Наблюдение за источниками


Поговорим о методах onSubscribe в интерфейсах Observer / Subscriber.


Observer:


interface Observer<T> {
    void onNext(T t);
    void onComplete();
    void onError(Throwable t);
    void onSubscribe(Disposable d);
}

interface Disposable {
    void dispose();
}

Subscriber:


interface Subscriber<T> {
    void onNext(T t);
    void onComplete();
    void onError(Throwable t);
    void onSubscribe(Subscription s);
}

interface Subscription {
    void cancel();
    void request(long r);
}

На самом деле, вам необязательно реализовывать интерфейсы Observer/ Subscriber напрямую, когда вы подписываетесь с помощью метода subscribe. Это не вполне удобно как раз из-за четвёртого метода onSubscribe – в нём надо как-то обрабатывать передаваемый объект Disposable / Subscription в момент подписки.


Observable<String> o = Observable.just(“Hello”);

o.subscribe(new Observer<String>() {
    @Override public void onNext(Sring s) { … }
    @Override public void onComplete() { … }
    @Override public void onError(Throwable t) { … }

    @Override public void onSubscribe(Disposable d) {
        ???
    }
});

Вместо этого есть возможность использовать DisposableObserver, который автоматически обработает этот четвёртый метод, так что вам достаточно будет позаботиться лишь об уведомлениях от самого Observable.


Observable<String> o = Observable.just("Hello");

o.subscribe(new DisposableObserver<String>() {
    @Override public void onNext(String s) { … }
    @Override public void onComplete() { … }
    @Override public void onError(Throwable t) { … }
});

Но как нам отписываться? У нас больше нет этого четвёртого метода.
Первый способ – сохранить ссылку на DisposableObserver в Observer. Он реализует Disposable, поэтому можно вызвать метод dispose, который позаботится о переадресации вверх по цепочке.


Observable<String> o = Observable.just(“Hello”);

DisposableObserver observer = new DisposableObserver<String>() {
    @Override public void onNext(Sring s) { … }
    @Override public void onComplete() { … }
    @Override public void onError(Throwable t) { … }
}
o.subscribe(observer);

observer.dispose();

В RxJava 2 также появился новый метод subscribeWith, который похож на subscribe из RxJava 1. Он возвращает Disposable.


Observable<String> o = Observable.just(“Hello”);

Disposable d = new o.subscribeWith(new DisposableObserver<String>() {
    @Override public void onNext(String s) { … }
    @Override public void onComplete() { … }
    @Override public void onError(Throwable t) { … }
});

d.dispose();

По аналогии с составной подпиской из RxJava есть и составной Disposable: вы можете подписаться на несколько источников, взять возвращаемые Disposable, добавить их в CompositeDisposable и одновременно отписаться от всех потоков.


Observable<String> o = Observable.just(“Hello”);

CompositeDisposable disposables = new CompositeDisposable();

disposables.add(o.subscribeWith(new DisposableObserver<String>() {
    @Override public void onNext(Sring s) { … }
    @Override public void onComplete() { … }
    @Override public void onError(Throwable t) { … }
}));

disposables.dispose();

На Android вы часто будете сталкиваться с тем, что у вас есть CompositeDisposable для Activity или фрагмента, и вы отписываетесь в onDestroy (или ещё где-то).


Метод subscribeWith существует для всех четырёх типов без поддержки backpressure.


Observable<String> o = Observable.just(“Hello”);
Disposable d2 = o.subscribeWith(new DisposableObserver<String>() { … });

Maybe<String> m = Maybe.just(“Hello”);
Disposable d3 = m.subscribeWith(new DisposableMaybeObserver<String>() { … });

Single<String> s = String.just(“Hello”);
Disposable d4 = s.subscribeWith(new DisposableSingleObserver<String>() { … });

Completable c = Completable.completed();
Disposable d5 = c.subscribeWith(new Disposable Completable Observer<String>() { … });

Для Flowable существует похожий метод subscribeWith, даже несмотря на то, что Flowable передаёт в onSubscribe Subscription, а не Disposable.


Flowable<String> f = Flowable.just("Hello");
Disposable d1 = f.subscribeWith(new DisposableSubscriber<String>() { … });

Так что от всех пяти типов вы получаете обратно Disposable, хотя Flowable немного отличается от остальных.


Можно провести аналогию с ресурсом, файлом, курсором в БД. Вы не откроете файл, не имея способа каким-то образом его закрыть. Открыв курсор в БД, вы в конце концов закроете его. Всегда сохраняйте Disposable при подписке на Observable, чтобы потом отписаться.


Операторы


Операторы позволяют решать три задачи:


  • манипулирование данными или их комбинирование;
  • манипулирование потоками выполнения;
  • манипулирование генерируемыми событиями.

Рассмотрим первые две.
Точно так же, как источники позволяют оборачивать синхронные методы и делать их реактивными, операторы позволяют делать реактивными некие действия. Например, здесь мы применяем метод toUppercase() к строке и получаем новую строку.


String greeting = “Hello”;
String yelling = greeting.toUppercase();

В реактивном мире мы берем Observable и с помощью оператора осуществляем эту операцию.


Observable<String> greeting = Observable.just("Hello");
Observable<String> yelling = greeting.map(s -> s.toUppercase());

В данном случае оператором является map. Он позволяет взять генерируемые данные и применить к ним какую-то операцию, чтобы получить новый тип данных.


Посмотрим на объект User: мы сделали так, чтобы колбэки вызывались в фоновом потоке, и нам приходилось явно передавать данные в основной поток выполнения. На самом деле, это можно сделать с помощью встроенного оператора, причём куда более явным образом.


Observable<User> user = um.getUser();
Observable<User> mainThreadUser = user.observeOn(AndroidSchedulers.mainThread());

Можно сказать: «Хочу в другом потоке наблюдать за генерируемыми этим Observable событиями». При смене потоков выполнения имеет значение очерёдность осуществления этих операций.


По аналогии с observeOn мы можем выбрать поток, в котором будет исполняться Observable.


OkHttpClient client = // …
Request request = // …

Observable<Response> response = Observable.fromCallable(() -> {
    return client.newCall(request).execute();
});

Observable<Response> backgroundResponse = response.subscribeOn(Schedulers.io());

Возьмём сетевой запрос. Он будет выполняться синхронно, но мы не хотим, чтобы это происходило в основном потоке выполнения. Можно применить оператор, который изменит поток (в котором произойдёт подписка), и работа будет происходить в заданном потоке. В примере мы подписываемся, указывая Schedulers.io() — это просто пул потоков. Работа будет выполнена в этом пуле, а затем всем подписавшимся будут разосланы уведомления. Здесь subscribeOn — это оператор, который задаёт поток выполнения работы.


Приятно, что все эти операторы возвращают новые Observable. Ведь это значит, что мы можем их комбинировать и объединять в цепочки. Обычно для этого не используются промежуточные переменные – просто применяем операторы в нужной последовательности. Хотим сделать запрос на выполнение в фоновом потоке. Хотим наблюдать за результатом этого запроса в основном потоке. Хотим преобразовать ответ в строку. Хотим считать эту строку. Здесь важна очерёдность.


OkHttpClient client = // …
Request request = // …

Observable<Response> response = Observable.fromCallable(() -> {
        return client.newCall(request).execute();
    })
    .subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
    .map(response -> response.body().string()); // NetworkOnMainThread!

Поскольку здесь оператор map применён после observeOn, его выполнение будет осуществляться в основном потоке Android. Но мы не хотим читать HTTP-ответ в основном потоке – мы хотим делать это до того, как перейдём в основной поток.


OkHttpClient client = // …
Request request = // …

Observable<Response> response = Observable.fromCallable(() -> {
        return client.newCall(request).execute();
    })
    .subscribeOn(Schedulers.io())
    .map(response -> response.body().string()); // Ok!
    .observeOn(AndroidSchedulers.mainThread())

Специализация операторов


В RxJava есть операторы, которые берут Observable и возвращают другой тип. Например, оператор first(), который берёт первый элемент из последовательности и возвращает его. В RxJava 1 мы получали Observable, просто генерирующий один элемент. Получалось довольно странно: ведь если у вас есть список элементов, и вы применяете к нему get(0), чтобы получить первый элемент, то вы же не получаете в ответ список, состоящий из одного элемента, – вы получаете скаляр. В RxJava 2 сделано иначе: вызывая оператор first(), гарантированно возвращающий один элемент, вы получите Single.


enter image description here


Если Observable был пустой, то это приведёт к ошибке, потому что Single либо содержит элемент, либо возвращает ошибку.


enter image description here


Есть операторы вроде firstElement(), который возвращает Maybe. Пустой Observable можно смоделировать посредством завершения Maybe без ошибки.


enter image description here


Есть операторы, возвращающие Completable. Так что если вас не интересуют данные, и вы просто ждёте завершения или ошибки, то можно использовать оператор firstElement.


enter image description here


Всё описанное применимо и к Flowable: здесь есть точно такие же операторы, и они возвращают такие же специализированные типы.


В этой таблице приведены некоторые операторы.


enter image description here


В правой верхней части таблицы представлены операторы «сужающего» действия. Например, если вам нужно посчитать количество элементов в последовательности, то счётчик всегда будет иметь одиночное значение, поэтому выбирается более «узкий» тип вроде Single. В левой нижней части таблицы представлены операторы «расширяющего» действия. Например, вы можете взять Single и превратить его в Observable.


Реактивный подход


Если переписывать наш исходный пример с точки зрения реактивного программирования, то можно подписаться на User и сказать: «Хочу получать уведомления в основном потоке выполнения, а затем хочу передать это в UI и отобразить данного пользователя». Этот код будет автоматически выполняться при каждом изменении User’а, и вам будут автоматически показываться сделанные изменения, так что больше не придётся забивать себе этим голову.


um.getUser()
    .observeOn(AndroidSchedulers.mainThread())
    .subscribeWith(new DisposableObserver<User>() {
        @Override public void onNext(User user) {
            tv.setText(user.toString());
        }
        @Override public void onComplete() { /* ignored */ }
        @Override public void onError(Throwable t) { /* crash or show */ }
    }));

Но не забывайте, что нужно что-то делать с возвращаемыми Disposable. Мы в мире Android, и нам нужно останавливать выполнение кода при исчезновении Activity. В onDestroy мы избавляемся от Disposables.


// onCreate
disposables.add(um.getUser()
    .observeOn(AndroidSchedulers.mainThread())
    .subscribeWith(new DisposableObserver<User>() {
        @Override public void onNext(User user) {
            tv.setText(user.toString());
        }
        @Override public void onComplete() { /* ignored */ }
        @Override public void onError(Throwable t) { /* crash or show */ }
    }));

// onDestroy
disposables.dispose();

Аналогично, когда мы делаем асинхронный запрос на изменение данных, нам нужно, чтобы это произошло в фоновом потоке выполнения. Мы хотим в основном потоке наблюдать за результатом – будет ли выполнение завершено успешно или с ошибкой. И в случае успешного завершения мы могли бы в колбэке заново активировать текстовое поле.


disposables.add(um.setName("Jane Doe")
    .subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
    .subscribeWith(new DisposableCompletableObserver() {
        @Override public void onComplete() {
            // success! re-enable editing
        }
        @Override public void onError(Throwable t) {
            // retry or show
        }
    }));

Повторюсь: как вы не открыли бы файл без возможности закрыть его. Не подписывайтесь без сохранения Disposable. Поэтому добавим его в наш список Disposable.


В архитектуру RxJava 2 было внесено фундаментальное изменение, которое особенно ярко проявляется при разработке под Android: теперь создаётся меньше промежуточных объектов. Когда вы создаёте все эти цепочки, каждый вызываемый вами оператор должен возвращать новый Observable, реализующий такое поведение. Вызывая оператор map, вы получаете новый Observable, который берёт старый, выполняет функцию и генерирует новые данные.


Эта схема требует размещения в памяти кучи промежуточных объектов. В RxJava 2 их количество уменьшено. Каждый из операторов создаёт на один объект меньше, накладные расходы при подписке ниже. Система работает быстрее, нам нужно реже собирать мусор, и всё это – без всяких компромиссов API.


Заключение


В RxJava 2 реализована следующая идея: берутся изначально асинхронные вещи — работа с сетью, сама Android, база данных, даже UI — и пишется такой код, который реагирует на изменения в этих источниках, а не пытается справиться с изменениями и самостоятельно управлять состояниями.


Если вы используете RxJava 1, то обратите внимание на проект, позволяющий конвертировать типы. С его помощью вы сможете постепенно обновить свои приложения до RxJava 2.


class RxJavaInterop {
    static <T> Flowable<T> toV2Flowable(rx.Observable<T> o) { … }
    static <T> Observable<T> toV2Observable(rx.Observable<T> o) { … }
    static <T> Maybe<T> toV2Maybe(rx.Single<T> s) { … }
    static <T> Maybe<T> toV2Maybe(rx.Completable c) { … }
    static <T> Single<T> toV2Single(rx.Single<T> s) { … }
    static Completable toV2Completable(rx.Completable c) { … }

    static <T> rx.Observable<T> toV1Observable(Publisher<T> p) { … }
    static <T> rx.Observable<T> toV1Observable(Observable<T> o, …) { … }
    static <T> rx.Single<T> toV1Single(Single<T> o) { … }
    static <T> rx.Single<T> toV1Single(Maybe<T> m) { … }
    static rx.Completable toV1Completable(Completable c) { … }
    static rx.Completable toV1Completable(Maybe<T> m) { … }
}

Реактивное программирование позволяет использовать правильную архитектуру — асинхронную. Приветствуйте асинхронность источников, а не пытайтесь самостоятельно управлять всеми состояниями. Комбинируйте источники так, чтобы ваши приложения стали по-настоящему реактивными.

Поделиться с друзьями
-->

Комментарии (2)


  1. Mihail57
    12.05.2017 03:43
    +7

    Одно из самых понятных введений в RxJava, имхо. Огромное спасибо автору за такой подробный пост.


  1. Valeroncho
    12.05.2017 13:10
    +1

    В статье не написано, что на Observable можно подписаться с помощью

    .subscribe(Consumer<? super T> onNext)
    .subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError)
    .subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError, Action onComplete)
    

    Каждый из этих методов возвращает Disposable.

    Используя эти методы, мы можем переписать этот код:
    Observable<String> o = Observable.just(“Hello”);
    
    Disposable d = new o.subscribeWith(new DisposableObserver<String>() {
        @Override public void onNext(String s) { … }
        @Override public void onComplete() { … }
        @Override public void onError(Throwable t) { … }
    });
    
    d.dispose();
    

    На такой:
    Observable<String> o = Observable.just("Hello");
    
    Disposable d = o.subscribe(s -> {}, throwable -> {}, () -> {});
            
    d.dispose();