Небольшая история

Я Angular разработчик. Это моя первая статья и таких я собираюсь написать много. Пришла эта идея мне в голову потому что иногда, пытаясь найти какую-то информацию в интернете о принципах работы какого-либо инструмента разработки, я не нахожу абсолютно ничего. Это либо ненавистные обожаемые мною доки, в которых написаны очень краткие принципы работы для знающих людей, либо stackoverflow, где кто-то норовит написать кучу слов ни о чём или без практической составляющей, либо просто статьи, не отражающие сути. Но иногда можно наткнуться на знающего человека, который за одну минуту объяснит тебе всю суть и от счастья хочется прыгать. Я решил писать обо всех таких моментах, которые мне очень сложно давались в понимании из-за отсутствия адекватной информации или моей криворукости. Я буду писать подобные статьи прежде всего для себя и если кому-то это поможет - я буду безумно рад, что какому-нибудь камраду не пришлось долго мучиться и понимать смысл того или иного инструмента для разработки. Статьи будут максимально краткими и по делу. Погнали!


Общее

1) У нас есть метод в сервисе нашего Angular приложения

export interface RxjsExample {
    id: number;
    name: string;
}

getRxJxExampleById(id: number): Observable<RxjsExample> {
    return this.httpClient.get<RxjsExample>(`${this.basePath}/users/${id}`)
        .pipe(
            catchError(err => {
                console.log(err)
                return EMPTY;
            })
        );
}

Тут всё элементарно: происходит запрос на backend и, в зависимости от id, мы получаем разные ответы в виде

{
  id: number;
  name: string;
}

2) Есть backend нашего приложения

mergeConcatSwitchExhaustedRoutes.get('/users/:id', async (req: Request, res: Response): Promise<void> => {
    const id = req.params.id;

    if (id === '1') {
        await wait(3000);
        res.status(200).send({
            id: 1,
            name: 'Первый'
        });
    }
    if (id === '2') {
        await wait(1000);
        res.status(200).send({
            id: 2,
            name: 'Второй'
        });
    }
    if (id === '3') {
        await wait(5000);
        res.status(200).send({
            id: 3,
            name: 'Третий'
        });
    }
    if (id === '4') {
        await wait(2000);
        res.status(200).send({
            id: 4,
            name: 'Четвёртый'
        });
    }
});

const wait = (ms: number): Promise<void > => {
    return new Promise(resolve => setTimeout(resolve, ms));
}

Просто роут на бэке, тут в зависимости от id мы возвращаем ответ с определённым ожиданием. Это сделано специально для понимания операторов, описываемых ниже


switchMap

(Злюка)

Начнём с самого простого и понятного оператора switchMap

switchMapExample(): void {
    from([1, 2, 3, 4])
        .pipe(
            switchMap((id) => this.rxjsExampleService.getRxJxExampleById(id)),
            tap((result) => console.log(result))
        )
        .subscribe();
}

Итак, мы эмитим четыре значения подряд, это наши idшки, по ним мы делаем запрос на бэк, как и было сказано выше

Оператор switchMap, получая новое значение, создаёт новый поток (Observable) и, получив следующее значение, тут же отпишется от предыдущего, создаст новый и подпишется на него

Алгоритм:

  • Эмит 1 -> пошёл запрос на сервер для id = 1

  • Эмит 2 -> отписались от выполнения 1, подписались на id = 2

  • Эмит 3 -> отписались от выполнения 2, подписались на id = 3

  • Эмит 4 -> отписались от выполнения 3, подписались на id = 4

  • Дождались ответа от сервера по id = 4

// Ответ
{
    "id": 4,
    "name": "Четвёртый"
}

mergeMap

(Добряш)

mergeMapExample(): void {
    from([1, 2, 3, 4])
        .pipe(
            mergeMap((id) => this.rxjsExampleService.getRxJxExampleById(id)),
            tap((result) => console.log(result))
        )
        .subscribe();
}

Оператор mergeMap, получив следующий эмит, подпишется на него и создаст ещё один поток (Observable), не отписываясь от предыдущего и выполнит их в рандомном порядке. То есть, в данном случае, мы получим 4 независимых потока, которые выполнятся в любом порядке, независимо от времени ответа от сервера

Алгоритм:

  • Эмит 1 -> ждём ответа от id = 1 (3 секунды) (параллельно)

  • Эмит 2 -> ждём ответа от id = 2 (1 секунда) (параллельно)

  • Эмит 3 -> ждём ответа от id = 3 (5 секунд) (параллельно)

  • Эмит 4 -> ждём ответа от id = 4 (2 секунды) (параллельно)

Все запросы идут параллельно, никто никого не ждёт

// Первый ответ
{
    "id": 2,
    "name": "Второй"
}
// Второй ответ
{
    "id": 4,
    "name": "Четвёртый"
}
// Третий ответ
{
    "id": 1,
    "name": "Первый"
}
// Четвёртый ответ
{
    "id": 3,
    "name": "Третий"
}

Дополнительная информация:

mergeMap может принять второй параметр concurrent, в коде ниже это число 2

mergeMapExample(): void {
    from([1, 2, 3, 4])
        .pipe(
            mergeMap((id) => this.rxjsExampleService.getRxJxExampleById(id), 2),
            tap((result) => console.log(result))
        )
        .subscribe();
}

concurrent ограничивает количество параллельно выполняющихся потоков. В данной ситуации, если у нас есть два сэмиченных выполняющихся значения, то mergeMap будет ждать выполнения хотя бы одного из них, чтобы начать выполнять следующий, третий.

Аналогия их жизни: огромная очередь на кассу в магазине, а касс 2 штуки, соотвественно, они параллельно могут обслужить только двух клиентов и, когда одна из них освободится - следующий клиент подойдёт к освободившейся быстрее всего кассе.

А знаете как будет работать mergeMap, если concurrent будет равен 1? Как следующий оператор:)


concatMap

(Перфекционист)

concatMapExample(): void {
    from([1, 2, 3, 4])
        .pipe(
            concatMap((id) => this.rxjsExampleService.getRxJxExampleById(id)),
            tap((result) => console.log(result))
        )
        .subscribe();
}

Оператор concatMap, получив следующий эмит, подпишется на него и создаст ещё один поток (Observable), не отписываясь от предыдущего и выполняя их в строго заданном порядке, независимо от того как долго он будет получать ответ от сервера по каждому id

Алгоритм

  • Эмит 1 -> ждём ответа от id = 1 (3 секунды) пока не выполнится

  • Эмит 2 -> ждём ответа от id = 2 (1 секунда) пока не выполнится

  • Эмит 3 -> ждём ответа от id = 3 (5 секунд) пока не выполнится

  • Эмит 4 -> ждём ответа от id = 4 (2 секунды) пока не выполнится

// Первый ответ
{
    "id": 1,
    "name": "Первый"
}
// Второй ответ
{
    "id": 2,
    "name": "Второй"
}
// Третий ответ
{
    "id": 3,
    "name": "Третий"
}
// Четвёртый ответ
{
    "id": 4,
    "name": "Четвёртый"
}

exhaustMap

(У меня есть задача, отвалите)

exhaustMapExample(): void {
    from([1, 2, 3, 4])
        .pipe(
            exhaustMap((id) => this.rxjsExampleService.getRxJxExampleById(id)),
            tap((result) => console.log(result))
        )
        .subscribe();
}

Оператор exhaustMap, получив первый эмит, забивает на все последующие эмиты, соотвественно, не создавая новые потоки, пока не выполнит запрос по первому. То есть idшки 2, 3 и 4 просто пройдут мимо и никогда не выполнятся

Алгоритм:

  • Эмит 1 -> ждём ответа от id = 1 (3 секунды) пока не выполнится

  • Эмит 2 -> не выполняется, ещё первый не выполнился, пролетел

  • Эмит 3 -> не выполняется, ещё первый не выполнился, пролетел

  • Эмит 4 -> не выполняется, ещё первый не выполнился, пролетел

// Первый и единственный ответ
{
    "id": 1,
    "name": "Первый"
}

Для тех, кто хочет сам потыкать на кнопки и посмотреть как работают операторы вот проекты на github

Фронт часть

Бэк часть

Это всё. Надеюсь, статья была для вас полезной.

Спасибо :)

Комментарии (7)


  1. Cubango
    00.00.0000 00:00
    +2

    насчёт mergeMap не совсем корректно говорить что он просто "примет все и выполнит в любом порядке". На самом деле, на каждый внешний эмит данный оператор создаст отдельный Observable, и вернёт mergeAll всех созданных Observable.


    1. erst_dev Автор
      00.00.0000 00:00

      Спасибо, изучу этот момент и отредактирую


  1. ryanl
    00.00.0000 00:00
    +1

    Продолжайте, очень полезно, как бывший фуллстек буду почитывать.


  1. devbulat
    00.00.0000 00:00

    Спасибо за статью! Изучаю RxJS, было очень полезно.


  1. XXLink
    00.00.0000 00:00
    +4

    При всём уважении. Но таких примеров полно на просторах интернета. Имитируйте реальную задачу. Например, выбор пользователем какого-то товара вместо банального 1234. Ну например, поиск авто по каталогу. Первый селектор - марка, второй - модель, третий - год, четвёртый - цвет. Так читателю будет нагляден смысл switchmap. А ещё можно написать рядом вариант без него, т. е. Подписку в подписке, продемонстрировав тем самым как убого будет выглядеть код бег этого оператора.


  1. movl
    00.00.0000 00:00
    +1

    По моему опыту, самое полезное, что я сделал в плане понимания работы этих операторов, это просто разобрался с marble diagrams. Потратил несколько минут, а все вопросы просто исчезли сами собой.


  1. ubx7b8
    00.00.0000 00:00

    Поскольку на отдельную статью это не тянет, то добавлю здесь:
    Лучше один раз увидеть, чем сто раз услышать, а иногда лучше увидеть это в движении.
    http://rxjs.rsh.icu/
    Как раз про эти 4 оператора. До других руки так и не дошли.
    Можно задать другие интервалы для входных данных, можно сравнить операторы между собой.