Теперь, когда мы понимаем основные принципы Rx, настало время научиться создавать и управлять последовательностями. Стиль управления последовательностями был позаимствован у оригинального C# LINQ, который в свою очередь был вдохновлен функциональным программироанием. Мы поделим все операции по темам, которые отсортированы в порядке возрастания сложности операций. Большинство операторов Rx управляют уже существующими последовательностями, но для начала мы научимся их создавать.
Содержание:
- Часть первая – Вступление
- Часть вторая – Последовательности
- Создание последовательности
- Фильтрация последовательности
- Исследование
- Агрегация
- Трансформация
- Часть третья – Управление последовательностями
- Часть четвертая – Параллельность
Часть 2 — Основы последовательностей
Создание последовательности
Ранее, мы использовали Subject
'ы и вручную подавали на них значения чтобы создать последовательность. Мы делали так чтобы продемонстрировать некоторые ключевые моменты, в том числе основной Rx метод subscribe
. В большинстве случаев Subject
это не лучший способ создать новый Observable
. В этом разделе мы рассмотрим более элегантные способы сделать это.
Простые фабричные методы
Observable.just
just
создает Observable
, который выдаст определенное заранее количество значений, после чего завершится.
Observable<String> values = Observable.just("one", "two", "three");
Subscription subscription = values.subscribe(
v -> System.out.println("Received: " + v),
e -> System.out.println("Error: " + e),
() -> System.out.println("Completed")
);
Received: one
Received: two
Received: three
Completed
Observable.empty
Этот Observable
выдаст только событие onCompleted
и больше ничего.
Observable<String> values = Observable.empty();
Subscription subscription = values.subscribe(
v -> System.out.println("Received: " + v),
e -> System.out.println("Error: " + e),
() -> System.out.println("Completed")
);
Completed
Observable.never
Этот Observable
никогда ничего не выдаст.
Observable<String> values = Observable.never();
Subscription subscription = values.subscribe(
v -> System.out.println("Received: " + v),
e -> System.out.println("Error: " + e),
() -> System.out.println("Completed")
);
Код выше ничего не напечатает. Но это не означает что программа блокируется. По-факту она просто мгновенно завершается.
Observable.error
Этот Observable
выдаст событие onError и завершится.
Observable<String> values = Observable.error(new Exception("Oops"));
Subscription subscription = values.subscribe(
v -> System.out.println("Received: " + v),
e -> System.out.println("Error: " + e),
() -> System.out.println("Completed")
);
Error: java.lang.Exception: Oops
Observable.defer
defer
не создает новый Observable
, но позволяет определить каким образом Observable
будет создан при появлении подписчиков. Подумайте, как бы вы создали Observable
который будет выдавать текущее время? Так как значение только одно, похоже, что здесь нам может помочь just
.
Observable<Long> now = Observable.just(System.currentTimeMillis());
now.subscribe(System.out::println);
Thread.sleep(1000);
now.subscribe(System.out::println);
1431443908375
1431443908375
Обратите внимание как второй подписчик, подписавшись на секунду позже, получил такое же время. Это происходит потому что значение времени было вычислено лишь единажды: когда выполнение доходит до метода just
. Однако в нашем случае мы хотим вычислять текущее время при каждой подписке. defer
принимает функцию, которая возвращает Observable
и будет выполнена для каждого нового подписчика.
Observable<Long> now = Observable.defer(() ->
Observable.just(System.currentTimeMillis()));
now.subscribe(System.out::println);
Thread.sleep(1000);
now.subscribe(System.out::println);
1431444107854
1431444108858
Observable.create
create
это очень мощный метод создания Observable
.
static <T> Observable<T> create(Observable.OnSubscribe<T> f)
Все намного проще чем выглядит. Внутри всего лишь функция, которая принимает Subscriber
для типа T
. Внутри нее мы можем вручную определить события, которые будут выдаваться подписчику.
Observable<String> values = Observable.create(o -> {
o.onNext("Hello");
o.onCompleted();
});
Subscription subscription = values.subscribe(
v -> System.out.println("Received: " + v),
e -> System.out.println("Error: " + e),
() -> System.out.println("Completed")
);
Received: Hello
Completed
Когда кто-нибудь подпишется на наш Observable
(в данном случае values
), соответствующий экземпляр Subscriber
будет передан в функцию create
. По мере выполнения кода, значения будут переданы подписчику. Следует обратить внимание, что нужно самостоятельно вызывать метод onCompleted
чтобы просигнализировать окончание последовательности.
Данный метод является рекомендуемым способом создания Observable
в случае если ни один из других способов не подходит. Это похоже на то, как мы создавали Subject
и вручную подавали на него значения, однако есть несколько важных отличий. В первую очередь, источних событий аккуратно инкапсулирован и отделен от другого кода. Во-вторых, Subject
'ы имеют неочевидные опасности: любой кто имеет доступ к обьекту сможет изменить последовательность. Мы еще вернемся к этой проблеме позже.
Еще одним ключевым отличием от использования Subject
является то, что код выполняется "lazily", только тогда когда прибывает новый подписчик. В примере выше, код выполняется не в момент создания Observable
(так как подписчиков еще нет), а в момент вызова метода subscribe
. Это означает, что значения будет вычислены заново для каждого подписчика, как в ReplaySubject
. Конечный результат похож на ReplaySubject
, за исключением кеширования. С помощью create
мы также можем легко перенести выполнение в отделный поток, в то время как с ReplaySubject
нам приходилось бы вручную создавать потоки для вычисления значений. Мы еще рассмотрим способы сделать выполнение метода onSubscribe
параллельным.
Вы уже могли заметить что любой из предыдущих Observable
можно реализовать при помощи Observable.create
. Наш пример для create
эквивалентен Observable.just("hello")
.
Функциональные методы
В функциональном программировании обычным делом является создание бесконечных последовательностей.
Observable.range
Простой и знакомый функциональным программистам метод. Выдает значения из заданного диапазона.
Observable<Integer> values = Observable.range(10, 15);
Этот пример последовательно выдает значения от 10 до 24.
Observable.interval
Эта функция создаст бесконечную последовательность значений, отделенных заданным интервалом времени.
Observable<Long> values = Observable.interval(1000, TimeUnit.MILLISECONDS);
Subscription subscription = values.subscribe(
v -> System.out.println("Received: " + v),
e -> System.out.println("Error: " + e),
() -> System.out.println("Completed")
);
System.in.read();
Received: 0
Received: 1
Received: 2
Received: 3
...
Последовательность не завершится до тех пор пока мы не отпишемся.
Следует обратить внимание почему блокирующий ввод в конце примера обязателен. Без него программа завершится ничего не напечатав. Это происходит потому, что все наши операции являются неблокирующими: мы создаем периодически выдающий значения Observable
, затем регистрируем подписчика, который выполняет некоторые действия в момент прибытия этих значений. Ничто из этого не блокирует главный поток от завершения.
Observable.timer
Существует две перегрузки Observable.timer
. Первый вариант создает Observable
выдающий 0L
через заданный промежуток времени.
Observable<Long> values = Observable.timer(1, TimeUnit.SECONDS);
Subscription subscription = values.subscribe(
v -> System.out.println("Received: " + v),
e -> System.out.println("Error: " + e),
() -> System.out.println("Completed")
);
Received: 0
Completed
Второй вариант ожидает заданный промежуток времени, затем начинает выдавать значения так же как interval
с заданной частотой.
Observable<Long> values = Observable.timer(2, 1, TimeUnit.SECONDS);
Subscription subscription = values.subscribe(
v -> System.out.println("Received: " + v),
e -> System.out.println("Error: " + e),
() -> System.out.println("Completed")
);
Received: 0
Received: 1
Received: 2
...
Пример выше ждет 2 секунды, затем начинает считать каждую секунду.
Превращение в Observable
В java существуют инструменты для работы с последовательностями, коллекциями и асинхронными событиями, которые могут не иметь прямой совместимости с Rx. Сейчас мы рассмотрим каким образом можно превратить их во входящие данные вашего Rx кода.
Если вы используете EventHandler'ы, то с помощь Observable.create
из событий можно создать последовательность.
Observable<ActionEvent> events = Observable.create(o -> {
button2.setOnAction(new EventHandler<ActionEvent>() {
@Override public void handle(ActionEvent e) {
o.onNext(e)
}
});
})
В зависимости от конкретного события, его тип (в данном случае ActionEvent
) сам по себе может нести достаточно информации чтобы стать типом вашего Observable
. Однако, очень часто вам может понадобиться что-нибудь другое, например, значение некого поля в момент события. Получить значение такого поля лучше всего внутри хендлера, пока UI поток заблокирован и значения поля актуально. И хотя не существует гарантий, что значение останется неизменным до достижения конечного подписчика, в правильно реализованном Rx коде изменения контролируются на стороне потребителя [1].
Observable.from
Вы можете превратить любые входные данные в Observable
при помощи create
. Однако, для распространенных типов данных, существуют уже готовые методы, призванные облегчить этот процесс.
Future
'ы являются частью Java и вы должно быть сталкивались с ними во время работы с фреймворками использующими многопоточность. Они являются менее мощным многопоточным инструментом чем Rx, так как возвращают только одно значение. Как правило, вы захотите превратить их в Observable
.
FutureTask<Integer> f = new FutureTask<Integer>(() -> {
Thread.sleep(2000);
return 21;
});
new Thread(f).start();
Observable<Integer> values = Observable.from(f);
Subscription subscription = values.subscribe(
v -> System.out.println("Received: " + v),
e -> System.out.println("Error: " + e),
() -> System.out.println("Completed")
);
Received: 21
Completed
Observable
выдает результат FutureTask
по-готовности, после чего завершается. Если задача была отменена, observable выдаст ошибку java.util.concurrent.CancellationException
.
Если вы заинтересованы в результате Future
только ограниченное время, существует возможность задать таймаут в качестве аргумента.
Observable<Integer> values = Observable.from(f, 1000, TimeUnit.MILLISECONDS);
Если за это время Future
не завершится, observable проигнорирует результат и выдаст TimeoutException
.
С помощью Observable.from
можно превратить любую коллекцию в последовательность. Будет создан Observable
, выдающий каждый элемент коллекции по-отдельности и onCompleted
в конце.
Integer[] is = {1,2,3};
Observable<Integer> values = Observable.from(is);
Subscription subscription = values.subscribe(
v -> System.out.println("Received: " + v),
e -> System.out.println("Error: " + e),
() -> System.out.println("Completed")
);
Received: 1
Received: 2
Received: 3
Completed
Observable
это не то же что Iterable
или Stream
. Observable
push-ориентированный, в том смысле, что вызов onNext
провоцирует стек обработчиков выполниться вплоть до последнего subscribe
метода. Остальные модели pull-ориентированные — значения в них запрашиваются с другой стороны и выполнение блокируется до возвращения результата.
[1] consumer, тот, кто поглащает значения выданные Observable
Теперь у проекта есть свой публичный репозиторий и любой желающий может присоединится к созданию углубленного русскоязычного туториала по Rx. Перевод этой части уже там, остальные появятся в скором времени, а с вашей помощью, еще быстрее.
Fen1kz
Таких введений полно, а отличие от титориалов, скажем, как сделать последовательность типа:
Ждем логина, ждем интернета, открываем базу, берем из базы список несинхронизированных данных, закачиваем их, закрываем базу.
Тонна подводных камней в стиле "если интернет моргнет 2 раза — мы загрузим несинхронизированные данные 2 раза?"
Или "как закрыть базу если от событий интернета у нас идет поток без onCompleted" и пр.
bolein95
Когда закончу с циклом статей, обязательно напишу для вас подобный туториал :)
lair
А зачем такую «последовательность» вообще делать на Rx? Это же не поток событий, это обычный процесс.
Fen1kz
логин это событие, статус интернета это вообще поток событий. Или вы все данные будете в одном потоке загружать?
Назовите пример "потока событий", а не обычного процесса.
MaximChistov
что мешает просто выполнить их в фоне? ни один из описанных этапов не требует действий пользователя
lair
В разных, но зачем мне для этого обязательно Rx?
Вы сам его назвали — изменение статуса подключения к интернету. Еще типовые примеры — нажатия клавиш клавиатуры, биржевые ставки и так далее.
i_user
Да в общем-то низачем. RX он даже не про асинхронность, а про chaining — это способ консистентно и декларативно описать разнородную последовательность операций. Когда у тебя поток шлет лишь один value — это превращается в обычную future.
Иметь консистентный способ проводить преобразования этих самых фьюч и не заботиться о том, моментальная эта операция или растянутая во времени, синхронная или асинхронная.
Концепция биндинга экономит много времени на кофе и комментарии на хабре :-)
А достигнуть сходного качества организации кода можно и без RX.
В чем плюс RX в этом смысле — что когда появятся более настоящие RX задачи — они в строятся в остальной код таким образом, что никто этого и не заметит
lair
Чтобы «не заботиться о том, моментальная эта операция или растянутая во времени, синхронная или асинхронная» достаточно работать с любой операцией как с future, и если ваш язык поддерживает async workflows, у вас все легко и хорошо. А вот смешивать одиночные и множественные операции уже совсем не так полезно для прозрачности кода, к сожалению.
i_user
Ну вот здесь на самом деле вопрос — что такое одиночная операция.
Кнопка, скажем — не одиночная операция — а сигнал нажатий на нее
Логин — не одиночная операция — любой запрос тебе может вернуть 403, и ты зарелогинишься, вызвав при этом какой-то навешенный на это флоу.
И так далее.
Вообще с точки зрения реактивщины — почти все — стримы, сиречь множественные операции — а что одиночные — в общем-то и не стоит отдельного эффорта.
Пы-сы. Да, если ваш язык поддерживает async workflows… Ну мой язык, скажем, не поддерживает) и опять же с точки зрения реактивного программировния и прочих биндингов — все эти асинки-авэйты лишь частный случай преобразований control/data flow — и этим прекрасны.
Вообще из всех моих личных комплейнтов к реактивщине — она всегда читабельна. Но иногда несет избыточный бойлер с потерей производительности.
lair
А надо ли вызывать этот флоу, или он на самом деле актуален только для первого логина, а релогин — это совсем другое?
С точки зрения молотка все — гвозди. И да, так можно жить, но это не всегда удобно.
Хм, можете показать пример чистого Rx, описывающий «последовательность» из стартового коммента — мне интересно, насколько это будет читабельно.
Или вы под читабельностью понимаете то же самое, что и многие другие (включая меня, кстати): «я легко и бегло это читаю, потому что я привык»?
i_user
Нет, не надо — это разные задачи. Которые можно описать единообразно.
И это правда. И тут имеет место tradeoff — консистентность против сиюминутной выгоды. На текущем проекте на данный момент первое выигрывает — но у меня нет оснований говорить что так будет всегда
Вот кстати можно написать) Как раз завтра делаю доклад на тему того, как весь набор действий приложения инициируемых какой-нибудь кнопкой может быть завернут в единый реактивный пайплайн. Могу предложить Вам презентацию оного, вроде как он затрагивает более злые примеры, чем в статье, возможно это поможет ответить на вопрос
Вот завтра как раз на докладе и узнаю :-) Простой ли это код, или просто привычный)
lair
Вот и возникает вопрос — не становится ли эта единообразность избыточной, когда вы пытаетесь единообразно описать разные вещи?
А вы уверены, что речь идет о «против сиюминутной выгоды», а не «против семантической точности», скажем?
Давайте, это интересно.
i_user
Ну вообще практика показывает — что правильно приготовленная реактивщина очень и очень семантически точна, но вопрос очень уместный — потому что правильно ее приготовить не очень просто. Насчет избыточности? Возникало такое ощущение, да, но в итоге пришел к тому, что это приемлемая цена за тестируемый код, с гарантированно меньшим количеством сайд-эффектов и высоким уровнем инкапсуляции
К сожалению, презентация заточена под устный доклад — поэтому возможно чего-то будет не хватать — но с удовольствием объясню все недостающее при желании узнать. Ссылко
lair
Так может быть, если есть более простые способы достигнуть семантической точности (я сейчас не буду обсуждать семантическую точность полученной реактивщины, ее надо почитать для этого на конкретных примерах), не надо усложнять?
Мне вот интересно, откуда вы берете это «гарантированно меньшее количество сайд-эффектов», и, что важнее, с чем вы сравниваете.
i_user
Да — вы правы — тут уже надо брать куски кода и разговаривать по ним. С чем сравниваю? Ну, скажем, со «средним по
больнице» iOS репозиториям на гитхабе, с кодом в статьях и так далее, да я полагаю — что реактивный подход действительно хорошо справляется с задачей честного описания стейта сущности (включая транзитный) даже на уровне сигнатур.lair
«Хорошо» или «лучше всех»? Когда речь идет именно об описании состояния — чем потоки RX выгоднее, скажем, чем акковские акторы?
i_user
Хорошо. Странно говорить «Лучше всех» в IT — где каждое решение — компромисс))
Разные классы сущностей. Потоки RX — не про потоки и асинхронность — а про контракт монадного биндинга в первую очередь — то есть про декларативное описание последовательности действий.
Акторы больше похожи на аспектный подход.
Их странно сравнивать) Но если очень хочется — то по моему опыту. аспекты гораздо гибче и дают больший эффект при меньших вложениях сил — однако больше тяготеют к потере контроля над качеством кодовой базы.
lair
Вот именно, что компромис. И поэтому осмысленно понимать, что находится на разных его (компромиса) плечах, и какие есть альтернативы в каждый момент.
Почему же? И то, и другое — способы борьбы с проблемами, возникающими при асинхронии (более того, в одном малоизвестном курсе это даже дают одно после другого).
… а выше вы же писали, что чтобы реактивщина была семантически точной, ее нужно правильно готовить, что не очень просто.
i_user
Могу раскрыть мысль — выработка хорошего подхода при работе с реактивным кодом требует существенных затрат в начале. И Вообще имеет высокий порог входа. Однако после преодоления этого порога оказалось, что в итоговом коде нет проблем, которые возникали при аспектном подходе (хотя я не исключаю, что просто неправильно его готовил)
На мой взгляд эти два подхода просто решают различные задачи, хотя оба лежат в области стиля организации кода и борьбы с проблемами асинхронности, как вы упомянули
lair
Вот мне и интересно, для каких задач оптимально выбирать Rx.
i_user
Большой профит RX — он очень хорошо тестируется. Соответственно если при разработке приложения высокие требования к качеству — я буду смотреть в сторону RX.
Засчет биндинга — он очень хорошо покрывает задачи где данные собираются больше чем с одного источника (неважно — разные эндпоинты API, системные ресурсы, БДшечка) и претерпевают какие-то преобразования попути. То есть очень похоже на развитие идей LINQ.
Минус — Быстродействие — соответственно — если у меня много разных маленьких элементиков и требования к 60fps — выберу что-то более простое и прямое
RX очень требователен к уровню команды — соответственно с разномастной командой я бы в RX не ввязывался — его невозможно отлаживать — только тестировать.
То есть если я буду писать быстрый прототипчик чего-нибудь или игру, или буду работать в команде возможностей которой я не знаю — я, вероятно, выберу не RX
Если я буду писать алгоритмически сложную задачу требовательную к ресурсам и нуждающуюся в магии — я выберу не RX
Иначе — RX
lair
«Профит» — это когда не просто «хорошо», а «лучше». Я знаю много технологий, которые хорошо тестируются (те же акторы, скажем, не говоря уже про PO?O). Конкретно в Rx я знаю только один профит тестируемости — заранее встроенные места для подмены шедулеров и времени, что позволяет имитировать разные ситуации с таймаутами, наложениями, гонками и прочей красотой. А во всем прочем… тестируется как и любой другой хороший фреймворк.
Вот это как раз последовательности данных, о которых я говорил в самом начале, и в этом качестве я Rx как раз очень ценю.
i_user
Я, как вероятно и автор статьи — из мира мобилок) тут гораздо меньше таких вот «хороших подходов» — и тестируемость по прежнему считается достижением :-) Но вообще да, пожалуй согласен. Просто реактивный подход для меня оказался первым где получилось построить что-то действительно красивое — поэтому я его переоцениваю — хотя тех же результатов можно добиться и иначе.
Благодарю за беседу)
Fen1kz
Собственно мой коммент был как раз о том, что гонять integer'ы и всякие Observable.never это весело и прикольно и об этом куча статей, а вот злых примеров rx никто не дает. Иногда берут один пример и пишут подробную статью, мешая архитектуру к объяснениями как отправлять запрос.
Потому что мне этот пример актуален и проблему «мигания интернета» я решил отвратительно, с помощью списка «отправляемых в данный момент моделей» и второй цепочкой Observable внутри Observable (иначе я не могу понять когда «подцепочка» завершилась). Что мне вообще не нравится, но умишка сделать лучше нехватает
bolein95
Если последовательно и правильно сложить знания в любой области, то умишка обычно хватает на решение самых сложных задач. На это, собственно, и нацелен данный туториал
Fen1kz
А я вначале постулировал, что данный туториал на это не нацелен и является пересказом базовых вещей из документации. Потому что он никак не помогает, например, разобраться почему Observable выдаст onCompleted если где-то выкинется ошибка и как продолжить в случае чего.
i_user
разумеется RX не панацея — но цель автора статья, да и моя — показать, что это достаточно взрослая парадигма для того, чтобы занимать ключевую позицию в реальных задачах.
lair
А вот с этим я и не спорю, я просто считаю, что даже ключевые технологии не надо применять для вообще всего на свете.
xGromMx
В вашем случае стоит юзать Observable.using и еще немного операторов
Fen1kz
Так в случае автора может не плодить очередное интро, написанное в документации, а заюзать Observable.using и прочие фишки тоже?
xGromMx
я хз) я начал делать rx-book еще до того как была вменяемая дока у Rx http://xgrommx.github.io/rx-book/ Раздел Resources очень интересный