Представляю вашему вниманию типичные варианты использования Observable объектов в компонентах и сервисах Angular 4.
Подписка на параметр роутера и мапинг на другой Observable
Задача: При открытии страницы example.com/#/users/42
, по userId
получить данные пользователя.
Решение: При инициализации компоненты UserDetailsComponent
мы подписываемся на параметры роутера. То есть если userId
будет меняться — будер срабатывать наша подписка. Используя полученный userId
, мы из сервиса userService
получаем Observable
с данными пользователя.
// UserDetailsComponent
ngOnInit() {
this.route.params
.pluck('userId') // получаем userId из параметров
.switchMap(userId => this.userService.getData(userId))
.subscribe(user => this.user = user);
}
Подписка на параметр роутера и строку запроса
Задача: При открытии страницы example.com/#/users/42?regionId=13
нужно выполнить функцию load(userId, regionId)
. Где userId
мы получаем из роутера, а regionId
— из параметров запроса.
Решение: У нас два источника событий, поэтому воспользуемся функцией Observable.combineLatest, которая будет срабатывать, когда каждый из источников генерирует событие.
ngOnInit() {
Observable.combineLatest(this.route.params, this.route.queryParams)
.subscribe(([params, queryParams]) => { // полученный массив деструктурируем
const userId = params['userId'];
const regionId = queryParams['regionId'];
this.load(userId, regionId);
});
}
Обратите внимание, что созданные подписки на роутер, при разрушении объекта удалятся, за этим следит ангуляр, поэтому отписываться от параметров роутера не нужно:
The Router manages the observables it provides and localizes the subscriptions. The subscriptions are cleaned up when the component is destroyed, protecting against memory leaks, so we don't need to unsubscribe from the route params Observable. Mark Rajcok
Остановка анимации загрузки после окончания выполнения подписки
Задача: Показать значок загрузки после начала сохранения данных и скрыть его, когда данные сохранятся или произойдет ошибка.
Решение: За отображение загрузчика у нас отвечает переменная loading
, после нажатия на кнопку, установим ее в true
. А для установки ее в false
воспользуемся Observable.finally
функций, которая выполняется после завершения подписки или если произошла ошибка.
save() {
this.loading = true;
this.userService.save(params)
.finally(() => this.loading = false)
.subscribe(user => {
// Успешно сохранили
}, error => {
// Ошибка сохранения
});
}
Создание собственного источника событий
Задача: Создать переменную lang$
в configService
, на которую другие компоненты будут подписываться и реагировать, когда язык будет меняться.
Решение: Воспользуемся классом BehaviorSubject
для создания переменной lang$
;
Отличия BehaviorSubject
от Subject
:
BehaviorSubject
должен инициализироваться с начальным значением;- Подписка возвращает последнее значение
Subject
а; - Можно получить последнее значение напрямую через функцию
getValue()
.
Создаём переменную lang$
и сразу инициализируем. Так же добавляем функцию setLang
для установки языка.
// configService
lang$: BehaviorSubject<Language> = new BehaviorSubject<Language>(DEFAULT_LANG);
setLang(lang: Language) {
this.lang$.next(this.currentLang); // тут мы поставим
}
Подписываеся на изменение языка в компоненте. Переменная lang$
является "горячим" Observable объектом, то есть подписка требует отписки при разрушении объекта.
private subscriptions: Subscription[] = [];
ngOnInit() {
const langSub = this.configService.lang$
.subscribe(() => {
// ...
});
this.subscriptions.push(langSub);
}
ngOnDestroy() {
this.subscriptions
.forEach(s => s.unsubscribe());
}
Использование takeUntil для отписки
Отписываться можно и более изящным вариантом, особенно если в компоненте присутствует больше двух подписок:
private ngUnsubscribe: Subject<void> = new Subject<void>();
ngOnInit() {
this.configService.lang$
.takeUntil(this.ngUnsubscribe) // отписка по условию
.subscribe(() => {
// ...
});
}
ngOnDestroy() {
this.ngUnsubscribe.next();
this.ngUnsubscribe.complete();
}
То есть, чтобы не терять память на горячих подписках, компонента будет работать до тех пор, пока значение ngUnsubscribe
не изменится. А изменится оно, когда вызовется ngOnDestroy
. Плюсы этого варианта в том, что в каждую из подписок достаточно добавить всего одну строчку, чтобы отписка сработала вовремя.
Использование Observable для автокомплита или поиска
Задача: Показывать предложения страниц при вводе данных на форме
Решение: Подпишемся на изменение данных формы, возьмём только меняющиеся данные инпута, поставим небольшую задержку, чтобы событий не было слишком много и отправим запрос в википедию. Результат выведем в консоль. Интересный момент в том, что switchMap
отменит предыдущий запрос, если пришли новые данные. Это очень полезно, для избегания нежалательных эффектов от медленных запросов, если, к например, предпоследний запрос выполнялся 2 секунды, а последий 0.2 секунды, то в консоль выведется результат именно последнего запроса.
ngOnInit() {
this.form.valueChanges
.takeUntil(this.ngUnsubscribe) // отписаться после разрушения
.map(form => form['search-input']) // данные инпута
.distinctUntilChanged() // брать измененные данные
.debounceTime(300) // реагировать не сразу
.switchMap(this.wikipediaSearch) // переключить Observable на запрос в Вики
.subscribe(data => console.log(data));
}
wikipediaSearch = (text: string) => {
return Observable
.ajax('https://api.github.com/search/repositories?q=' + text)
.map(e => e.response);
}
Кеширование запроса
Задача: Необходимо закешировать Observable запрос
Решение: Воспользуемся связкой publishReplay
и refCount
. Первая функция закеширует одно значение функции на 2 секунды, а вторая будет считать созданные подписки. То есть, Observable завершится, когда все подписки будут выполнены. Тут можно прочитать подробнее.
// tagService
private tagsCache$ = this.getTags()
.publishReplay(1, 2000) // кешируем одно значение на 2 секунды
.refCount() // считаем ссылки
.take(1); // берем 1 значение
getCachedTags() {
return tagsCache$;
}
Последовательный combineLatest
Задача: Критическая ситуация на сервере! Backend команда сообщила, что для корректного обновления продукта нужно выполнять строго последовательно:
- Обновление данных продукта (заголовок и описание);
- Обновление списка тегов продукта;
- Обновление списка категорий продукта;
Решение: У нас есть 3 Observable, полученных из productService
. Воспользуемся concatMap
:
const updateProduct$ = this.productService.update(product);
const updateTags$ = this.productService.updateTags(productId, tagList);
const updateCategories$ = this.productService.updateCategories(productId, categoryList);
Observable
.from([updateProduct$, updateTags$, updateCategories$])
.concatMap(a => a) // выполняем обновление последовательно
.toArray() // Возвращает массив из последовательности
.subscribe(res => console.log(res)); // res содержит массив результатов запросов
Загадка на посошок
Если у вас есть желание немного поупражняться, решите предыдущую задачу, но для создания продукта. То есть сначала создаём продукт, потом обновляем теги, а только потом — категории.
Полезные ссылки
- Заворожённо посмотреть на шарики: rxviz.com
- Потаскать шарики мышкой: rxmarbles.com
Комментарии (12)
xGromMx
24.09.2017 19:13+3Можно же последний снипет так
Observable.concat(updateProduct$, updateTags$, updateCategories$).toArray().subscribe(res => console.log(res))
Ну или в случае с from
Observable.from([updateProduct$, updateTags$, updateCategories$]).concatAll().toArray().subscribe(res => console.log(res))
Ну или с of
Observable.of(updateProduct$, updateTags$, updateCategories$).concatAll().toArray().subscribe(res => console.log(res))
VladVR
25.09.2017 15:29Вопрос такой — а есть ли аналогично takeUntil оператор что то вроде restartWhen?
Задача такая — UI получает observable из метода getSmth, соответственно хотелось бы в методе saveSmth вызвать next и чтобы все подписки сходили еще раз на бакенд и получили новые данные
Я пока что реализовал примерно то что мне надо руками, но вдруг уже есть готовое решение.
vintage
26.09.2017 12:18Ужасы какие :-) Давайте я вам лучше покажу, как то же самое реализуется с использованием ОРП...
При открытии страницы example.com/#/users/42, по userId получить данные пользователя.
ФРП:
ngOnInit() { this.route.params .pluck('userId') // получаем userId из параметров .switchMap(userId => this.userService.getData(userId)) .subscribe(user => this.user = user); }
ОРП:
// example.com/#user=123 user() { return this.user_serice().data( $mol_state_arg.value( 'user' ) ) }
Можно поддержать и формат урлов из задачи, но суть не поменяется.
При открытии страницы example.com/#/users/42?regionId=13 нужно выполнить функцию load(userId, regionId). Где userId мы получаем из роутера, а regionId — из параметров запроса.
ФРП:
ngOnInit() { Observable.combineLatest(this.route.params, this.route.queryParams) .subscribe(([params, queryParams]) => { // полученный массив деструктурируем const userId = params['userId']; const regionId = queryParams['regionId']; this.load(userId, regionId); }); }
ОРП:
// example.com/#user=123/region=456 data() { return this.load( $mol_state_arg.value( 'user' ) , $mol_state_arg.value( 'region' ) ) }
Можно поддержать и формат урлов из задачи, но опять же суть не поменяется.
Показать значок загрузки после начала сохранения данных и скрыть его, когда данные сохранятся или произойдет ошибка.
ФРП:
save() { this.loading = true; this.userService.save(params) .finally(() => this.loading = false) .subscribe(user => { // Успешно сохранили }, error => { // Ошибка сохранения }); }
ОРП:
@ $mol_mem save() { return this.user_service().save( params ) }
Да, реально, больше ничего делать не надо — анимация сама начнётся, когда начнётся запрос, закончится, когда запрос завершится, и будет нарисована ошибка в случае ошибки.
Создать переменную lang$ в configService, на которую другие компоненты будут подписываться и реагировать, когда язык будет меняться.
ФРП:
lang$: BehaviorSubject<Language> = new BehaviorSubject<Language>(DEFAULT_LANG); setLang(lang: Language) { this.lang$.next(this.currentLang); // тут мы поставим }
private subscriptions: Subscription[] = []; ngOnInit() { const langSub = this.configService.lang$ .subscribe(() => { // ... }); this.subscriptions.push(langSub); } ngOnDestroy() { this.subscriptions .forEach(s => s.unsubscribe()); }
ОПР:
@ $mol_mem lang( next = 'en' ) { return next } title() { return this.language_service().text( this.lnag() , 'title' ) }
Да-да, подписки/отписки — не наша забота, всё будет работать как надо.
Показывать предложения страниц при вводе данных на форме
ФРП:
ngOnInit() { this.form.valueChanges .takeUntil(this.ngUnsubscribe) // отписаться после разрушения .map(form => form['search-input']) // данные инпута .distinctUntilChanged() // брать измененные данные .debounceTime(300) // реагировать не сразу .switchMap(this.wikipediaSearch) // переключить Observable на запрос в Вики .subscribe(data => console.log(data)); } wikipediaSearch = (text: string) => { return Observable .ajax('https://api.github.com/search/repositories?q=' + text) .map(e => e.response); }
ОРП:
suggests() { return $mol_http.resource( 'https://api.github.com/search/repositories?q=' + this.query() ) ).json().items }
О подписках/отписках заботиться не надо, актуальное значение всегда есть, при изменении запроса возвращаться будут только данные для актуального запроса, а предыдущие запросы будут автоматически прибиваться. Задержки разве что тут нет, но она реализуется не тут, а компоненте поля ввода.
Необходимо закешировать Observable запрос
ФРП:
private tagsCache$ = this.getTags() .publishReplay(1, 2000) // кешируем одно значение на 2 секунды .refCount() // считаем ссылки .take(1); // берем 1 значение getCachedTags() { return tagsCache$; }
ОРП:
@ $mol_mem tags() { const resource = $mol_http.resource( '/tags' ) // сбрасываем кеш через 2 секунды setTimeout( ()=> resource.json( undefined , $mol_atom_force ) , 2000 ) return resource.json() }
Так как кеширование происходит по умолчанию, то задача своидится к противоположной — сбросить кэш через 2 секунды.
Критическая ситуация на сервере! Backend команда сообщила, что для корректного обновления продукта нужно выполнять строго последовательно: Обновление данных продукта (заголовок и описание); Обновление списка тегов продукта; Обновление списка категорий продукта.
ФРП:
const updateProduct$ = this.productService.update(product); const updateTags$ = this.productService.updateTags(productId, tagList); const updateCategories$ = this.productService.updateCategories(productId, categoryList); Observable .from([updateProduct$, updateTags$, updateCategories$]) .concatMap(a => a) // выполняем обновление последовательно .toArray() // Возвращает массив из последовательности .subscribe(res => console.log(res)); // res содержит массив результатов запросов
ОРП:
update_product() { return this.product_service().update( this.product() ) } update_tags() { return this.product_service().update_tags( this.product() , this.tags() ) } update_categories() { return this.product_service().update_categories( this.product() , this.categories() ) } @ $mol_mem updating() { console.log( this.update_product().valueOf() , this.update_tags().valueOf() , this.update_categories().valueOf() , ) }
Без
valueOf()
все запросы пошли бы параллельно. Поэтому мы немедленно требуем результат, чтобы следующий запрос не начался до получения результата предыдущего.
Загадка на посошок
Ну и у меня для вас задачка:
У меня есть список игрушек, у каждой игрушки есть свойства. Свойства могут меняться. Есть функция фильтрации, которая может быть сложной и тяжёлой и которая тоже может меняться динамически. Хотелось бы, чтобы перефильтрация происходила лишь тогда, когда меняются свойства, от которых результат фильтрации реально зависит. Я так понимаю, каждое свойство должно быть стримом и надо как-то подписаться на заданные свойства всех игрушек. Как это лучше всего сделать?
У меня пока получилось следующее:
const ToysSource = new Rx.BehaviorSubject( [] ) const Toys = ToysSource.distinctUntilChanged().debounce( 0 ) const FilterSource = new Rx.BehaviorSubject( toy => toy.count > 0 ) const Filter = FilterSource.distinctUntilChanged().debounce( 0 ) const ToysFiltered = Filter .select( filter => { if( !filter ) return Toys return Toys.map( toys => toys.filter( filter ) ) } ) .switch() .distinctUntilChanged() .debounce( 0 )
Но тут, очевидно, при любом изменении игрушек будет происходить повторная фильтрация. Например: фильтруем по числу остатков, а меняется цена — происходит повторная фильтрация, что не хорошо.
Druu
26.09.2017 18:30> Хотелось бы, чтобы перефильтрация происходила лишь тогда, когда меняются свойства, от которых результат фильтрации реально зависит.
А ничего, что это алгоритмически неразрешимая задача? filter toy = if arithmetic_is_consistent then test(toy.property1) else test(toy.property2)vintage
26.09.2017 18:41Всмысле неразрешимая? ОРП же позволяет выстроить потоки данных так, что изменение цены не будет приводить к перефильтрации, пока мы не фильтруем по цене. Уверен такое можно сделать и на Rx через хитрую комбинацию операторов.
Druu
26.09.2017 18:57> Всмысле неразрешимая?
Всмысле, нет никакого способа узнать, какие поля требуются для фильтра, не запустив сам фильтр. Смотрите выше пример — надо ли перефильтровывать при изменении property1? а property2?
> ОРП же позволяет выстроить потоки данных так, что изменение цены не будет приводить к перефильтрации, пока мы не фильтруем по цене.
Либо не позволяет, либо я не понимаю, что вы имеете в виду.vintage
26.09.2017 19:36Смотрите выше пример — надо ли перефильтровывать при изменении property1? а property2?
Допустим у фильтра есть метаданные, в которых указано какие свойства он проверяет, тогда можно заранее сказать какие при изменении каких свойств нужно провести перефильтрацию.
Либо не позволяет, либо я не понимаю, что вы имеете в виду.
Во время предыдущей фильтрации отслеживается к каким свойствам было обращение. Изменение этих свойств или фильтра приведёт к перефильтрации. Изменение любых других — не приведёт.
Druu
27.09.2017 03:48> Допустим у фильтра есть метаданные
Рассчитать программно эти метаданные нельзя, то есть их добавляет к фильтру программист?
> Во время предыдущей фильтрации отслеживается к каким свойствам было обращение.
А если при том же фильтре в разных случаях обращение к разным полям?vintage
27.09.2017 07:44Рассчитать программно эти метаданные нельзя, то есть их добавляет к фильтру программист?
Агась.
А если при том же фильтре в разных случаях обращение к разным полям?
Если там не
Math.random()
, а зависит от какого-либо реактивного свойства, то всё будет ок — при изменении этого свойства будет перефильтрация.
nomoreload
Огромное Вам, человеческое, спасибо! Весьма годная подборка для начинающего. На русском материала по RxJS практически нет. Вы осветили крайне полезные (по крайней мере, для меня) юзкейсы.