Доброго времени суток, дорогие читатели!
В данной статье я буду рассказывать про холодную и горячую последовательность на примере такой задачи:
Пользователь заходит на сайт и начинается внутренняя прослушка (какой либо запрос на сервер с получением данных, в нашем же случае будет обычный sleepAsync — функция с таймаутом и остается только его подхватить и обработать). Режимов прослушки два — оптимизированный (будем называть горячим) и константный (будем называть холодным). При условии активности пользователя на вкладке, включаем оптимизированный метод, если же пользователь покинул вкладку, включаем константный. Также нужно показывать количество времени, которое занимала прослушка.
Зарисовка - как будет выглядеть логика итоговой программы:
Перейдем к написанию кода!
Все нужные переменные:
data() {
return {
logs: Immutable.List(), // Хорошая практика использовать immutable коллекции
working: false, // переменная, отвечающая за работу режимов
abortController: new AbortController() as AbortController, // контроллер abort сигналов
abortSignal: null as unknown as AbortSignal, // собственно сам abortSignal
lastSavedState: null as unknown as number, // состояние, которое успели сохранить
progressiveSub: null as unknown as Subscription, // горячая последовательность
constantSub: null as unknown as Subscription, // холодная последовательность
}
},
Самое простое - возьмем откуда нибудь функцию sleepAsync для имитирования запроса на БД. В моем случае я нашел вот такую функцию:
sleepAsync(_ms: number, _signal?: AbortSignal) {
return new Promise<void>((_resolve, _reject) => {
if (_signal?.aborted === true) {
_reject(new DOMException('Operation cancelled', 'AbortError'));
return;
}
let timerId: number | null = null;
let completed = false;
if (_signal) {
_signal.addEventListener('abort', _event => {
if (!completed) {
if (timerId) {
clearTimeout(timerId);
timerId = null;
}
completed = true;
_reject(new DOMException('Operation cancelled', 'AbortError'));
}
});
}
timerId = setTimeout(() => {
if (completed)
return;
timerId = null;
completed = true;
_resolve();
}, _ms);
});
}
Как видим, она даже с abortSignal'ом, поэтому будем использовать еще и его :)
К ней добавим функцию fetchAsync - та самая имитация прослушки. Функция просто возвращает случайное число и записывает его в immutable список.
fetchAsync(_abort: AbortSignal): Promise<number> {
const jitterMs: number = (Math.random() * 1000) % 5000;
const randomValue: number = Math.floor((Math.random() * 5000) % 5000)
await this.sleepAsync(2000 + jitterMs, _abort);
this.logs = this.logs.push(randomValue)
print(`[FETCH ASYNC] returning ${randomValue} and currentState is ${this.logs}`)
return randomValue;
},
Перейдем к функции, получающей это значение и обрабатывающей его:
fetchAndHandleResult(value: number): Observable<any> {
const startTime = Date.now();
return from(this.fetchAsync(this.abortSignal)).pipe(
map((result) => {
const endTime = Date.now();
const duration = endTime - startTime;
print(`[TRACING WORK] On ${value} sec completed fetch and lasted - ${duration} ms`);
return result;
}),
catchError((error) => {
let endTime = Date.now();
let duration = endTime - startTime;
print(`[TRACING WORK] CANCELLED on ${value} sec, fetch worked ${duration} ms`);
throw error;
}),
);
},
Теперь виновник торжества - функция для построения двух потоков:
tracingWork(): void {
this.$refs.doButton.disabled = true // Отключаем кнопку, чтобы не запускать сразу несколько потоков
this.working = true // работаем!
this.resetAbortController() // если до этого был сигнал abort, очистим его
const visibilityChange$ = fromEvent(document, 'visibilitychange'); // событие изменения активности вкладки
const constantMinutes = 8 // время для холодной последовательности
const constantValue = of(constantMinutes); // То же время, только в потоке
let intervalArray = [1, 2, 4]; // горячий массив :)
// Переменная, создающая поток с прогрессивным методом
const progressiveThread$ = visibilityChange$.pipe(
startWith({ target: document }), // Начальное значение видимости
switchMap((event) => {
const isVisible = document.visibilityState === 'visible'; // Проверяем на видимость
if (isVisible) {
print(`Page is visible. Working Optimized method`)
if (this.lastSavedState) {
// Если есть сохраненное значение, т.е. пользователь вышел и зашел обратно на вкладку
const startIndex = intervalArray.indexOf(this.lastSavedState)
intervalArray = intervalArray.slice(startIndex)
console.log(intervalArray)
}
return from(intervalArray).pipe(
concatMap((value: number, index: number) => {
this.lastSavedState = value;
return of(value).pipe(
delay(value * 1000),
concatMap(() => this.fetchAndHandleResult(value)),
concatMap((result) => {
if (index === intervalArray.length - 1) {
print('[PROGRESSIVE WORK] Infinite loop started');
return this.runInfiniteLoop(value);
} else {
return of(result);
}
})
);
})
);
} else {
// Если вкладка не акдивная
this.abortController.abort()
this.resetAbortController()
return EMPTY; // Пустой поток, если документ скрыт
}
}),
);
// Тут примерно то же самое, что и сверху, только поменьше
const constThread$ = visibilityChange$.pipe(
switchMap((event) => {
const isVisible = document.visibilityState === 'visible';
if (!isVisible) {
print(`Page is hidden. Working constant method`)
return interval(constantMinutes * 1000).pipe(
concatMap(() => {
return this.fetchAndHandleResult(constantMinutes);
})
);
} else {
this.abortController.abort()
this.resetAbortController()
return EMPTY; // Пустой поток, если документ активен
}
})
);
this.progressiveSub = this.createSubscription$(progressiveThread$, 'PROGRESSIVE');
this.constantSub = this.createSubscription$(constThread$, 'CONSTANT');
},
Так, пробежимся по операторам, которые я использовал и почему — switchMap
думаю самый понятный из них, т.к. этот оператор позволяет как раз‑таки менять потоки друг на друга, что нам и требуется в контексте изменений активности вкладки. Теперь наверное главный вопрос — почему везде concatMap
, а не обычный map
?
В данном коде встречается оператор concatMap
вместо map
, и разница между ними действительно важна. Оператор map
применяет указанную функцию к каждому элементу потока и возвращает новый элемент, который заменяет исходный элемент в потоке. В данном контексте это было бы недостаточно, потому что нужно выполнить некоторые асинхронные операции и сохранить последовательность выполнения.
Оператор map
не подходит для работы с асинхронными операциями, так как он просто трансформирует каждый элемент независимо от других элементов в потоке. Оператор concatMap
, с другой стороны, используется для выполнения асинхронных операций, сохраняя последовательность их выполнения. Когда используется concatMap
, каждый элемент исходного потока будет обработан последовательно, то есть следующая операция не начнется до завершения предыдущей. Если бы мы использовалиmap
вместо concatMap
, то асинхронные операции не были бы выполнены последовательно (синхронно, параллельно, непонятно — всегда по разному, процессорное время оно такое — непостоянное, где то системный файл заберет время, где то ядро выделит пространство для другого процесса и всякое в таком духе). А с помощью concatMap
, каждая операция будет выполнена в правильной последовательности, что особенно важно в данном контексте, так как предполагается выполнение некоторых действий по мере поступления элементов из intervalArray
.
Таким образом, мы используемconcatMap
, чтобы гарантировать последовательное выполнение асинхронных операций для каждого элемента потока и поддержание правильного порядка действий.
Демонстрация
Вывод
Итого, что мы имеем — два метода работы холодной и горячей последовательности, которые опрашивают сервер в определенный момент времени в зависимости от действий пользователя, в нашем случае — активности / неактивности вкладки.
Код как обычно выложил на свой гитхаб.
Надеюсь статья была для Вас полезна! Если есть пожелания / улучшения моего кода — то добро пожаловать в комментарии!) До встречи в следующей статье!
Комментарии (4)
nin-jin
30.07.2023 10:44-1Ну а примерно так это выглядело бы на $mol_wire:
class App extends $mol_object { @ $mol_mem logs() { return [] as string[] } @ $mol_action fetch() { this.$.$mol_wait_timeout( 2000 + ( Math.random() * 1000 ) % 5000 ) const randomValue = Math.floor( ( Math.random() * 5000 ) % 5000 ) console.log( `[FETCH] Result ${randomValue} and logs: ${ this.logs() }` ) this.logs().push( randomValue ) return randomValue } @ $mol_action fetch_trace() { let duration =- $mol_wire_sync( Date ).now() try { const res = this.fetch() cosole.log( `[FETCH TRACE] Done` ) return res } catch( error : unknown ) { cosole.log( `[FETCH TRACE] Fail` ) } finally { duration += Date.now() console.log( `[FETCH TRACE] Duration ${duration} ms` ) } } @ $mol_mem fresh_value() { const frequency = this.$.$mol_tab.active() ? 1000 : 8000 this.$.$mol_state_time.now( frequency ) return this.fetch_trace() } }
shai_hulud
https://benlesh.medium.com/hot-vs-cold-observables-f8094ed53339
cold -> не генерируют значения пока никто не подписан
warm -> генерируют значения пока есть хоть один подписчик
hot -> генерируют значения независимо от наличия подписчиков
----------------------------------------------
сделать cold -> warm:
share() + refCount
сделать cold -> hot:
share()
GonnaMakeItBrah Автор
Это больше теоретические термины, я же интерпретировал это в рамках своей практической задачи, да и интерпретаций этих последовательностей куча - в любом случае смысл остается примерно тот же, как ни крути). В моем понимании:
Горячий режим (оптимизированный метод): Я использую этот термин для обозначения режима прослушки, который активируется, когда пользователь активен на вкладке. В целом, "горячий" может пониматься как что-то активное и быстрое, поэтому этот термин может быть справедливым
Холодный режим (константный метод): Я использую этот термин для обозначения режима прослушки, который активируется, когда пользователь покинул вкладку. Термин "холодный" обычно ассоциируется с чем-то неактивным или медленным, как и приведено в константном методе