В прошлой статье мы рассмотрели как создавать свои операторы RxJS. Сейчас я хочу поговорить о малоизвестном операторе создания — defer — и объяснить как можно использовать его для некоторых ситуаций


Допустим нужно сделать оператор, который берет функцию и выполняет ее лишь один раз, при первом получении значения. Реализуем его под именем tapOnce


function tapOnce<T>(fn: Function): OperatorFunction<T, T> {
  return function(source: Observable<any>) {
    let run = false;
    return source.pipe(
      tap(() => {
        if (!run) {
          fn();
          run = true;
        }
      })
    );
  };
}

Код понятен — tap используется для запуска функции, флаг run нужен чтобы сделать это только один раз. Теперь используем оператор.


const source = interval(5000).pipe(
  tapOnce(() => console.log('+')
));

source.subscribe(console.log);

Все работает, плюсик выводится в консоль только при первом эмите. А теперь добавим подписчиков.


const source = interval(5000).pipe(
  tapOnce(() => console.log('+')
));

source.subscribe(console.log);
source.subscribe(console.log);

Если посмотреть в консоль — там только один плюсик. Проблема в том что оба подписчика используют одно и то же лексическое окружение и ссылаются на ту же переменную run. Нужен способ отсрочить создание потока до момента, когда кто-нибудь подпишется.


Поможет defer


import { defer } from 'rxjs';

function tapOnce<T>(fn: Function): OperatorFunction<T, T> {
  return function(source: Observable<any>) {
    return defer(() => {
      let run = false;
      return source.pipe(
        tap(() => {
          if (!run) {
            fn();
            run = true;
          }
        })
      );
    });
  };
}

Оператор defer принимает функцию, которая должна вернуть ObservableInput. Код внутри defer выполнится только при подписке, а не во время создания. Используя этот подход и благодаря замыканию js, каждый подписчик использует свое собственное лексическое окружение.


Создадим свою простую реализацию defer для лучшего понимания.


function defer(observableFactory: () => ObservableInput<any>) {
  return new Observable(subscriber => {
    const source = observableFactory();
    return source.subscribe(subscriber);
  });
}

defer возвращает новый поток, который создается в момент подписки функцией-фабрикой, и будет использоваться как источник.


Вот еще примеры, где будет полезен defer. Скажем у нас есть выражение, которое надо посчитать когда кто-нибудь подписывается. Например


const randNum = of(Math.random());

randNum.subscribe(console.log);
randNum.subscribe(console.log);

В этом примере каждый подписчик получит одно и тоже случайное значение. Можно поправить, чтобы выражение посчиталось при подписке, а не при объявлении.


const randNum = defer(() => of(Math.random()));

randNum2.subscribe(console.log);
randNum2.subscribe(console.log);

// The same concept as using a function
const randNum = () => of(Math.random());
randNum2().subscribe(console.log);
randNum2().subscribe(console.log);

Другой пример это когда нужно отсрочить выполнение промиса.


// This already executing regardless the numbers of handlers
const promise = new Promise(resolve => {
  resolve();
});

// Deferring the creation of the promise until someone subscribes
const promiseDefered = defer(() => {
  return new Promise(resolve => {
    resolve();
  });
});

promiseDefered.subscribe(console.log);

Промисы же выполняются сразу независимо от числа слушателей. Можно сделать промис похожим на поток (т.е. ленивым) используя defer.