Если вы работали с Angular, то наверняка встречались с RxJS. Потоки, развесистые конструкции, много аргументов у метода pipe, а каждый аргумент возвращают разные функции с разным количеством аргументов. Есть интуитивно понятные функции типа filter или map. Первый явно фильтрует значения в потоке, а второй эти значения меняет. Такие функции называют операторами. И чем глубже вы проваливаетесь в RxJS, тем больше самых разных операторов вы узнаете. И со временем добираетесь до потоков потоков. То есть вместо обычных значений такой поток эмитит другие потоки. Такие потоки называют Higher Order Observables. И для работы с такими потоками существуют специальные операторы. Возможно, вы слышали, что такие операторы называют Higher Order Operators (HOO). Они могут выравнивать потоки или, другими словами, делать их обычным.

В этой статье я покажу, что в HOO нет ничего мифического, и расскажу в каких случаях вам нужно использовать операторы высшего порядка. Сейчас вы подумаете, что это скучный лонгрид, но не торопитесь. Мы рассмотрим всего 4 оператора: switchMap, exhaustMap, concatMap и mergeMap.

switchMap

switchMap однозначно самый популярный из всех. Но почему? А по одной просто причине — этот оператор избавляет нас от очень часто встречающегося состояния гонки.

Давайте рассмотрим код:

const searchInput = document.getElementById('search-input');
const search$ = fromEvent(searchInput, 'input').pipe(
  map(event => event.target.value)
);

search$.subscribe((query) => {
  http.get('/search', {params: {query}}).subcribe((result) => {
    console.log(result);
  })
})

В этом коде мы находим input, с которым взаимодействует пользователь и подписываемся на событие input. То есть поток search$ эмитит строки. Внутри подписки мы видим, что на каждый эмит строки отправляется запрос на сервер и ответ сервиса выводится в консоль.

В этом коде можно увидеть как минимум две проблемы:

  1. **Состояние гонки. ** Обычно при поиске чего-либо пользователю важно видеть результат именно последнего запроса. Но код такого вида не дает нам гарантии, что последние данные, выведенные в консоль, соответствуют последней испускаемой строке в потоке search$.

  2. Подписка в подписке и ни одной отписки. Есть очень хорошее правило, следуя которому можно избавить себя от многих проблем, — на каждую подписку должна быть отписка. Это как минимум снизит вероятность утечки памяти.

Но давайте подумаем, как должен работать поиск:

  1. В момент эмита строки проверить наличие активных запросов

  2. Если запросов нет, то перейти к пункту 4

  3. Отписаться от предыдущих запросов

  4. Подписаться на новый запрос

  5. Вывести в консоль

В лоб это можно реализовать так:

const searchInput = document.getElementById('search-input');

const search$ = fromEvent(searchInput, 'input').pipe(
  map(event => event.target.value)
);


let subscription;
search$.subscribe((query) => {
  if (subscription) {
    subscription.unsubscribe();
  }

  subscription = http.get('/search', {params: {query}}).subcribe((result) => {
    console.log(result);
  })
})

Но что если я вам скажу, что первые 4 пункта требований делает оператор switchMap? Давайте перепишем код:

const searchInput = document.getElementById('search-input');
const search$ = fromEvent(searchInput, 'input').pipe(
  map(event => event.target.value)
);

search$.pipe(
  switchMap((query) => http.get('/search', {params: {query}}))
).subscribe((result) => {
  console.log(result);
});

switchMap гарантирует нам, что мы всегда будем получать результаты последнего потока и избавляет нас от состояния гонки. Ну и приятный бонус будет состоять в том, что отписавшись от внешней подписки автоматически произойдет и отписка от внутренней. Профит!

Резюмируем. switchMap можно использовать в случаях, когда нам важен результат последнего действия, например, при поиске или реализации бесконечного скрола. Если все предыдущие действия могут не учитываться, то можно смело брать switchMap.

exhaustMap

exhaustMap однозначно самый непопулярный из всех. Причина мне не до конца понятна, но с помощью него так же можно реализовать поиск, но используя другой подход.

const searchInput = document.getElementById('search-input');
const searchButton = document.getElementById('search-button');

const searchButtonClick$ = fromEvent(searchButton, 'click');

searchButtonClick$.subscribe((result) => {
  const query = searchInput.value;
  http.get('/search', { params: { query } }).subscribe((result) => {
    console.log(result);
  });
});

В этом коде инициатор запрос — клик по кнопке.

Собственно, этот код имеет такие же проблемы, как и в предыдущем примере, но здесь у нас другие требования:

  1. В момент нажатия на кнопку проверить наличие активных запросов

  2. Если есть активный, ничего не делать и ждать следующего клика

  3. Если нет активного, подписаться на выполнение запроса

  4. Вывести результат в консоль

Реализуем в лоб:

const searchInput = document.getElementById('search-input');
const searchButton = document.getElementById('search-button');

const searchButtonClick$ = fromEvent(searchButton, 'click');

let isRequestPending = false;
searchButtonClick$.subscribe((result) => {
  if (isRequestPending) {
    return;
  }

  isRequestPending = true;
  const query = searchInput.value;
  http.get('/search', { params: { query } }).subscribe((result) => {
    isRequestPending = false;
    console.log(result);
  });
});

Как и в первом случае нам понадобилась еще одна переменная за пределами потока. Это добавляет коду сложности и если мы захотим изменить поведение потока все придется переписать. Ну и как вы уже догадались на помощь нам приходит exhaustMap.

const searchInput = document.getElementById('search-input');
const searchButton = document.getElementById('search-button');

const searchInput$ = fromEvent(searchButton, 'click');

searchInput$.pipe(
  exhaustMap(() => {
    const query = searchInput.value;
    return http.get('/search', { params: { query } });
  })
).subscribe((result) => {
  console.log(result);
});

Резюмируем. exhaustMap нужно использовать в случаях, когда при активной подписке на поток остальные можно игнорировать, как в случае с поиском по нажатию кнопке или, например, при пропуске событий начала анимации при ее воспроизведении.

mergeMap

mergeMap — оператор, который объединяет все внутренние потоки в один выходной поток. Это значит, что внутренние потоки могут завершаться в любом порядке, и их результаты будут объединены вместе. И это самое простое объяснение, которое я смог из себя выдавить.

Давайте перейдем к примерам:

entityId$.subscribe((id) => {
  entityService.get(id).subscribe(() => {
    entityStore.upsertItem(entity);
  });
});

В этом коде мы видим entityId$ — это поток строк с id некой сущности. Здесь мы должны на каждый id запросить данные по сущности с сервера и добавить или обновить эту сущность в стор. Собственно, именно это наш код и делает и решать тут нечего. Но проблемы есть и в этом случае они совершенно идентичны предыдущим. Давайте попробуем усложнить задачу и ввести ограничение в 3 запроса в один момент времени.

Решаем:

const CONCURRENT_REQUESTS = 3;
let activeRequests = 0;
const queue = [];

function processNext() {
  if (queue.length === 0 || activeRequests >= CONCURRENT_REQUESTS) {
    return;
  }

  const id = queue.shift();
  activeRequests++;

  entityService.get(id).subscribe(entity => {
    entityStore.upsertItem(entity);
    activeRequests--;

    processNext();
  });
}

entityId$.subscribe(id => {
  queue.push(id);
  processNext();
});

Я даже не пытался проверять код на работоспособность, потому что написал его прямо в редакторе текста. Код стал сложно-читаемым. Функция processNext имеет несколько побочных эффектов внутри. А еще есть дополнительные переменные за пределами потока и функции. Сложить все это воедино достаточно сложно.

Именно такие задачи решает mergeMap. Давайте перепишем первый пример с использованием этого оператора:

entityId$.pipe(
  mergeMap((id) => entityService.get(id))
).subscribe((entity) => {
  entityStore.upsertItem(entity);
});

В этом коде mergeMap подписывается на каждый поток, возвращенный entityService.get(id), и их значения выдает в одном едином потоке.

Хорошо, а как быть с ограничением в 3 запроса в один момент времени? Оказывается, что mergeMap уже и так все умеет. Второй аргумент в mergeMap принимает число, которое как раз настраивает конкурентность.

entityId$.pipe(
  mergeMap((id) => entityService.get(id), 3)
).subscribe((entity) => {
  entityStore.upsertItem(entity);
});

Вот и все!

Резюмируем. mergeMap отлично подходит, когда вы хотите выполнять параллельные действия и объединять их результаты. Однако следует быть осторожным, так как может возникнуть много активных запросов, если исходный поток излучает значения слишком быстро.

concatMap

concatMap — последний оператор высшего порядка. Ключевое отличие заключается в том, что concatMap поддерживает порядок выполнения. Он дождется завершения одного внутреннего потока, прежде чем перейдет к следующему.

Что бы практически посмотреть на его использование, мы можем взять предыдущий пример и поменять к нему требования. Так получилось, что нас перестало устраивать неупорядоченность запросов и мы хотим выполнять их не параллельно, а по очереди. То есть конкурентность должна стать равной единице.

entityId$.pipe(
  mergeMap((id) => entityService.get(id), 1)
).subscribe((entity) => {
  entityStore.upsertItem(entity);
});

Но mergeMap с конкурентностью 1 делает ровно тоже самое, что и concatMap! Буквально. Это прекрасно видно в исходном коде оператора.

То есть использование mergeMap с конкурентностью 1 на столько частый кейс, что его вынесли в отдельный оператор.

Резюмируем. concatMap прекрасно подходит для ситуаций, когда порядок выполнения важен. Если вы хотите обработать последовательность действий без параллельной обработки, это ваш выбор.

Заключение

Высокоуровневые операторы являются мощным инструментом в арсенале каждого разработчика, работающего с реактивным программированием. Они предоставляют гибкость и элегантность при обработке сложных наблюдаемых данных и позволяют сократить код, делая его более читаемым и поддерживаемым.

  • Оператор switchMap отлично подходит, когда нам важен только последний результат излучения, например, в случае поиска в реальном времени.

  • exhaustMap идеален для случаев, когда нам нужно игнорировать новые наблюдаемые объекты до завершения текущего.

  • mergeMap позволяет обрабатывать несколько входящих наблюдаемых объектов параллельно, но может привести к перегрузке, если не контролировать количество одновременных потоков.

  • concatMap гарантирует порядок обработки, выполняя каждый внутренний наблюдаемый объект последовательно.

При правильном использовании эти операторы могут справляться с множеством реактивных задач, будь то наблюдаемые события из пользовательского интерфейса, HTTP-запросы или даже сложные анимационные последовательности.

Однако ключевое слово здесь - правильное использование. Всегда анализируйте требования вашего приложения и тщательно выбирайте подходящий оператор. Это поможет избежать нежелательных побочных эффектов и создать реактивные решения, которые могут масштабироваться и легко поддерживаться.

Реактивное программирование предлагает множество инструментов, и среди них высокоуровневые операторы занимают особое место. Проведя время на изучение и понимание их особенностей, вы значительно можете улучшить качество и эффективность вашего кода.

Оставайтесь в курсе!

Если вы нашли эту статью полезной и хотите больше читать и видеть еще больше моего контента обязательно подпишитесь на мой канал в Telegram под названием Techlead's Diary. Вы также можете оставаться на связи со мной в Twitter, чтобы участвовать в дискуссиях. Присоединяйтесь к обсуждению и продолжайте совершенствовать свои навыки разработки!

Комментарии (1)


  1. bogdan_uman
    29.08.2023 07:55

    Ну как для меня, в этой статье лучше написано, более доходчиво. https://habr.com/ru/articles/715882/