Представляю вашему вниманию типичные варианты использования Observable объектов в компонентах и сервисах Angular 4.



Подписка на параметр роутера и мапинг на другой Observable


Задача: При открытии страницы example.com/#/users/42, по userId получить данные пользователя.


Решение: При инициализации компоненты UserDetailsComponent мы подписываемся на параметры роутера. То есть если userId будет меняться — будер срабатывать наша подписка. Используя полученный userId, мы из сервиса userService получаем Observable с данными пользователя.


// UserDetailsComponent

ngOnInit() {
  this.route.params
    .pluck('userId') // получаем userId из параметров
    .switchMap(userId => this.userService.getData(userId))
    .subscribe(user => this.user = user);
}




Подписка на параметр роутера и строку запроса


Задача: При открытии страницы example.com/#/users/42?regionId=13 нужно выполнить функцию load(userId, regionId). Где userId мы получаем из роутера, а regionId — из параметров запроса.


Решение: У нас два источника событий, поэтому воспользуемся функцией Observable.combineLatest, которая будет срабатывать, когда каждый из источников генерирует событие.


ngOnInit() {
  Observable.combineLatest(this.route.params, this.route.queryParams)
    .subscribe(([params, queryParams]) => { // полученный массив деструктурируем
      const userId = params['userId'];
      const regionId = queryParams['regionId'];
      this.load(userId, regionId);
    });
}

Обратите внимание, что созданные подписки на роутер, при разрушении объекта удалятся, за этим следит ангуляр, поэтому отписываться от параметров роутера не нужно:


The Router manages the observables it provides and localizes the subscriptions. The subscriptions are cleaned up when the component is destroyed, protecting against memory leaks, so we don't need to unsubscribe from the route params Observable. Mark Rajcok

Остановка анимации загрузки после окончания выполнения подписки


Задача: Показать значок загрузки после начала сохранения данных и скрыть его, когда данные сохранятся или произойдет ошибка.


Решение: За отображение загрузчика у нас отвечает переменная loading, после нажатия на кнопку, установим ее в true. А для установки ее в false воспользуемся Observable.finally функций, которая выполняется после завершения подписки или если произошла ошибка.


save() {
  this.loading = true;
  this.userService.save(params)
    .finally(() => this.loading = false)
    .subscribe(user => {
      // Успешно сохранили
    }, error => {
      // Ошибка сохранения
    });
}

Создание собственного источника событий


Задача: Создать переменную lang$ в configService, на которую другие компоненты будут подписываться и реагировать, когда язык будет меняться.


Решение: Воспользуемся классом BehaviorSubject для создания переменной lang$;


Отличия BehaviorSubject от Subject:


  1. BehaviorSubject должен инициализироваться с начальным значением;
  2. Подписка возвращает последнее значение Subjectа;
  3. Можно получить последнее значение напрямую через функцию getValue().

Создаём переменную lang$ и сразу инициализируем. Так же добавляем функцию setLang для установки языка.


// configService
lang$: BehaviorSubject<Language> = new BehaviorSubject<Language>(DEFAULT_LANG);
setLang(lang: Language) {
  this.lang$.next(this.currentLang); // тут мы поставим
}

Подписываеся на изменение языка в компоненте. Переменная lang$ является "горячим" Observable объектом, то есть подписка требует отписки при разрушении объекта.


private subscriptions: Subscription[] = [];
ngOnInit() {
  const langSub = this.configService.lang$
    .subscribe(() => {
      // ...
    });
  this.subscriptions.push(langSub);
}
ngOnDestroy() {
  this.subscriptions
    .forEach(s => s.unsubscribe());
}

Использование takeUntil для отписки


Отписываться можно и более изящным вариантом, особенно если в компоненте присутствует больше двух подписок:


private ngUnsubscribe: Subject<void> = new Subject<void>();

ngOnInit() {
  this.configService.lang$
    .takeUntil(this.ngUnsubscribe) // отписка по условию
    .subscribe(() => {
      // ...
    });
}

ngOnDestroy() {
  this.ngUnsubscribe.next();
  this.ngUnsubscribe.complete();
}

То есть, чтобы не терять память на горячих подписках, компонента будет работать до тех пор, пока значение ngUnsubscribe не изменится. А изменится оно, когда вызовется ngOnDestroy. Плюсы этого варианта в том, что в каждую из подписок достаточно добавить всего одну строчку, чтобы отписка сработала вовремя.


Использование Observable для автокомплита или поиска


Задача: Показывать предложения страниц при вводе данных на форме


Решение: Подпишемся на изменение данных формы, возьмём только меняющиеся данные инпута, поставим небольшую задержку, чтобы событий не было слишком много и отправим запрос в википедию. Результат выведем в консоль. Интересный момент в том, что switchMap отменит предыдущий запрос, если пришли новые данные. Это очень полезно, для избегания нежалательных эффектов от медленных запросов, если, к например, предпоследний запрос выполнялся 2 секунды, а последий 0.2 секунды, то в консоль выведется результат именно последнего запроса.


ngOnInit() {
  this.form.valueChanges
    .takeUntil(this.ngUnsubscribe)      // отписаться после разрушения
    .map(form => form['search-input'])  // данные инпута
    .distinctUntilChanged()             // брать измененные данные
    .debounceTime(300)                  // реагировать не сразу
    .switchMap(this.wikipediaSearch)    // переключить Observable на запрос в Вики
    .subscribe(data => console.log(data));
}

wikipediaSearch = (text: string) => {
  return Observable
    .ajax('https://api.github.com/search/repositories?q=' + text)
    .map(e => e.response);
}

Кеширование запроса


Задача: Необходимо закешировать Observable запрос


Решение: Воспользуемся связкой publishReplay и refCount. Первая функция закеширует одно значение функции на 2 секунды, а вторая будет считать созданные подписки. То есть, Observable завершится, когда все подписки будут выполнены. Тут можно прочитать подробнее.


// tagService

private tagsCache$ = this.getTags()
  .publishReplay(1, 2000) // кешируем одно значение на 2 секунды
  .refCount()             // считаем ссылки
  .take(1);               // берем 1 значение

getCachedTags() {
  return tagsCache$;
}

Последовательный combineLatest


Задача: Критическая ситуация на сервере! Backend команда сообщила, что для корректного обновления продукта нужно выполнять строго последовательно:


  1. Обновление данных продукта (заголовок и описание);
  2. Обновление списка тегов продукта;
  3. Обновление списка категорий продукта;

Решение: У нас есть 3 Observable, полученных из productService. Воспользуемся concatMap:


const updateProduct$ = this.productService.update(product);
const updateTags$ = this.productService.updateTags(productId, tagList);
const updateCategories$ = this.productService.updateCategories(productId, categoryList);

Observable
  .from([updateProduct$, updateTags$, updateCategories$])
  .concatMap(a => a)  // выполняем обновление последовательно
  .toArray()          // Возвращает массив из последовательности
  .subscribe(res => console.log(res)); // res содержит массив результатов запросов

Загадка на посошок


Если у вас есть желание немного поупражняться, решите предыдущую задачу, но для создания продукта. То есть сначала создаём продукт, потом обновляем теги, а только потом — категории.


Полезные ссылки


  • Заворожённо посмотреть на шарики: rxviz.com
  • Потаскать шарики мышкой: rxmarbles.com

Комментарии (12)


  1. nomoreload
    24.09.2017 13:20
    +3

    Огромное Вам, человеческое, спасибо! Весьма годная подборка для начинающего. На русском материала по RxJS практически нет. Вы осветили крайне полезные (по крайней мере, для меня) юзкейсы.


  1. xGromMx
    24.09.2017 19:13
    +3

    Можно же последний снипет так
    Observable.concat(updateProduct$, updateTags$, updateCategories$).toArray().subscribe(res => console.log(res))
    Ну или в случае с from
    Observable.from([updateProduct$, updateTags$, updateCategories$]).concatAll().toArray().subscribe(res => console.log(res))
    Ну или с of
    Observable.of(updateProduct$, updateTags$, updateCategories$).concatAll().toArray().subscribe(res => console.log(res))


  1. VladVR
    25.09.2017 15:29

    Вопрос такой — а есть ли аналогично takeUntil оператор что то вроде restartWhen?

    Задача такая — UI получает observable из метода getSmth, соответственно хотелось бы в методе saveSmth вызвать next и чтобы все подписки сходили еще раз на бакенд и получили новые данные
    Я пока что реализовал примерно то что мне надо руками, но вдруг уже есть готовое решение.


    1. lucius Автор
      25.09.2017 16:55

      Думаю что вам нужно посмотреть в сторону retryWhen.


  1. vintage
    26.09.2017 12:18

    Ужасы какие :-) Давайте я вам лучше покажу, как то же самое реализуется с использованием ОРП...


    При открытии страницы example.com/#/users/42, по userId получить данные пользователя.

    ФРП:


    ngOnInit() {
      this.route.params
        .pluck('userId') // получаем userId из параметров
        .switchMap(userId => this.userService.getData(userId))
        .subscribe(user => this.user = user);
    }

    ОРП:


    // example.com/#user=123
    user() {
        return this.user_serice().data( $mol_state_arg.value( 'user' ) )
    }

    Можно поддержать и формат урлов из задачи, но суть не поменяется.


    При открытии страницы example.com/#/users/42?regionId=13 нужно выполнить функцию load(userId, regionId). Где userId мы получаем из роутера, а regionId — из параметров запроса.

    ФРП:


    ngOnInit() {
      Observable.combineLatest(this.route.params, this.route.queryParams)
        .subscribe(([params, queryParams]) => { // полученный массив деструктурируем
          const userId = params['userId'];
          const regionId = queryParams['regionId'];
          this.load(userId, regionId);
        });
    }

    ОРП:


    // example.com/#user=123/region=456
    data() {
        return this.load( $mol_state_arg.value( 'user' ) , $mol_state_arg.value( 'region' ) )
    }

    Можно поддержать и формат урлов из задачи, но опять же суть не поменяется.


    Показать значок загрузки после начала сохранения данных и скрыть его, когда данные сохранятся или произойдет ошибка.

    ФРП:


    save() {
      this.loading = true;
      this.userService.save(params)
        .finally(() => this.loading = false)
        .subscribe(user => {
          // Успешно сохранили
        }, error => {
          // Ошибка сохранения
        });
    }

    ОРП:


    @ $mol_mem
    save() {
        return this.user_service().save( params )
    }

    Да, реально, больше ничего делать не надо — анимация сама начнётся, когда начнётся запрос, закончится, когда запрос завершится, и будет нарисована ошибка в случае ошибки.


    Создать переменную lang$ в configService, на которую другие компоненты будут подписываться и реагировать, когда язык будет меняться.

    ФРП:


    lang$: BehaviorSubject<Language> = new BehaviorSubject<Language>(DEFAULT_LANG);
    setLang(lang: Language) {
      this.lang$.next(this.currentLang); // тут мы поставим
    }

    private subscriptions: Subscription[] = [];
    ngOnInit() {
      const langSub = this.configService.lang$
        .subscribe(() => {
          // ...
        });
      this.subscriptions.push(langSub);
    }
    ngOnDestroy() {
      this.subscriptions
        .forEach(s => s.unsubscribe());
    }

    ОПР:


    @ $mol_mem
    lang( next = 'en' ) { return next }
    
    title() {
        return this.language_service().text( this.lnag() , 'title' )
    }

    Да-да, подписки/отписки — не наша забота, всё будет работать как надо.


    Показывать предложения страниц при вводе данных на форме

    ФРП:


    ngOnInit() {
      this.form.valueChanges
        .takeUntil(this.ngUnsubscribe)      // отписаться после разрушения
        .map(form => form['search-input'])  // данные инпута
        .distinctUntilChanged()             // брать измененные данные
        .debounceTime(300)                  // реагировать не сразу
        .switchMap(this.wikipediaSearch)    // переключить Observable на запрос в Вики
        .subscribe(data => console.log(data));
    }
    
    wikipediaSearch = (text: string) => {
      return Observable
        .ajax('https://api.github.com/search/repositories?q=' + text)
        .map(e => e.response);
    }

    ОРП:


    suggests() {
        return $mol_http.resource( 'https://api.github.com/search/repositories?q=' + this.query() ) ).json().items
    }

    О подписках/отписках заботиться не надо, актуальное значение всегда есть, при изменении запроса возвращаться будут только данные для актуального запроса, а предыдущие запросы будут автоматически прибиваться. Задержки разве что тут нет, но она реализуется не тут, а компоненте поля ввода.


    Необходимо закешировать Observable запрос

    ФРП:


    private tagsCache$ = this.getTags()
      .publishReplay(1, 2000) // кешируем одно значение на 2 секунды
      .refCount()             // считаем ссылки
      .take(1);               // берем 1 значение
    
    getCachedTags() {
      return tagsCache$;
    }

    ОРП:


    @ $mol_mem
    tags() {
        const resource = $mol_http.resource( '/tags' )
    
        // сбрасываем кеш через 2 секунды
        setTimeout( ()=> resource.json( undefined , $mol_atom_force ) , 2000 )
    
        return resource.json()
    }

    Так как кеширование происходит по умолчанию, то задача своидится к противоположной — сбросить кэш через 2 секунды.


    Критическая ситуация на сервере! Backend команда сообщила, что для корректного обновления продукта нужно выполнять строго последовательно: Обновление данных продукта (заголовок и описание); Обновление списка тегов продукта; Обновление списка категорий продукта.

    ФРП:


    const updateProduct$ = this.productService.update(product);
    const updateTags$ = this.productService.updateTags(productId, tagList);
    const updateCategories$ = this.productService.updateCategories(productId, categoryList);
    
    Observable
      .from([updateProduct$, updateTags$, updateCategories$])
      .concatMap(a => a)  // выполняем обновление последовательно
      .toArray()          // Возвращает массив из последовательности
      .subscribe(res => console.log(res)); // res содержит массив результатов запросов

    ОРП:


    update_product() { return this.product_service().update( this.product() ) }
    update_tags() { return this.product_service().update_tags( this.product() , this.tags() ) }
    
    update_categories() { return this.product_service().update_categories( this.product() , this.categories() ) }
    
    @ $mol_mem
    updating() {
        console.log(
            this.update_product().valueOf() ,
            this.update_tags().valueOf() ,
            this.update_categories().valueOf() ,
        )
    }

    Без valueOf() все запросы пошли бы параллельно. Поэтому мы немедленно требуем результат, чтобы следующий запрос не начался до получения результата предыдущего.


    Загадка на посошок

    Ну и у меня для вас задачка:


    У меня есть список игрушек, у каждой игрушки есть свойства. Свойства могут меняться. Есть функция фильтрации, которая может быть сложной и тяжёлой и которая тоже может меняться динамически. Хотелось бы, чтобы перефильтрация происходила лишь тогда, когда меняются свойства, от которых результат фильтрации реально зависит. Я так понимаю, каждое свойство должно быть стримом и надо как-то подписаться на заданные свойства всех игрушек. Как это лучше всего сделать?


    У меня пока получилось следующее:


    const ToysSource = new Rx.BehaviorSubject( [] )
    const Toys = ToysSource.distinctUntilChanged().debounce( 0 )
    
    const FilterSource = new Rx.BehaviorSubject( toy => toy.count > 0  )
    const Filter = FilterSource.distinctUntilChanged().debounce( 0 )
    
    const ToysFiltered = Filter
    .select( filter => {
      if( !filter ) return Toys
      return Toys.map( toys => toys.filter( filter ) )
    } )
    .switch()
    .distinctUntilChanged()
    .debounce( 0 )

    Но тут, очевидно, при любом изменении игрушек будет происходить повторная фильтрация. Например: фильтруем по числу остатков, а меняется цена — происходит повторная фильтрация, что не хорошо.


  1. Druu
    26.09.2017 18:30

    > Хотелось бы, чтобы перефильтрация происходила лишь тогда, когда меняются свойства, от которых результат фильтрации реально зависит.

    А ничего, что это алгоритмически неразрешимая задача? filter toy = if arithmetic_is_consistent then test(toy.property1) else test(toy.property2)


    1. vintage
      26.09.2017 18:41

      Всмысле неразрешимая? ОРП же позволяет выстроить потоки данных так, что изменение цены не будет приводить к перефильтрации, пока мы не фильтруем по цене. Уверен такое можно сделать и на Rx через хитрую комбинацию операторов.


      1. Druu
        26.09.2017 18:57

        > Всмысле неразрешимая?

        Всмысле, нет никакого способа узнать, какие поля требуются для фильтра, не запустив сам фильтр. Смотрите выше пример — надо ли перефильтровывать при изменении property1? а property2?

        > ОРП же позволяет выстроить потоки данных так, что изменение цены не будет приводить к перефильтрации, пока мы не фильтруем по цене.

        Либо не позволяет, либо я не понимаю, что вы имеете в виду.


        1. vintage
          26.09.2017 19:36

          Смотрите выше пример — надо ли перефильтровывать при изменении property1? а property2?

          Допустим у фильтра есть метаданные, в которых указано какие свойства он проверяет, тогда можно заранее сказать какие при изменении каких свойств нужно провести перефильтрацию.


          Либо не позволяет, либо я не понимаю, что вы имеете в виду.

          Во время предыдущей фильтрации отслеживается к каким свойствам было обращение. Изменение этих свойств или фильтра приведёт к перефильтрации. Изменение любых других — не приведёт.


          1. Druu
            27.09.2017 03:48

            > Допустим у фильтра есть метаданные

            Рассчитать программно эти метаданные нельзя, то есть их добавляет к фильтру программист?

            > Во время предыдущей фильтрации отслеживается к каким свойствам было обращение.

            А если при том же фильтре в разных случаях обращение к разным полям?


            1. vintage
              27.09.2017 07:44

              Рассчитать программно эти метаданные нельзя, то есть их добавляет к фильтру программист?

              Агась.


              А если при том же фильтре в разных случаях обращение к разным полям?

              Если там не Math.random(), а зависит от какого-либо реактивного свойства, то всё будет ок — при изменении этого свойства будет перефильтрация.


  1. Druu
    26.09.2017 18:51

    deleted