В предыдущей статье мы рассмотрели, что такое потоки и с чем их едят. В новой части мы познакомимся с тем, какие методы RxJS предоставляет для создания потоков, что такое операторы, пайпы(pipes) и как с ними работать.

RxJS обладает богатейшим API. В документации описано более ста методов. Чтобы немного познакомиться с ними, мы напишем простое приложение и на практике посмотрим, как выглядит реактивный код. Вы увидите, что одни и те же задачи, которые раньше казались рутинными и требовали написания большого количества кода, имеют элегантное решение, если смотреть на них сквозь призму реактивности. Но прежде чем мы перейдем к практике, рассмотрим, как потоки можно представить графически, и познакомимся с удобными методами для их создания и обработки.

Графическое представление потоков


Чтобы наглядно продемонстрировать, как ведет себя тот или иной поток, я буду использовать принятую в реактивном подходе нотацию. Вспомним наш пример из предыдущей статьи:

const observable = new Observable((observer) => {
  observer.next(1);
  observer.next(2);
  observer.complete();
});

Вот как будет выглядеть его графическое представление:



Поток обычно изображается в виде прямой линии. Если поток испускает какое-либо значение, то оно отображается на линии в виде кружка. Прямая черта в отображении?—?это сигнал завершения потока. Для отображения ошибки используется символ?—?“?”.

const observable = new Observable((observer) => {
  observer.error();
});



Потоки в одну строчку


В моей практике я редко сталкивался с необходимостью создавать свои экземпляры Observable напрямую. Большинство методов для создания потоков уже есть в RxJS. Чтобы создать поток, испускающий значения 1 и 2, достаточно лишь использовать метод of:

const observable = of(1, 2);

Метод of принимает на вход любое количество аргументов и возвращает готовый экземпляр Observable. После подписки он испустит полученные значения и завершится:



Если вы хотите представить массив в виде потока, то можно воспользоваться методом from. Метод from в качестве аргумента ожидает любой итерируемый объект(массив, строка и т.д.) или promise, и проецирует этот объект на поток. Вот как будет выглядеть поток, полученный из строки:

const observable = from('abc');



А вот так можно обернуть promise в поток:

const promise = new Promise((resolve, reject) => {
  resolve(1);
});
const observable = from(promise);



Примечание: часто потоки сравнивают с promise. На самом деле, они имеют всего одну общую черту?—?push стратегию распространения изменений. В остальном это абсолютно разные сущности. Promise не может выдать несколько значений. Он может только выполнить resolve или reject, т.е. иметь только два состояния. Поток же может передавать несколько значений, и может быть повторно использован.

А помните пример с интервалом из первой статьи? Данный поток представляет собой таймер, который отсчитывает время в секундах с момента подписки.

const timer = new Observable(observer => {
  let counter = 0;
  const intervalId = setInterval(() => {
    observer.next(counter++);
  }, 1000);
  return () => {
   clearInterval(intervalId);
  }
});

Вот как в одну строчку можно реализовать то же самое:

const timer = interval(1000);



И напоследок метод, который позволяет создать поток событий DOM элементов:

const observable = fromEvent(domElementRef, 'keyup');

В качестве значений этот поток будет получать и испускать объекты события “keyup”.

Пайпы & операторы


Pipe?—?это метод класса Observable, добавленный в RxJS в версии 5.5. Благодаря ему мы можем строить цепочки операторов для последовательной обработки значений, полученных в потоке. Pipe представляет собой однонаправленный канал, который связывает между собой операторы. Сами операторы являются обычными функциями, описанными в RxJS, которые обрабатывают значения из потока.

Например, они могут преобразовать значение и передать его дальше в поток, или могут выполнять роль фильтров и не пропускать какие-либо значения, если они не соответствуют заданному условию.

Посмотрим на операторы в деле. Умножим каждое значение из потока на 2 с помощью оператора map:

of(1,2,3).pipe(
  map(value => value * 2)
).subscribe({
  next: console.log
});

Вот как выглядит поток до применения оператора map:



После оператора map:



Давайте воспользуемся оператором filter. Этот оператор работает точно так же, как функция filter в классе Array. Первым аргументом метод принимает функцию, в которой описано какое-либо условие. Если значение из потока удовлетворяет условию, то оно пропускается дальше:

of(1, 2, 3).pipe(
  // пропускаем только нечетные значения
  filter(value => value % 2 !== 0),
  map(value = value * 2)
).subscribe({
  next: console.log
});

И вот как будет выглядеть вся схема нашего потока:



После filter:



После map:



Примечание: pipe !== subscribe. Метод pipe декларирует поведение потока, но не выполняет подписку. Пока вы не вызовете метод subscribe, ваш поток не начнет работать.

Пишем приложение


Теперь, когда мы выяснили, что такое пайпы и операторы, можно приступать к практике. Наше приложение будет выполнять одну простую задачу: выводить список открытых репозиториев github по введенному никнейму владельца.

Требований будет немного:

  • не выполнять запрос к API, если введенная в input строка содержит менее 3-х символов;
  • Чтобы не выполнять запрос на каждый введенный пользователем символ, следует установить задержку(debounce) в 700 миллисекунд перед обращением к API;

Для поиска репозиториев мы воспользуемся github API. Сами же примеры я рекомендую выполнять на stackblitz. Там же я выложил готовую реализацию. Ссылки представлены в конце статьи.

Начнем с html разметки. Опишем input и ul элементы:

<input type="text">
<ul></ul>

Затем в js или ts файле получим ссылки на текущие элементы используя browser API:

const input = document.querySelector('input');
const ul = document.querySelector('ul');

Ещё нам понадобится метод, который будет выполнять запрос к github API. Ниже приведен код функции getUsersRepsFromAPI, которая принимает на вход никнейм пользователя и выполняет ajax запрос, используя fetch. Затем возвращает promise, попутно преобразуя успешный ответ в json:

const getUsersRepsFromAPI = (username) => {
  const url = `https://api.github.com/users/${ username }/repos`;
  
  return fetch(url)
    .then(response => {
      if(response.ok) {
        return response.json();
      }

      throw new Error('Ошибка');
    });
}

Следом напишем метод, который будет выводить список названий репозиториев:

const recordRepsToList = (reps) => {
  for (let i = 0; i < reps.length; i++) {
  
    // если элемент не существует, то создаем его
    if (!ul.children[i]) {
      const newEl = document.createElement('li');
      ul.appendChild(newEl);
    }
    // записываем название репозитория в элемент
    const li = ul.children[i];
    li.innerHTML = reps[i].name;
  }
  // удаляем оставшиеся элементы
  while (ul.children.length > reps.length) {
    ul.removeChild(ul.lastChild);
  }
}

Приготовления завершены. Настало время посмотреть на RxJS в действии. Нам необходимо слушать событие keyup нашего input’а. Первым делом мы должны понять, что в реактивном подходе мы работаем с потоками. К счастью, в RxJS уже предусмотрен подобный вариант. Вспомните метод fromEvent, который я упоминал выше. Используем его:

const keyUp = fromEvent(input, 'keyup');

keyUp.subscribe({
  next: console.log
});

Теперь наше событие представлено как поток. Если мы посмотрим, что выводится в консоль, то увидим объект типа KeyboardEvent. Но нам нужно введенное пользователем значение. Вот тут-то нам и пригодится метод pipe и оператор map:

fromEvent(input, 'keyup').pipe(
  map(event => event.target.value)
).subscribe({
  next: console.log
});

Перейдем к реализации требований. Начнем с того, что будем выполнять запрос, когда введенное значение содержит более двух символов. Для этого воспользуемся оператором filter:

fromEvent(input, 'keyup').pipe(
  map(event => event.target.value),
  filter(value => value.length > 2)
)

С первым требованием разобрались. Приступим ко второму. Нам необходимо реализовать debounce. В RxJS есть оператор debounceTime. Данный оператор в качестве первого аргумента принимает число миллисекунд, в течение которых значение будет удерживаться, прежде чем пройдет дальше. При этом каждое новое значение будет сбрасывать таймер. Таким образом, на выходе мы получим последнее значение, после ввода которого прошло 700 миллисекунд.

fromEvent(input, 'keyup').pipe(
  debounceTime(700),
  map(event => event.target.value),
  filter(value => value.length > 2)
)

Вот как может выглядеть наш поток без debounceTime:



А вот так будет выглядеть тот же поток, пропущенный через этот оператор:



С debounceTime мы будем реже обращаться к API, за счет чего получим экономию трафика и разгрузим сервер.

В целях дополнительной оптимизации предлагаю использовать еще один оператор?—?distinctUntilChanged. Данный метод избавит нас от дубликатов. Лучше всего показать его работу на примере:

from('aaabccc').pipe(
  distinctUntilChanged()
)

Без distinctUntilChanged:



С distinctUntilChanged:



Добавим данный оператор сразу после оператора debounceTime. Таким образом, мы не будем обращаться к API, если новое значение по какой-то причине совпадает с предыдущим. Подобная ситуация может возникнуть, когда пользователь ввел новые символы, а затем снова стер их. Так как у нас реализована задержка, в поток попадет только последнее значение, ответ на которое у нас уже есть.

Идем на сервер


Уже сейчас мы можем описать логику запроса и обработки ответа. Пока мы умеем только работать с promise. Поэтому опишем еще один оператор map, который будет вызывать метод getUsersRepsFromAPI. В наблюдателе опишем логику обработки нашего promise:

/* Код учебный! В рабочих проектах с RxJS лучше избегать использования promise, вместо него стоит использовать потоки */
fromEvent(input, 'keyup').pipe(
  debounceTime(700),
  map(event => event.target.value),
  filter(val => val.length > 2),
  distinctUntilChanged(),
  map(value => getUsersRepsFromAPI(value))
).subscribe({
  next: promise => promise.then(reps => recordRepsToList(reps))
});

На текущий момент мы реализовали все, что хотели. Но у нашего примера есть один большой недостаток: нет обработки ошибок. Наш наблюдатель получает только promise и понятия не имеет, что что-то могло пойти не так.

Конечно, мы можем навесить catch на promise в методе next, но из-за этого наш код начнет все больше напоминать “callback hell”. Если вдруг нам понадобится выполнить еще один запрос, то сложность кода возрастет.

Примечание: использование promise в коде с RxJS считается антипаттерном. Promise имеет множество недостатков по сравнению с observable. Его нельзя отменить, и нельзя использовать повторно. Если перед вами стоит выбор, то выбирайте observable. То же самое касается метода toPromise класса Observable. Данный метод был реализован в целях совместимости с библиотеками, которые не могут работать с потоками.

Мы можем использовать метод from, чтобы спроецировать promise на поток, но этот способ чреват дополнительными вызовами метода subscribe, и также приведет к разрастанию и усложнению кода.

Решить эту проблему можно с помощью оператора mergeMap:

fromEvent(input, 'keyup').pipe(
  debounceTime(700),
  map(event => event.target.value),
  filter(val => val.length > 2),
  distinctUntilChanged(),
  mergeMap(value => from(getUsersRepsFromAPI(value)))
).subscribe({
  next: reps => recordRepsToList(reps),
  error: console.log
})

Теперь нам не нужно писать логику обработки promise. Метод from сделал из promise поток, а оператор mergeMap обработал его. Если promise выполнится успешно, то вызовется метод next, и наш наблюдатель получит готовый объект. Если же произойдет ошибка, то будет вызван метод error, и наш наблюдатель выведет ошибку в console.

Оператор mergeMap немного отличается от тех операторов, с которыми мы работали ранее, он принадлежит к так называемым Higher Order Observables, о которых я расскажу в следующей статье. Но, забегая вперед, скажу, что метод mergeMap сам подписывается на поток.

Обработка ошибок


Если наш поток получит ошибку, то он завершится. И если попытаться после ошибки взаимодействовать с приложением, то никакой реакции мы не получим, так как наш поток завершился.

Тут нам поможет оператор catchError. catchError вызывается только тогда, когда в потоке появляется ошибка. Он позволяет перехватить ее, обработать и вернуть в поток обычное значение, которое не приведет к его завершению.

fromEvent(input, 'keyup').pipe(
  debounceTime(700),
  map(event => event.target.value),
  filter(val => val.length > 2),
  distinctUntilChanged(),
  mergeMap(value => from(getUsersRepsFromAPI(value))),
  catchError(err => of([])
).subscribe({
  next: reps => recordRepsToList(reps),
  error: console.log
})

Мы перехватываем ошибку в catchError и вместо нее возвращаем поток с пустым массивом. Теперь при возникновении ошибки мы очистим список репозиториев. Но затем поток снова завершится.

Все дело в том, что catchError подменяет наш оригинальный поток на новый. И дальше наш наблюдатель слушает только его. Когда поток of испустит пустой массив, будет вызван метод complete.

Чтобы не подменять наш оригинальный поток, вызовем оператор catchError на потоке from внутри оператора mergeMap.

fromEvent(input, 'keyup').pipe(
  debounceTime(700),
  map(event => event.target.value),
  filter(val => val.length > 2),
  distinctUntilChanged(),
  mergeMap(value => {
    return from(getUsersRepsFromAPI(value)).pipe(
      catchError(err => of([])
    )
  })
).subscribe({
  next: reps => recordRepsToList(reps),
  error: console.log
})

Таким образом, наш оригинальный поток ничего не заметит. Вместо ошибки он получит пустой массив.

Заключение


Мы наконец-то перешли к практике и увидели, для чего нужны пайпы и операторы. Рассмотрели, как можно сократить код, пользуясь богатым API, которое предоставляет нам RxJS. Конечно, наше приложение не закончено, в следующей части мы разберем, как можно в одном потоке обрабатывать другой и как отменять наш http запрос, чтобы еще больше экономить трафик и ресурсы нашего приложения. А чтобы вы могли увидеть разницу, я выложил пример без использования RxJS, посмотреть его можно здесь. По этой ссылке вы найдете полный код текущего приложения. Для генерации схем я пользовался RxJS визуализатором.

Надеюсь, что данная статья помогла вам лучше понять, как устроен RxJS. Желаю вам успехов в изучении!

Комментарии (2)


  1. puyol_dev2
    18.03.2019 22:11

    Когда я вижу такое

    // удаляем оставшиеся элементы
      while (ul.children.length > reps.length) {
        ul.removeChild(ul.lastChild);
      }


    Мне хочется плакать (


  1. Imbecile
    19.03.2019 00:32

    Очень хорошо. Сколько я не подходил к попыткам объяснить, что такое RxJS, как он работает и для чего, получалась какая-то каша. Тут всё более-менее понятно.
    Но я бы заменил пример distinctUntilChange с использования строки на использование массива. Чуть более понятно будет, что пока в поток идут одинаковые значения, он молчит.
    И да, почему mergeMap, а не switchMap?