В вводной статье мы рассмотрели преимущества реактивного подхода в программировании на Java, а также ситуации в которых библиотека Rx бывает более или менее полезной. В этой части мы рассмотрим основные типы, на которых базируется концепция реактивного программирования и несколько дополнительных классов, которые помогут нам в понимании принципов работы библиотеки.
Содержание:
- Часть первая – Вступление
- Почему Rx?
- Ключевые типы
- Жизненный цикл Observable
- Часть вторая – Последовательности
- Создание последовательности
- Фильтрация последовательности
- Исследование
- Агрегация
- Трансформация последовательностей
- Часть третья – Управление последовательностями
- Часть четвертая – Параллельность
Ключевые типы
Rx базируется на двух фундаментальных типах, в то время, как некоторые другие расширяют их функциональность. Этими базовыми типами являются Observable и Observer, которые мы и рассмотрим в этом разделе. Мы также рассмотрим Subject’ы – они помогут в понимании основных концепций Rx.
Rx построена на паттерне Observer. В этом нет ничего нового, обработчики событий уже существуют в Java (например, JavaFX EventHandler[1]), однако они проигрывают в сравнении с Rx по следующим причинам:
- Обработку событий в них сложно компоновать
- Их вызов нельзя отложить
- Могут привести к утечке памяти
- Не существует простого способа сообщить об окончании потока событий
- Требуют ручного управления многопоточностью.
Observable
Observable – первый базовый тип, который мы рассмотрим. Этот класс содержит в себе основную часть реализации Rx, включая все базовые операторы. Мы рассмотрим их позже, а пока нам следует понять принцип работы метода subscribe. Вот ключевая перегрузка [2]:
public final Subscription subscribe(Observer<? super T> observer)
Метод subscribe используется для получения данных выдаваемых [3] observable. Эти данные передаются наблюдателю, который предполагает их обработку в зависимости от требований потребителя. Наблюдатель в этом случае является реализацией интерфейса Observer.
Observable сообщает три вида событий:
- Данные
- Сигнал о завершении последовательности [4] (что означает, что новых данных больше не будет)
- Ошибку, если последовательность завершилась по причине исключительной ситуации (это событие так же предполагает завершение последовательности)
Observer
В Rx предусмотрена абстрактная реализация Observer, Subscriber. Subscriber реализует дополнительную функциональность и, как правило, именно его следует использовать для реализации Observer. Однако, для начала, рассмотрим только интерфейс:
interface Observer<T> {
void onCompleted();
void onError(java.lang.Throwable e);
void onNext(T t);
}
Эти три метода являются поведением, которое описывает реакцию наблюдателя на сообщение от observable. onNext у наблюдателя будет вызван 0 или более раз, опционально сопровождаясь onCompleted или onError. После них вызовов больше не будет.
Разрабатывая код с помощью Rx, вы увидите много Observable, но намного меньше Observer. И хотя и необходимо понимать концепцию Observer, существуют способы не требующие непосредственного создания его экземпляра.
Реализация Observable и Observer
Вы можете вручную реализовать Observer и Observable. В реальности в этом, как правило, нет необходимости: Rx предоставляет готовые решения, чтобы упростить разработку. Это также может быть не совсем безопасно, поскольку взаимодействие между частями библиотеки Rx включает в себя принципы и внутреннюю инфраструктуру, которые могут быть не очевидны новичку. В любом случае, будет проще для начала использовать множество инструментов уже предоставленных библиотекой для создания необходимого нам функционала.
Чтобы подписаться на observable, совсем нет необходимости в реализации Observer. Существуют другие перегрузки метода subscribe, которые принимают в качестве аргументов соответствующие функции для onNext, onError и onSubscribe, инкапсулирующие создание экземпляра Observer. Предоставлять их всех тоже не обязательно, вы можете описать только часть из них, например, только onNext или только onNext и onError.
Лямбда-выражения в Java 1.8 делают эти перегрузки очень подходящими для использования в коротких примерах этой серии статей.
Subject
Subject’ы являются расширением Observable, одновременно реализуя интерфейс Observer. Идея может показаться странной, но в определенных случаях они делают некоторые вещи намного проще. Они могут принимать сообщения о событиях (как observer) и сообщать о них своим подписчикам (как observable). Это делает их идеальной отправной точкой для знакомства с Rx кодом: когда у вас есть данные, поступающие извне, вы можете передать их в Subject, превращая их таким образом в observable.
Существует несколько реализаций Subject. Сейчас мы рассмотрим самые важные из них.
PublishSubject
PublishSubject – самая простая реализация Subject. Когда данные передаются в PublishSubject, он выдает их всем подписчикам, которые подписаны на него в данный момент.
public static void main(String[] args) {
PublishSubject<Integer> subject = PublishSubject.create();
subject.onNext(1);
subject.subscribe(System.out::println);
subject.onNext(2);
subject.onNext(3);
subject.onNext(4);
}
Вывод:
2
3
4
Как мы видим, 1 не была напечатана из-за того, что мы не были подписаны в момент когда она была передана. После того как мы подписались, мы начали получать все значения поступающие в subject.
Здесь мы впервые используем метод subscribe, так что стоит уделить этому внимание. В данном случае мы используем перегруженную версию, которая принимает один объект класса Function, отвечающий за onNext. Эта функция принимает значение типа Integer и ничего не возвращает. Функции, которые ничего не возвращают также называются actions. Мы можем передать эту функцию следующими способами:
- Предоставить объект класса Action1<Integer>
- Неявно создать таковой используя лямбда-выражение
- Передать ссылку на существующий метод с соответствующей сигнатурой. В данном случае, System.out::println имеет перегруженную версию, которая принимает Object, поэтому мы передаем ссылку на него. Таким образом, подписка позволяет нам печатать в основной поток вывода все поступающие в Subject числа.
ReplaySubject
ReplaySubject имеет специальную возможность кэшировать все поступившие в него данные. Когда у него появляется новый подписчик, последовательность выдана ему начиная с начала. Все последующие поступившие данные будут выдаваться подписчикам как обычно.
ReplaySubject<Integer> s = ReplaySubject.create();
s.subscribe(v -> System.out.println("Early:" + v));
s.onNext(0);
s.onNext(1);
s.subscribe(v -> System.out.println("Late: " + v));
s.onNext(2);
Вывод
Early:0
Early:1
Late: 0
Late: 1
Early:2
Late: 2
Все значения были получены, не смотря на то, что один из подписчиков подписался позже другого. Обратите внимание, что до того как получить новое значение, подписчик получает все пропущенные. Таким образом, порядок последовательности для подписчика не нарушен.
Кэшировать всё подряд не всегда лучшая идея, так как последовательности могут быть длинными или даже бесконечными. Фабричный метод ReplaySubject.createWithSize ограничивает размер буфера, а ReplaySubject.createWithTime время, которое объекты будут оставаться в кеше.
ReplaySubject<Integer> s = ReplaySubject.createWithSize(2);
s.onNext(0);
s.onNext(1);
s.onNext(2);
s.subscribe(v -> System.out.println("Late: " + v));
s.onNext(3);
Вывод
Late: 1
Late: 2
Late: 3
Наш подписчик на этот раз пропустил первое значение, которое выпало из буфера размером 2. Таким же образом со временем из буфера выпадают объекты у
Subject созданного при помощи createWithTime.
ReplaySubject<Integer> s = ReplaySubject.createWithTime(150, TimeUnit.MILLISECONDS, Schedulers.immediate());
s.onNext(0);
Thread.sleep(100);
s.onNext(1);
Thread.sleep(100);
s.onNext(2);
s.subscribe(v -> System.out.println("Late: " + v));
s.onNext(3);
Вывод
Late: 1
Late: 2
Late: 3
Создание ReplaySubject с ограничением по времени требует объект планировщика (Scheduler), который является представлением времени в Rx. Мы обязательно вернемся к планировщикам в разделе про многопоточность.
ReplaySubject.createWithTimeAndSize ограничивает буфер по обоим параметрам.
BehaviorSubject
BehaviorSubject хранит только последнее значение. Это то же самое, что и ReplaySubject, но с буфером размером 1. Во время создания ему может быть присвоено начальное значение, таким образом гарантируя, что данные всегда будут доступны новым подписчикам.
BehaviorSubject<Integer> s = BehaviorSubject.create();
s.onNext(0);
s.onNext(1);
s.onNext(2);
s.subscribe(v -> System.out.println("Late: " + v));
s.onNext(3);
Вывод
Late: 2
Late: 3
Начальное значение предоставляется для того, чтобы быть доступным еще до поступления данных.
BehaviorSubject<Integer> s = BehaviorSubject.create(0);
s.subscribe(v -> System.out.println(v));
s.onNext(1);
Вывод
0
1
Так как роль BehaviorSubject – всегда иметь доступные данные, считается неправильным создавать его без начального значения, также как и завершать его.
AsyncSubject
AsyncSubject также хранит последнее значение. Разница в том, что он не выдает данных до тех пока не завершится последовательность. Его используют, когда нужно выдать единое значение и тут же завершиться.
AsyncSubject<Integer> s = AsyncSubject.create();
s.subscribe(v -> System.out.println(v));
s.onNext(0);
s.onNext(1);
s.onNext(2);
s.onCompleted();
Вывод
2
Обратите внимание, что если бы мы не вызвали s.onCompleted(), этот код ничего бы не напечатал.
Неявная инфраструктура
Как мы уже упоминали, существуют принципы, которые могут быть не очевидны в коде. Один из важнейших заключается в том, что ни одно событие не будет выдано после того, как последовательность завершена (onError или onCompleted). Реализация subject’ уважает эти принципы:
Subject<Integer, Integer> s = ReplaySubject.create();
s.subscribe(v -> System.out.println(v));
s.onNext(0);
s.onCompleted();
s.onNext(1);
s.onNext(2);
Вывод
0
Безопасность не может быть гарантирована везде, где используется Rx, поэтому вам лучше быть осведомленным и не нарушать этот принцип, так как это может привести к неопределенным последствиям.
В продолжении мы рассмотрим жизненный цикл Observable.
[1] Или знакомые всем Event Listeners. – Примеч. Автора
[2] Я, все-таки считаю, что ключевой перегрузкой тут является именно версия с Observer в качестве аргумента, в оригинале в качестве примера приводится версия subscribe(Subscriber<? super T> subscriber) – Примеч. Автора
[3] Я буду использовать слово «выдавать» чтобы описать событие передачи данных от Observable Observer’у (to emit в ориг.). – Примеч. Автора
[4] Автор использует термин последовательность (sequence), чтобы обозначить множество всех данных, которые может выдать Observable. – Примеч. Автора