RxJava — это реализация ReactiveX для Java — библиотеки для асинхронной обработки потоков данных. Паттерн observable на стероидах, как они сами пишут. В интернете, в том числе на Хабре, есть много «введений в RxJava». Я хочу привести несколько примеров реальных задач. Они не очень сложные, но возможно кто-то увидит какие-то сходства со своими и задумается.

Собственно, задачи:

1. Простое клиентское TCP-соединение. Есть протокол поверх TCP/IP, нужно сформировать сообщение, подключиться к удаленному узлу, если еще не подключился, передать сообщение и прочитать ответ. Плюс обработка ошибок, проверка таймаутов, повтор отправки в случае неудачи. Жестких требований к производительности нет, трафик не большой.

2. Есть двигатель и некоторый датчик. Нужно произвести сканирование — пройтись двигателем по заданной траектории: послать двигатель к точке, дождаться, когда он к ней приедет, снять показания датчика, отобразить точку на графике (в GUI потоке), поехать к следующей точке…

3. Полученные после сканирования данные нужно обработать (условно длительный вычислительный процесс) и засунуть в pdf-отчет (условно длительный процесс ввода-вывода) вместе с изображением графика и данными введенными пользователем (GUI поток).

1. Простое клиентское TCP-соединение


Предположим, есть некоторый протокол обмена сообщениями. Сообщения могут содержать заголовок, контрольную сумму или что-нибудь еще. На каждое сообщение должен быть ответ от сервера. В простейшем виде решение может выглядеть как-то так:

    public String send(String command) {
        try {
            if (!isConnected()) {
                connect();
            }
            byte[] bytes = command.getBytes();
            bytes = addHeader(bytes);
            sendBytes(bytes);
            return readAnswer();
        } catch (IOException e) {
            // паника
        }
    }

Детали реализации я не описываю, но вкратце: connect() создает java.net.Socket и подключается к серверу, sendBytes() пишет в output-поток сокета, readAnswer() читает из input-потока сокета. Помимо addHeader() могут быть еще методы, добавляющие контрольную сумму, кодирование и прочее.

Проблемы этого кода: блокирующие запись/чтение и неудобная обработка ошибок — не понятно, что делать с исключением: то ли наверх пробрасывать, то ли тут что-то сделать (рекурсивно повторить отправку?). Как раз эти две проблемы и решает RxJava. Перепишем:

    public Observable<String> send(String command) {
        return Observable.just(command)
                .doOnNext(cmd -> checkConnection())
                .map(cmd -> cmd.getBytes())
                .map(bytes -> addHeader(bytes))
                .map(bytes -> sendBytes(bytes))
                .map(result -> readAnswer());
    }

Применение:

    connection.send("echo 123")
                .subscribe(
                        answer -> { /*обработать ответ*/ },
                        throwable -> { /*обработать ошибку*/ }
                );

В общем-то, получилось то же самое, только в виде монады, набора операторов и с рядом нюансов.

Во-первых, метод sendBytes() теперь возвращает boolean. RxJava работает с потоками данных, а если кто-то возвращает void вместо данных, то потока как-бы уже и нет. Поэтому нужно либо добавить возвращаемый результат в метод (хотя бы return true), либо вместо map использовать doOnNext — этот оператор возвращает то же, что и получил.

Во-вторых, метод send() теперь возвращает Observable, а не сам String. Поэтому нужен отдельный обработчик ответа (или лямбда, как в примере). С исключением то же самое. Тут нужно, как говорится, начать мыслить асинхронно. Вместо самого результата, мы получаем объект, который потом когда-нибудь предоставит нам результат, а мы должны предоставить ему того, что этот результат получит. Вот только этот код все еще блокирующий, поэтому это асинхронное мышление не имеет смысла. Можно, правда, сделать обертку для String и вытащить результат из монады через замыкание этой обертки, но это уже грязные хаки, которые нарушают принципы функционального программирования.

Улучшим этот код. Начнем с обработки ошибок. RxJava отлавливает исключения, возникающие в операторах, и передает их подписчику. Второй аргумент метода subscribe() — это функциональный интерфейс Action1 — он как раз и отвечает за обработку исключения. Если какой-то из методов раньше мог кидать IOException или какое-то еще checked исключение, то теперь больше нельзя. Такие исключения нужно ловить руками и что-то с ними делать. Например, оборачивать в RuntimeException, чтобы предоставить дальнейшие решения RxJava. Но Action1 не сильно отличается от обычного try-catch подхода. У RxJava есть операторы для работы с ошибками: doOnError(), onErrorReturn(), onErrorResumeNext() и onExceptionResumeNext(). А еще есть банальный retry(), который тут как раз и нужен. Если возникла какая-то ошибка с подключением, то можно просто повторить отправку n-раз.

    public Observable<String> send(String command) {
        return Observable.just(command)
                .doOnNext(cmd -> checkConnection())
                .map(cmd -> cmd.getBytes())
                .map(bytes -> addHeader(bytes))
                .map(bytes -> sendBytes(bytes))
                .map(result -> readAnswer())
                .doOnError(throwable -> disconnect())
                .retry(MAX_RETRY_COUNT);
    }

Обработчик исключения, передаваемый в subscribe() будет вызван только в том случае, если все повторы закончатся с ошибкой. Для надежности еще вызываем disconnect() перед повторной попыткой, чтобы закрыть и обнулить сокет. Иначе в checkConnection() внутри при вызове isConnected() можем получить ложно положительное срабатывание, и все повторные попытки опять приведут к ошибке. Например, если сервер убил подключение по таймауту, то метод Socket.isConnected() на стороне клиента все еще будет возвращать true — со стороны клиента сокет же подключен, все норм.

Можно еще добавить таймаут на случай, если серверу поплохело, и клиент заблокировался на записи в сокет:

    public Observable<String> send(String command) {
        return Observable.just(command)
                .doOnNext(cmd -> checkConnection())
                .map(cmd -> cmd.getBytes())
                .map(bytes -> addHeader(bytes))
                .map(bytes -> sendBytes(bytes))
                .timeout(MAX_SEND_TIMEOUT_MS, TimeUnit.MILLISECONDS)
                .map(result -> readAnswer())
                .doOnError(throwable -> disconnect())
                .retry(MAX_RETRY_COUNT);
    }

Оператор timeout кидает исключение, если в течение заданного времени от Observable не поступило ни одного элемента. А исключения мы уже умеем обрабатывать.

Теперь вторая проблема — у нас все еще блокирующие операции, поэтому если вызывать send() из потока GUI, то можно получить подвисания интерфейса. Нужно просто сказать RxJava, чтобы все эти действия выполнялись в другом потоке.

Для этого есть операторы observeOn() и subscribeOn(). У многих людей возникают проблемы с пониманием, чем отличаются эти операторы — есть куча статей на эту тему и вопросов на stackoverflow. Давайте вновь поднимем эту тему и вместе подумаем, что нам нужно сейчас использовать. Вот что пишут в официальной документации:

SubscribeOn — specify the Scheduler on which an Observable will operate.
ObserveOn — specify the Scheduler on which an observer will observe this Observable.

Observable — это тот, кто поставляет данные. Observer — это тот, кто получает данные и что-то с ними делает. Нам нужно, чтобы все выполнялось в другом потоке. Вернее, нам нужно, чтобы наш Observable поставлял данные изначально в другом потоке. А раз данные поставляются в другом потоке, то и все observer'ы будут их обрабатывать в другом потоке. Это по определению subscribeOn() — он определяет планировщика для Observable, которого мы создали в самом начале:

    public Observable<String> send(String command) {
        return Observable.just(command)
                .doOnNext(cmd -> checkConnection())
                .map(cmd -> cmd.getBytes())
                .map(bytes -> addHeader(bytes))
                .map(bytes -> sendBytes(bytes))
                .timeout(MAX_SEND_TIMEOUT_MS, TimeUnit.MILLISECONDS)
                .map(result -> readAnswer())
                .doOnError(throwable -> disconnect())
                .retry(MAX_RETRY_COUNT)
                .subscribeOn(Schedulers.io());
    }

Теперь операторы будут выполняться в потоке, который им предоставит планировщик io. Если несколько раз подряд вызвать send(), не дожидаясь ответа, то могут возникнуть проблемы с синхронизацией. По-хорошему, функции переданные в операторы, должны быть чистыми (без побочных эффектов), но в случае с сокетом это проблематично. Чистые функции вообще не очень дружат с вводом/выводом. Нужно синхронизировать обращения к сокету или реализовать что-то типа ConnectionPool'а — тут нужно исходить из задачи.

Стоит иметь в виду, что тогда и обработка ответа подписчиком (он тоже observer) будет осуществляться в другом потоке, а это не всегда хорошо. Например, если мы хотим отобразить ответ в графическом интерфейсе, то скорее всего получим исключение, что мы это делаем не в главном потоке. Для этого нужно поместить обработчик в очередь событий фреймворка, отвечающего за графический интерфейс. В разных фреймворках это делается по-разному. В JavaFX для этого есть метод Platform.runLater(runnable). Можно вызывать его напрямую в обработчике ответа, а можно написать свой планировщик:

public final class FxScheduler extends Scheduler {

    private final static FxScheduler m_instance = new FxScheduler();

    private FxScheduler() {}

    public static FxScheduler getInstance() {
        return m_instance;
    }

    @Override
    public Worker createWorker() {

        return new Worker() {

            private final CompositeSubscription m_subscription = new CompositeSubscription();

            @Override
            public Subscription schedule(Action0 action0) {
                Platform.runLater(action0::call);
                return m_subscription;
            }

            @Override
            public Subscription schedule(Action0 action0, long delay, TimeUnit timeUnit) {
                Timer timer = new Timer();
                timer.schedule(new TimerTask() {
                    @Override
                    public void run() {
                        Platform.runLater(action0::call);
                    }
                }, timeUnit.toMillis(delay));
                return m_subscription;
            }

            @Override
            public void unsubscribe() {
                m_subscription.unsubscribe();
            }

            @Override
            public boolean isUnsubscribed() {
                return m_subscription.isUnsubscribed();
            }
        };
    }
}

К слову, для Android существует AndroidSchedulers.mainThread() в RxAndroid — дополнении для RxJava. Пример отправки команды тогда имеет вид:

        send("echo 123")
                .observeOn(FxScheduler.getInstance())
                .subscribe(
                        answer -> { /*обработать ответ*/ },
                        throwable -> { /*обработать ошибку*/ }
                );

Здесь мы используем уже observeOn() — нам нужно сообщить RxJava, что «следующий observer должен выполняться через такой-то планировщик».

RxJava предоставляет удобное управление конвейером операторов. Рядом с .map(bytes -> sendBytes(bytes)) можно добавить расчет контрольной суммы, потом прогнать байты через кодирование. Можно добавить вначале логгирование исходящей команды, а в конце — полученного ответа. В общем, идею вы поняли.

2. Сканирование с помощью двигателя и датчика


Есть набор точек — это могут быть углы поворота двигателя в градусах или позиции устройства, которое приводится в движение двигателем. В общем, есть какой-то актуатор. А еще есть внешний датчик, с которого можно получать значения. Нужно проехаться двигателем по набору точек, в каждой точке получить значение с датчика, построить кривую на графике. Повторить процедуру n раз (n кривых на графике). При этом двигатель работает не мгновенно, нужно ждать, когда он выйдет на позицию.

Итак, мы имеем набор точек, для каждой нужно что-то сделать (желательно в другом потоке), а результат обрабатывается в GUI потоке (добавить точку на LineChart, например). Похоже на типичную задачу для RxJava.

    public Observable<Point> startScan(List<Double> trajectory, int iterationCount) {
        return Observable.from(trajectory)
                .subscribeOn(Schedulers.io())
                .doOnNext(this::moveMotor)
                .doOnNext(this::blockUntilTargetReached)
                .map(this::createResultPoint)
                .repeat(iterationCount);
    }

Используем Schedulers.io(): управление двигателем и датчиком — это все-таки операции ввода-вывода. moveMotor() посылает команду двигателю (через написанный ранее Connection, например).

blockUntilTargetReached() запрашивает у двигателя его позицию, сравнивает с целевой и усыпляет поток на сколько-то миллисекунд, если двигатель еще не доехал. createResultPoint() запрашивает у датчика значение в возвращает объект класса Point, содержащий пару чисел — целевую позицию и значение с датчика. repeat() работает почти как retry() — он повторяет весь поток с самого начала каждый раз, а retry() только после ошибки.

Исходный Observable будет выдавать точки по одной. Следующую точку он выдаст только когда предыдущая пройдет все операторы вплоть до подписчика. Это соответствует функциональному подходу с его ленивыми вычислениями и потоковой обработкой. Таким же образом работают StreamAPI и LINQ. За счет этого сканирование будет идти по точкам по очереди, а не forEach(this::moveMotor), затем forEach(this::blockUntilTargetReached) и так далее.

Применение:

        final List<Double> trajectory = ...;
        final int n = ...;
        startScan(trajectory, n)
                .observeOn(FxScheduler.getInstance())
                .subscribe(
                        point -> processPoint(point),
                        throwable -> processError(throwable),
                        () -> processData()
                );

Проблема в том, что подписчик не отличает на каком именно повторе была получена точка. То есть вместо n кривых, мы получим одну кривую в n раз длиннее. Нужно как-то вручную отслеживать, что началось новое сканирование. Например, считать количество точек и начинать новую кривую, если значение счетчика превысило количество точек в траектории. Или сравнивать пришедшую точку с первой точкой траектории.

В subscribe() появился третий аргумент — это обработчик onComplete(), который вызывается, когда в Observable закончились элементы.

subscribe() возвращает объект, имеющий интерфейс Subscription. Если вызвать у него метод unsubscribe(), то у Observable больше не будет подписчика, принимающего данные, поэтому он просто перестанет их выдавать. Принцип ленивых вычислений — если данные никому не нужны, то не нужно их передавать. Побочных эффектов у операторов все равно не должно быть в соответствии с парадигмой функционального программирования, поэтому просто выполнять операторы без подписчика у Observable смысла нет. С помощью unsubscribe() можно реализовать отмену сканирования. Разве что еще двигателю нужно послать команду на останов движения — за это unsubscribe() не отвечает.

3. Обработка данных и отчет


Мы получили после сканирования много полезных данных, теперь их нужно обработать, рассчитать нужные значения и сгенерировать pdf-отчет.

В отчете так же должны быть значения некоторых полей из интерфейса (например, ФИО пользователя) и рисунок полученных графиков. В случае JavaFX рисунок можно получить методом snapshot(), который есть у каждого графического объекта. Так как это действия с объектами JavaFX, выполняться они должны в GUI-потоке. Для этого у нас уже есть FxScheduler.

            class ReportMetaInfo {
                private String fileName;
                private String name;
                private WritableImage image;
            }
            final Observable<ReportMetaInfo> reportGuiData = Observable.just(m_reportInfoProvider)
                    .subscribeOn(FxScheduler.getInstance())
                    .map(provider -> {
                        final ReportMetaInfo info = new ReportMetaInfo();
                        info.fileName = m_reportInfoProvider.getFileName();
                        info.name = m_reportInfoProvider.getName();
                        info.image = m_reportInfoProvider.getChartSnapshot();
                        return info;
                    });

m_reportInfoProvider — это реализация интерфейса ReportInfoProvider — прослойки между моделью и представлением. По сути это вызов геттеров из TextView, но модели все равно — у нее просто интерфейс.

Для расчетов есть Schedulers.computation().

            final Observable<ScanResult> reportComputationalData = Observable.just(scanData)
                    .subscribeOn(Schedulers.computation())
                    .map(data -> new ResultProcessor(data).calculateAll());

Теперь мы хотим объединить данные из формы и данные из расчетов и поместить все это в тяжелый pdf-файл. Для этого есть оператор zip() и Schedulers.io():

            class ReportData {
                ReportMetaInfo metaInfo;
                ScanResult result;

                ReportData(ReportMetaInfo metaInfo, ScanResult result) {
                    this.metaInfo = metaInfo;
                    this.result = result;
                }
            }
            Observable.zip(
                    reportGuiData,
                    reportComputationalData,
                    (reportInfo, scanResult) -> new ReportData(reportInfo, scanResult)
            )
                    .observeOn(Schedulers.io())
                    .map(reportData -> ReportGenerator.createPdf(
                            reportData.metaInfo.fileName,
                            reportData.metaInfo.name,
                            reportData.metaInfo.image,
                            reportData.result
                    )).subscribe(
                    isOk -> { /* здесь, в общем-то, делать нечего */ },
                    throwable -> { /* что-то пошло не так */ },
                    () -> { /* здесь мы окажемся, если все прошло успешно */ }
            );

zip() принимает до девяти разных Observable и соединяет элементы из них в кортежи. Функцию для соединения нужно предоставить самому как и результирующий тип для кортежей. В итоге получение данных из интерфейса (включая изображение графика) и обработка результатов сканирования проходят параллельно. Нужно ли распараллеливание таких действий, зависит от конкретных задач и объемов данных — я привел несколько упрощенный пример.

Стоит иметь в виду, что когда у нас несколько потоков данных, может проявляться backpressure. Это различные проблемы связанные с разной производительностью потоков и разной производительностью Observable и Observer. В общем, это ситуации, когда кто-то простаивает, а у кого-то уже через буфер переливается. Так что нужно быть аккуратным.

Заключение


Скорее всего для этих задач есть другие решения (и более эффективные) — если кто-то мне их укажет, я с радостью приму это к сведению и учту в работе. На примере этих задач я постарался показать некоторые особенности RxJava: обработка ошибок, отличие subscribeOn() и observeOn(), кастомные планировщики и получение результата в GUI-потоке, принцип ленивых вычислений и его применение для управления внешними устройствами, прерывание работы Observable, параллельная работа нескольких Observable и их объединение. Так что даже если эти задачи не совсем удачные для RxJava, сами рассмотренные принципы могут быть полезны для других.
Поделиться с друзьями
-->

Комментарии (3)


  1. Moxa
    04.09.2016 13:46
    +1

    я правильно понимаю, что вызов send в первой задаче неблокирующий? что произойдет, если отправить одно сообщение и, не дожидаясь результата, отправить следующее?


    1. fck_r_sns
      04.09.2016 14:42

      Да, прошу прощения, забыл про это написать. Могут появиться различные concurrency-проблемы. Нужно либо доступ к сокету синхронизировать, либо пул сокетов использовать — от задачи зависит.
      Добавил в статье абзац про это.


  1. AIring
    04.09.2016 15:09

    Хотелось бы добавить, что для JavaFX уже существует официальный набор привязок к RxJava, в том числе планировщик (scheduler).