RxJS (Reactive Extensions for JavaScript) — мощный инструмент для работы с асинхронными потоками данных, который используется во многих современных веб-приложениях. Хотя RxJS предоставляет богатую коллекцию операторов, иногда для решения специфических задач бывает необходимо писать свои собственные. Это позволяет избежать дублирования кода и повысить читаемость программы.
Создание своих операторов RxJS может показаться сложным, особенно для тех, кто только начал использовать библиотеку. Однако, фундаментальные принципы их разработки понятны, если погрузиться в механику работы RxJS. В этой статье мы углубимся в то, как создавать собственные pipeable и creation operators, а также рассмотрим практические примеры их применения.
Что такое оператор RxJS?
Оператор RxJS — это чистая функция, которая принимает Observable как входной параметр (или создает Observable) и возвращает иной Observable, преобразованный в соответствии с заданной логикой. Операторы являются основой реактивного программирования, позволяя удобно трансформировать, фильтровать, комбинировать, управлять потоками данных и процессами асинхронной обработки.
Основные характеристики оператора RxJS
-
Чистота функции: Оператор должен быть чистой функцией. Это значит, что он не влияет на внешние состояния и любой вызов оператора с одинаковыми входными параметрами всегда должен давать одинаковый результат.
Пример: Оператор map всегда возвращает значения, применяя трансформацию, без побочных эффектов.
Неизменность входящего Observable: Оператор не изменяет исходный Observable, а создает новый — это важное свойство функционального программирования. Оригинальный поток остается неизменным.
К цепочке операторов можно добавлять новые логики: Операторы поддерживают композиционность, что означает, что один или несколько операторов могут объединяться в цепочку, используя метод pipe.
Типы операторов
Операторы делятся на два типа:
Pipeable операторы — обычные функции, которые используются в цепочке (pipe). Они принимают поток и возвращают новый поток:
import { of } from 'rxjs';
import { map } from 'rxjs/operators';
of(1, 2, 3).pipe(
map(x => x * 2)
).subscribe(console.log); // 2, 4, 6
Creation Functions — функции, которые создают Observable. Например, of, fromEvent:
import { fromEvent } from 'rxjs';
const clicks$ = fromEvent(document, 'click');
clicks$.subscribe(console.log);
Как работают pipeable операторы
Любой пользовательский pipeable оператор обычно выполняет следующие задачи:
Принимает входной поток (source).
Добавляет к нему кастомную логику.
Создает и возвращает новый Observable, который подчиняется полученной логике.
Вы можете представить их как цепочку обработки данных в конвейере. Давайте разберёмся, как написать такой оператор.
Как создать pipeable оператор
Шаг 1: Определите задачу
Определите, какую задачу будет решать ваш кастомный оператор. Например, давайте реализуем оператор debounceIf, который применяет debounce к потоку только при выполнении определенного условия. Этот оператор может использоваться для оптимизации обработки событий, таких как клики мыши или ввод текста.
Шаг 2: Определение базовой структуры
Создадим функцию, которая будет являться нашим оператором:
import { Observable } from 'rxjs';
export function debounceIf<T>(condition: () => boolean, delayTime: number)
:(source$: Observable<T>) => Observable<T> {
return function (source$: Observable<T>): Observable<T> {
return new Observable<T>(subscriber => {
// Создаем кастомную подписку
});
};
}
condition — возвращает true или false, определяя, нужно ли применять debounce.
delayTime — время задержки в миллисекундах.
Шаг 3: Реализация логики
import {Observable} from 'rxjs';
export function debounceIf<T>(condition: () => boolean, duration: number)
: (source: Observable<T>) => Observable<T> {
return source => new Observable(observer => {
let timeoutId: any | null = null;
let subscription = source.subscribe({
next(value) {
if (timeoutId) clearTimeout(timeoutId) // сбрасываем таймер чтобы исключить срабатывания
if (condition()) {
// Если условие выполнено, используем debounce
timeoutId = setTimeout(() => {
observer.next(value);
console.log(timeoutId)
}, duration);
} else {
// Если нет, пропускаем значение сразу
observer.next(value);
}
},
error(err) {
observer.error(err);
},
complete() {
observer.complete();
},
});
return () => {
subscription.unsubscribe();
if (timeoutId) clearTimeout(timeoutId);
};
});
}
Пример использования:
import {Component, OnInit} from '@angular/core';
import {fromEvent} from "rxjs";
import {debounceIf} from "@pipes/debounceIf";
@Component({
selector: 'app-home',
templateUrl: 'home.component.html',
})
export class HomeComponent implements OnInit {
clicks: number[] = [];
debounced = false;
ngOnInit(): void {
fromEvent(document, 'click')
.pipe(debounceIf(() => this.debounced, 3000))
.subscribe(() => this.clicks.push(new Date().getTime()));
}
toggleDebounced(): void {
this.debounced = !this.debounced;
}
}
Этот код будет пропускать только последний клик по кнопке если в течение двух секунд не происходило новых нажатий при включенном переключателе.
Операторы создания (creation operators)
Сreation function в RxJS отвечает за создание нового Observable с определенной логикой. В отличие от операторов (которые принимают поток и возвращают поток), creation-функция используется там, где вам необходимо начать с чего-то нового, например: фиксированных значений, событий DOM, таймеров и т.д. Примеры встроенных creation functions в RxJS: of, from, interval, fromEvent.
Что создадим:
Предположим, мы хотим создать функцию delayedRange, которая через определенную задержку эмитирует значения из диапазона чисел. Она позволяет настроить:
начальное число диапазона;
конечное число диапазона;
задержку между эмиссиями значений.
Шаги создания кастомной creation function
Использование new Observable
Все кастомные creation functions в RxJS строятся на использовании конструктора Observable. Конструктор принимает аргументом функцию (subscribe), в которой определяется логика эмиссии значений.Поддержка управления подпиской
Необходимо предоставить механизм для очистки ресурсов (unsubscribe). Это может быть полезно, если подписчик завершает поток раньше времени. Например, нужно очищать таймеры, чтобы не было утечек памяти.-
Эмит данных с использованием next, обработка ошибок и завершение
Через объект subscriber мы вызываем соответствующие методы:subscriber.next(value) — эмитируем значение;
subscriber.error(err) — сообщаем об ошибке;
subscriber.complete() — завершаем поток.
import { Observable } from 'rxjs';
/**
* Функция delayedRange создает Observable, который эмитирует числа
* из указанного диапазона с заданной задержкой между значениями.
*
* @param start Начальное число диапазона.
* @param end Конечное число диапазона.
* @param delayTime Задержка между эмиссиями значений (в миллисекундах).
* @returns Observable, который эмитирует числа из диапазона.
*/
export function delayedRange(start: number, end: number, delayTime: number): Observable<number> {
return new Observable<number>(subscriber => {
let current = start;
// Эмитирует значения с заданной задержкой
const emitValue = () => {
if (current <= end) {
subscriber.next(current);
current++;
setTimeout(emitValue, delayTime); // Эмитируем следующее значение
} else {
subscriber.complete(); // Завершаем поток, если достигнут конец диапазона
}
};
emitValue(); // Запускаем эмиссию значений
// Функция очистки, которая вызовется при unsubscribe
return () => {
console.log('Подписка завершена.');
};
});
}
Объяснение кода
-
Параметры функции:
start: number — начальное значение диапазона.
end: number — конечное значение диапазона.
delayTime: number — задержка между эмиссиями каждого значения (в миллисекундах).
Создание Observable: Мы напрямую используем new Observable, чтобы задать логику работы потока: Внутри мы описываем, как значения будут эмитироваться подписчику.
Эмит значений через метод next: Значения от start до end эмитируются последовательно с задержкой. Для этого используется метод setTimeout
Очистка ресурсов: Если подписчик завершит подписку до окончания эмиссии, следует освободить ресурсы (в данном случае здесь только лог выводится, но на практике вы можете сбрасывать таймеры, очищать объекты и т.п.):
return () => {
console.log('Подписка завершена.');
};
Эта функция возвращается из new Observable и вызывается автоматически при вызове unsubscribe.
import { delayedRange } from './custom-operators';
delayedRange(1, 5, 1000).subscribe({
next(value) {
console.log('Получено значение:', value);
},
complete() {
console.log('Поток завершен.');
}
});
/*
Вывод:
Получено значение: 1
Получено значение: 2
Получено значение: 3
Получено значение: 4
Получено значение: 5
Поток завершен.
*/
Замечания по реализации
Обработка ошибок: Если в процессе создания значений может произойти ошибка, используйте метод subscriber.error(err). В данном примере ошибок нет, но это важно учитывать в более сложных сценариях.
Тестирование: Покройте вашу функцию unit-тестами, чтобы убедиться в корректности ее работы.
Разумное управление ресурсами: В production-коде стоит продумать корректное освобождение ресурсов, особенно если вы работаете с асинхронными API (например, setTimeout, setInterval, HTTP-запросы).
Заключение
Создание собственных операторов в RxJS — это мощный способ расширить возможности реактивного программирования и адаптировать его под уникальные задачи вашего проекта. В этой статье мы рассмотрели, как создавать pipeable и creation операторы для более удобной и читаемой работы с потоками данных.