RxJS (Reactive Extensions for JavaScript) — мощный инструмент для работы с асинхронными потоками данных, который используется во многих современных веб-приложениях. Хотя RxJS предоставляет богатую коллекцию операторов, иногда для решения специфических задач бывает необходимо писать свои собственные. Это позволяет избежать дублирования кода и повысить читаемость программы.

Создание своих операторов RxJS может показаться сложным, особенно для тех, кто только начал использовать библиотеку. Однако, фундаментальные принципы их разработки понятны, если погрузиться в механику работы RxJS. В этой статье мы углубимся в то, как создавать собственные pipeable и creation operators, а также рассмотрим практические примеры их применения.

Что такое оператор RxJS?

Оператор RxJS — это чистая функция, которая принимает Observable как входной параметр (или создает Observable) и возвращает иной Observable, преобразованный в соответствии с заданной логикой. Операторы являются основой реактивного программирования, позволяя удобно трансформировать, фильтровать, комбинировать, управлять потоками данных и процессами асинхронной обработки.

Основные характеристики оператора RxJS

  1. Чистота функции: Оператор должен быть чистой функцией. Это значит, что он не влияет на внешние состояния и любой вызов оператора с одинаковыми входными параметрами всегда должен давать одинаковый результат.

    Пример: Оператор map всегда возвращает значения, применяя трансформацию, без побочных эффектов.

  2. Неизменность входящего Observable: Оператор не изменяет исходный Observable, а создает новый — это важное свойство функционального программирования. Оригинальный поток остается неизменным.

  3. К цепочке операторов можно добавлять новые логики: Операторы поддерживают композиционность, что означает, что один или несколько операторов могут объединяться в цепочку, используя метод pipe.

Типы операторов

Операторы делятся на два типа:

  • Pipeable операторы — обычные функции, которые используются в цепочке (pipe). Они принимают поток и возвращают новый поток:

import { of } from 'rxjs';
import { map } from 'rxjs/operators';

of(1, 2, 3).pipe(
  map(x => x * 2)
).subscribe(console.log); // 2, 4, 6
  • Creation Functions — функции, которые создают Observable. Например, of, fromEvent:

   import { fromEvent } from 'rxjs';
   const clicks$ = fromEvent(document, 'click');
   clicks$.subscribe(console.log);

Как работают pipeable операторы

Любой пользовательский pipeable оператор обычно выполняет следующие задачи:

  • Принимает входной поток (source).

  • Добавляет к нему кастомную логику.

  • Создает и возвращает новый Observable, который подчиняется полученной логике.

Вы можете представить их как цепочку обработки данных в конвейере. Давайте разберёмся, как написать такой оператор.

Как создать pipeable оператор

Шаг 1: Определите задачу

Определите, какую задачу будет решать ваш кастомный оператор. Например, давайте реализуем оператор debounceIf, который применяет debounce к потоку только при выполнении определенного условия. Этот оператор может использоваться для оптимизации обработки событий, таких как клики мыши или ввод текста.

Шаг 2: Определение базовой структуры

Создадим функцию, которая будет являться нашим оператором:

import { Observable } from 'rxjs';

export function debounceIf<T>(condition: () => boolean, delayTime: number)
  :(source$: Observable<T>) => Observable<T> {
  return function (source$: Observable<T>): Observable<T> {
    return new Observable<T>(subscriber => {
      // Создаем кастомную подписку
    });
  };
}
  • condition — возвращает true или false, определяя, нужно ли применять debounce.

  • delayTime — время задержки в миллисекундах.

Шаг 3: Реализация логики

import {Observable} from 'rxjs';

export function debounceIf<T>(condition: () => boolean, duration: number)
  : (source: Observable<T>) => Observable<T> {
  return source => new Observable(observer => {
    let timeoutId: any | null = null;
    let subscription = source.subscribe({
      next(value) {
        if (timeoutId) clearTimeout(timeoutId) // сбрасываем таймер чтобы исключить срабатывания
        if (condition()) {
          // Если условие выполнено, используем debounce
          timeoutId = setTimeout(() => {
            observer.next(value);
            console.log(timeoutId)
          }, duration);
        } else {
          // Если нет, пропускаем значение сразу
          observer.next(value);
        }
      },
      error(err) {
        observer.error(err);
      },
      complete() {
        observer.complete();
      },
    });
    return () => {
      subscription.unsubscribe();
      if (timeoutId) clearTimeout(timeoutId);
    };
  });
}

Пример использования:

import {Component, OnInit} from '@angular/core';
import {fromEvent} from "rxjs";
import {debounceIf} from "@pipes/debounceIf";

@Component({
  selector: 'app-home',
  templateUrl: 'home.component.html',
})
export class HomeComponent implements OnInit {
  clicks: number[] = [];
  debounced = false;

  ngOnInit(): void {
    fromEvent(document, 'click')
        .pipe(debounceIf(() => this.debounced, 3000))
        .subscribe(() => this.clicks.push(new Date().getTime()));
  }

  toggleDebounced(): void {
    this.debounced = !this.debounced;
  }
}

Этот код будет пропускать только последний клик по кнопке если в течение двух секунд не происходило новых нажатий при включенном переключателе.


Операторы создания (creation operators)

Сreation function в RxJS отвечает за создание нового Observable с определенной логикой. В отличие от операторов (которые принимают поток и возвращают поток), creation-функция используется там, где вам необходимо начать с чего-то нового, например: фиксированных значений, событий DOM, таймеров и т.д. Примеры встроенных creation functions в RxJS: of, from, interval, fromEvent.

Что создадим:

Предположим, мы хотим создать функцию delayedRange, которая через определенную задержку эмитирует значения из диапазона чисел. Она позволяет настроить:

  • начальное число диапазона;

  • конечное число диапазона;

  • задержку между эмиссиями значений.

Шаги создания кастомной creation function

  • Использование new Observable
    Все кастомные creation functions в RxJS строятся на использовании конструктора Observable. Конструктор принимает аргументом функцию (subscribe), в которой определяется логика эмиссии значений.

  • Поддержка управления подпиской
    Необходимо предоставить механизм для очистки ресурсов (unsubscribe). Это может быть полезно, если подписчик завершает поток раньше времени. Например, нужно очищать таймеры, чтобы не было утечек памяти.

  • Эмит данных с использованием next, обработка ошибок и завершение
    Через объект subscriber мы вызываем соответствующие методы:

    • subscriber.next(value) — эмитируем значение;

    • subscriber.error(err) — сообщаем об ошибке;

    • subscriber.complete() — завершаем поток.

import { Observable } from 'rxjs';

/**
 * Функция delayedRange создает Observable, который эмитирует числа
 * из указанного диапазона с заданной задержкой между значениями.
 *
 * @param start Начальное число диапазона.
 * @param end Конечное число диапазона.
 * @param delayTime Задержка между эмиссиями значений (в миллисекундах).
 * @returns Observable, который эмитирует числа из диапазона.
 */
export function delayedRange(start: number, end: number, delayTime: number): Observable<number> {
  return new Observable<number>(subscriber => {
    let current = start;
    // Эмитирует значения с заданной задержкой
    const emitValue = () => {
      if (current <= end) {
        subscriber.next(current);
        current++;
        setTimeout(emitValue, delayTime); // Эмитируем следующее значение
      } else {
        subscriber.complete(); // Завершаем поток, если достигнут конец диапазона
      }
    };
    emitValue(); // Запускаем эмиссию значений
    // Функция очистки, которая вызовется при unsubscribe
    return () => {
      console.log('Подписка завершена.');
    };
  });
}

Объяснение кода

  1. Параметры функции:

    • start: number — начальное значение диапазона.

    • end: number — конечное значение диапазона.

    • delayTime: number — задержка между эмиссиями каждого значения (в миллисекундах).

  2. Создание Observable: Мы напрямую используем new Observable, чтобы задать логику работы потока: Внутри мы описываем, как значения будут эмитироваться подписчику.

  3. Эмит значений через метод next: Значения от start до end эмитируются последовательно с задержкой. Для этого используется метод setTimeout

  4. Очистка ресурсов: Если подписчик завершит подписку до окончания эмиссии, следует освободить ресурсы (в данном случае здесь только лог выводится, но на практике вы можете сбрасывать таймеры, очищать объекты и т.п.):

   return () => {
      console.log('Подписка завершена.');
   };

Эта функция возвращается из new Observable и вызывается автоматически при вызове unsubscribe.

import { delayedRange } from './custom-operators';

delayedRange(1, 5, 1000).subscribe({
  next(value) {
    console.log('Получено значение:', value);
  },
  complete() {
    console.log('Поток завершен.');
  }
});

/*
Вывод:
Получено значение: 1
Получено значение: 2
Получено значение: 3
Получено значение: 4
Получено значение: 5
Поток завершен.
*/

Замечания по реализации

  1. Обработка ошибок: Если в процессе создания значений может произойти ошибка, используйте метод subscriber.error(err). В данном примере ошибок нет, но это важно учитывать в более сложных сценариях.

  2. Тестирование: Покройте вашу функцию unit-тестами, чтобы убедиться в корректности ее работы.

  3. Разумное управление ресурсами: В production-коде стоит продумать корректное освобождение ресурсов, особенно если вы работаете с асинхронными API (например, setTimeout, setInterval, HTTP-запросы).

Заключение

Создание собственных операторов в RxJS — это мощный способ расширить возможности реактивного программирования и адаптировать его под уникальные задачи вашего проекта. В этой статье мы рассмотрели, как создавать pipeable и creation операторы для более удобной и читаемой работы с потоками данных.

Комментарии (0)