Что вы знаете о Schedulers в RxJS? Они скрывают от разработчиков работу с контекстом выполнения Observable. Как те эльфы-домовики из Гарри Поттера, которые выполняют всю черную работу в Хогвартсе, а о них никто даже и не слышал. Давайте исправим это и узнаем о них чуть больше.

Что такое Scheduler

Scheduler позволяет определить в каком контексте выполнения Observable будет доставлять нотификации до Observer. (вольный перевод документации)

Другими словами, Scheduler управляет очередностью и временем выполнения операций в Observable. Пример ниже поможет разобраться нам, как это работает.

import { of } from "rxjs";

console.log("Start");
of("Observable").subscribe(console.log);
console.log("End");

// Logs:
// Start
// Observable
// End

Два console.log зажали Observable между собой. Код выполняется синхронно. Если мы хотим, чтобы наш Observable выполнялся асинхронно, нужно добавить оператор observeOn и внутрь него прокинуть нужный нам Scheduler.

import { asyncScheduler, of } from "rxjs";
import { observeOn } from "rxjs/operators";

console.log("Start");
of("Observable")
  .pipe(observeOn(asyncScheduler))
  .subscribe(console.log);
console.log("End");

// Logs:
// Start
// End
// Observable

StackBlitz

Теперь наш Observable отдает данные асинхронно. Отлично. Это похоже, если бы мы обернули его в setTimeout(…, 0), не правда ли?

Как вы уже успели заметить, мы использовали asyncScheduler в нашем коде. Это один из легендарных Schedulers. Но их гораздо больше.

Типы Schedulers

Для понимания всей мощи Schedulers нужно знать, как работает Event Loop в JavaScript. Освежить знания можно с помощью этого видео или этой статьи на хабре.

Если кратко, то в браузере свой порядок выполнения кода:

  1. Сначала выполняется синхронный код (callstack)

  2. Дальше очередь микрозадач (Promise)

  3. Потом очередь макрозадач (setTimeout, setInterval, XMLHttpRequest и т.д.).

  4. Отдельно стоит очередь для задач, которые выполняются сразу перед следующим циклом перерисовки контента. (requestAnimationFrame)

В RxJS есть Scheduler на каждый из этих пунктов:

queueScheduler

планирование синхронного кода

asapScheduler

планирование кода в очередь микрозадач

asyncScheduler

планирование кода в очередь макрозадач

animationFrameScheduler

планирование кода в очередь перед перерисовкой контента

Еще существуют VirtualTimeScheduler и TestScheduler, которые используются для тестов. О них читайте здесь.

Взгляните на код ниже.

import { of, merge, asapScheduler,asyncScheduler,queueScheduler, animationFrameScheduler } from "rxjs";
import { observeOn } from "rxjs/operators";

const async$ = of("asyncScheduler").pipe(observeOn(asyncScheduler));
const asap$ = of("asapScheduler").pipe(observeOn(asapScheduler));
const queue$ = of("queueScheduler").pipe(observeOn(queueScheduler));
const animationFrame$ = of("animationFrameScheduler").pipe(
  observeOn(animationFrameScheduler)
);
merge(async$, asap$, queue$, animationFrame$).subscribe(console.log);

console.log("synchronous code");

// Logs:
// queueScheduler
// synchronous code
// asapScheduler
// animationFrameScheduler
// asyncScheduler

StackBlitz

Как вы видите, "queueScheduler" отработал синхронно, так как он перед "synchronous code". А "asapScheduler" раньше "asyncScheduler", потому что в нем используется очередь микрозадач.

Как использовать Schedulers

Scheduler используется с операторами observeOn и subscribeOn. Оба принимают в себя первым аргументом Scheduler, а вторым аргументом delay, который по умолчанию равен нулю.

import { of, asyncScheduler } from "rxjs";
import { observeOn, subscribeOn } from "rxjs/operators";

of("observeOn")
  .pipe(observeOn(asyncScheduler, 100))
  .subscribe(console.log);

of("subscribeOn")
  .pipe(subscribeOn(asyncScheduler, 50))
  .subscribe(console.log);

// Logs:
// subscribeOn
// observeOn

StackBlitz

Различие их в том, что observeOn планирует в каком контексте будут выполняться методы observer — next, error и complete выполняются в соответствующем с Scheduler контексте. А subscribeOn влияет на subscriber — метод subscribe будет выполняться в другом контексте.

Интересный факт, что если задать delay не равный нулю в observeOn/subscribeOn, то вне зависимости какой используется Scheduler, будет использоваться asyncScheduler. Бессмысленный код — observeOn(animationFrameScheduler, 100).

До версии RxJS 6.5.0 можно было добавить Scheduler вторым аргументом для of, from, merge, range и т.д. В новых версиях RxJS это поведение deprecated, и необходимо использовать функцию scheduled для этого.

import { of, scheduled, asapScheduler } from 'rxjs';

// DEPRECATED
// of(2, asapScheduler).subscribe(console.log);

scheduled(of('scheduled'), asapScheduler).subscribe(console.log);

Пример использования Scheduler

Мы не задумываемся о Schedulers при работе с RxJS, так как авторы библиотеки провели великолепную работу по абстрагированию этой логики. Но бывает, когда использование Scheduler будет органично, и вы должны быть к этому готовы. На моей практике был один такой случай. Меня попросили реализовать кэширование запросов отличающихся по идентификатору. Примерно такую функцию я написал:

const cache = new Map<number, any>();
function get(id: number): Observable<any> {
 if (cache.has(id)) {
   return of(cache.get(id));
 }
 return http.get(‘some-url\’ + id).pipe(
   tap(data => {
     cache.set(id, data);
   }),
 );
}

Код работает. Но есть один нюанс. В первый раз, когда использовали мою функцию, она возвращала значение асинхронно, а во второй раз - синхронно. Когда функция ведет себя так непресказуемо для вызывающего, может случится трагедия. Выходит, что я высвободил Залго.

Добавим scheduled с asyncScheduler в 4 строчке, чтобы исправить это.

return scheduled(of(cache.get(id)), asyncScheduler);

Теперь все работает предсказуемо. Залго низвержен туда, где был.

Заключение

Schedulers влияют на время и порядок выполнения задач. Четверть операторов RxJS использует их под капотом. С большой вероятностью на практике осведомленность о них не нужна. Но никогда не знаешь, в какой момент жизни пригодится Scheduler.

Я буду рад услышать о вашем опыте использования Schedulers в комментариях. Спасибо за внимание!