Всем привет. На связи Омельницкий Сергей. Не так давно я вел стрим по реактивному программированию, где рассказывал про асинхронность в JavaScript. Сегодня я бы хотел законспектировать этот материал.
Но перед тем как начать основной материал нам нужно сделать вводную. Итак, давайте начнем с определений: что такое стек и очередь?
Стек — это коллекция, элементы которой получают по принципу «последний вошел, первый вышел» LIFO
Очередь — это коллекция, элементы которой получают по принципу («первый вошел, первый вышел» FIFO
Окей, продолжим.
JavaScript — это однопоточный язык программирования. Это значит, что он имеется только один поток выполнения и один стек, в который помещаются функции в очередь на выполнение. Следовательно в один момент времени JavaScript может выполнить только одну операцию, другие операции при этом будут ждать своей очереди в стеке, пока их не вызовут.
Стек вызовов — это структура данных, которая, упрощенно говоря, записывает сведения о месте в программе, где мы находимся. Если мы переходим в функцию, мы помещаем запись о ней в верхнюю часть стека. Когда мы из функции возвращаемся, мы вытаскиваем из стека самый верхний элемент и оказываемся там, откуда вызывали эту функцию. Это — всё, что умеет стек. А теперь крайне интересный вопрос. Как тогда работает асинхронность в JavasScript?
На самом деле помимо стека в браузерах присутствует особая очередь для работы с так называемым WebAPI. Функции из этой очереди выполнятся по порядку только после того, как стек будет полностью очищен. Только после этого они помещаются из очереди в стек на выполнение. Если в стеке в данный момент находится хотя бы один элемент, то они в стек попасть не могут. Как раз именно из-за этого вызов функций по таймаута часто бывает не точным по времени, так как функция не может попасть из очереди в стек, пока он заполнен.
Рассмотрим следующий пример и займёмся его пошаговым «выполнением». Также посмотрим, что при этом происходит в системе.
console.log('Hi');
setTimeout(function cb1() {
console.log('cb1');
}, 5000);
console.log('Bye');
1) Пока ничего не происходит. Консоль браузера чиста, стек вызовов пуст.
2) Потом команда console.log('Hi') добавляется в стек вызовов.
3) И она выполняется
4) Затем console.log('Hi') удаляется из стека вызовов.
5) Теперь переходим к команде setTimeout(function cb1() {… }). Она добавляется в стек вызовов.
6) Команда setTimeout(function cb1() {… }) выполняется. Браузер создаёт таймер, являющийся частью Web API. Он будет выполнять обратный отсчёт времени.
7) Команда setTimeout(function cb1() {… }) завершила работу и удаляется из стека вызовов.
8) Команда console.log('Bye') добавляется в стек вызовов.
9) Команда console.log('Bye') выполняется.
10) Команда console.log('Bye') удаляется из стека вызовов.
11) После того, как пройдут, как минимум, 5000 мс., таймер завершает работу и помещает коллбэк cb1 в очередь коллбэков.
12) Цикл событий берёт c функцию cb1 из очереди коллбэков и помещает её в стек вызовов.
13) Функция cb1 выполняется и добавляет console.log('cb1') в стек вызовов.
14) Команда console.log('cb1') выполняется.
15) Команда console.log('cb1') удаляется из стека вызовов.
16) Функция cb1 удаляется из стека вызовов.
Взглянем на пример в динамике:
Ну вот мы и рассмотрели как в JavaScript реализована асинхронность. Теперь давайте кратко поговорим об эволюции асинхронного кода.
Эволюция асинхронного кода.
a(function (resultsFromA) {
b(resultsFromA, function (resultsFromB) {
c(resultsFromB, function (resultsFromC) {
d(resultsFromC, function (resultsFromD) {
e(resultsFromD, function (resultsFromE) {
f(resultsFromE, function (resultsFromF) {
console.log(resultsFromF);
})
})
})
})
})
});
Асинхронное программирование, каким мы его знаем в JavaScript, может быть реализовано только функциями. Они могут быть переданы как любая другая переменная другим функциям. Так родились коллбэки. И это прикольно, весело и задорно, пока не превращается в грусть, тоску и печаль. Почему? Да все просто:
- С ростом сложности кода, проект быстро превращается в малопонятные многократно вложенные блоки — «callback hell».
- Обработку ошибок можно легко упустить.
- Нельзя возвращать выражения с return.
С появлением Promise обстановка стала чуть лучше.
new Promise(function(resolve, reject) {
setTimeout(() => resolve(1), 2000);
}).then((result) => {
alert(result);
return result + 2;
}).then((result) => {
throw new Error('FAILED HERE');
alert(result);
return result + 2;
}).then((result) => {
alert(result);
return result + 2;
}).catch((e) => {
console.log('error: ', e);
});
- Появились цепочки промисов, что улучшило читаемость кода
- Появился отдельный метод перехвата ошибок
- Появилась возможность параллельного выполнения с помощью Promise.all
- Вложенную асинхронность мы можем решить с помощью async/await
Но у промиса есть свои ограничения. К примеру промис, без танцев с бубном, нельзя отменить, а что самое главное — работает с одним значением.
Ну вот мы и плавно подошли к реактивному программированию. Устали? Ну благо дело можно можно пойти заварить чаек, обмозговать и вернуться читать далее. А я продолжу.
Реактивное программирование?—?парадигма программирования, ориентированная на потоки данных и распространение изменений. Давайте более детально разберем что такое поток данных.
// Получаем ссылку на элемент
const input = ducument.querySelector('input');
const eventsArray = [];
// Пушим каждое событие в массив eventsArray
input.addEventListener('keyup',
event => eventsArray.push(event)
);
Представим, что у нас есть поле ввода. Мы создаем массив, и на каждый keyup события input мы будем сохранять событие в нашем массиве. При этом хотелось бы отметить, что наш массив отсортирован по времени? т.е. ?индекс более поздних событий больше, чем индекс более ранних. Такой массив представляет собой упрощенную модель потока данных, но это еще не поток. Для того чтоб этот массив можно было смело назвать потоком он должен уметь каким-то образом сообщать подписчикам, что в него поступили новые данные. Таким образом мы подошли к определению потока.
Поток данных
const { interval } = Rx;
const { take } = RxOperators;
interval(1000).pipe(
take(4)
)
Поток?—?это массив данных, отсортированных по времени, который может сообщать о том, что данные изменились. А теперь представьте как удобно становится писать код, в котором на одно действие потребуется вызывать несколько событий в разных участках кода. Мы просто делаем подписку на поток и он нам сам сообщит когда произойдут изменения. И это умеет делать библиотека RxJs.
RxJS — это библиотека для работы с асинхронными и основанными на событиях программами с использованием наблюдаемых последовательностей. Библиотека предоставляет основной тип Observable, несколько вспомогательных типов (Observer, Schedulers, Subjects) и операторы работы с событиями как с коллекциями (map, filter, reduce, every и подобные из JavaScript Array).
Давайте разберемся с основными понятиями этой библиотеки.
Observable, Observer, Producer
Observable — первый базовый тип, который мы рассмотрим. Этот класс содержит в себе основную часть реализации RxJs. Он связан с наблюдаемым потоком, на который можно как подписаться с помощью метода subscribe.
В Observable реализуется вспомогательный механизм для создания обновлений, так называемый Observer. Источником значений для Observer называется Producer. Это может быть массив, итератор, web socket, какое-то событие и т.п. Так что можно сказать, что observable является проводником между Producer и Observer.
Observable обрабатывает три вида событий Observer:
- next – новые данные
- error – ошибку, если последовательность завершилась по причине исключительной ситуации. это событие так же предполагает завершение последовательности.
- complete — сигнал о завершении последовательности. Это означает, что новых данных больше не будет
Посмотрим демо:
В начале мы обработаем значения 1, 2, 3, а спустя 1 сек. мы получим 4 и завершим наш поток.
И тут я понял, что рассказывать было интересней чем писать об этом. :D
Subscription
Когда мы делаем подписку на поток, мы создаем новый класс subscription, который дает нам возможность отменить подписку с помощью метода unsubscribe. Так же мы можем сгруппировать подписки с помощью метода add. Ну и логично, что мы можем разгруппировать потоки с помощью remove. Методы add и remove на вход принимают другую подписку. Хотелось бы отметить, что когда мы делаем отписку, то мы отписываемся от всех дочерних подписок как будто бы и у них вызывали метод unsubscribe. Идем дальше.
Виды потоков
HOT | COLD |
---|---|
Producer создается снаружи observable | Producer создается внутри observable |
Данные передаются в момент создания observable | Данные сообщаются в момент подписки |
Нужна дополнительная логика для отписки | Поток завершается самостоятельно |
Использует связь один-к-многим | Использует связь вида один-к-одному |
Все подписки имеют единое значение | Подписки независимы |
Данные можно потерять, если нет подписки | Переиздает все значения потока для новой подписки |
Если приводить аналогию, то я бы представил горячий поток как фильм в кинотеатре. В какой момент времени ты пришел, с того момента и начал просмотр. Холодный поток я бы сравнил со звонком в тех. поддержку. Любой позвонивший слушает запись автоответчика от начала до конца, но ты можешь бросить трубку с помощью unsubscribe.
Хотелось бы отметить, что существуют еще так называемые warm потоки ( такое определение я встречал крайне редко и только на зарубежных сообществах ) — это поток, который трансформируется из холодного потока в горячий. Возникает вопрос — где использовать)) Приведу пример из практики.
Я работаю с ангуляром. Он активно использует rxjs. Для получения данных на сервер я ожидаю холодный поток и этот поток использую в шаблоне с помощью asyncPipe. Если я использую этот пайп несколько раз, то, возвращаясь к определению холодного потока, каждый pipe будет запрашивать данные с сервера, что мягко говоря странно. А если я преобразую холодный поток в теплый, то запрос произойдет единожды.
Вообще понимание вида потоков достаточно сложна для начинающих, но важна.
Operators
return this.http.get(`${environment.apiUrl}/${this.apiUrl}/trade_companies`)
.pipe(
tap(({ data }: TradeCompanyList) => this.companies$$.next(cloneDeep(data))),
map(({ data }: TradeCompanyList) => data)
);
Расширить возможность работы с потоками нам предоставляют операторы. Они помогают контролировать события, протекающие в Observable. Мы рассмотрим парочку наиболее популярных, а подробнее с операторами можно ознакомиться по ссылкам в полезной информации.
Operators — of
Начнем со вспомогательного оператора of. Он создает Observable на основе простого значения.
Operators — filter
Оператор фильтрации filter, как можно понять по названию, фильтрует сигнал потока. Если оператор возвращает истину, то пропускает далее.
Operators — take
take — Принимает значение кол-ва эмитов, после которого завершает поток.
Operators — debounceTime
debounceTime — отбрасывает испускаемые значения, которые попадают в указанный промежуток времени между выходными данными — по прошествии временного интервала эмитит последнее значение.
const { Observable } = Rx;
const { debounceTime, take } = RxOperators;
Observable.create((observer) => {
let i = 1;
observer.next(i++);
// Испускаем значение раз в 1000мс
setInterval(() => {
observer.next(i++)
}, 1000);
// Испускаем значение раз в 1500мс
setInterval(() => {
observer.next(i++)
}, 1500);
}).pipe(
debounceTime(700), // Ожидаем 700мс значения прежде чем обработать
take(3)
);
Operators — takeWhile
Эмитит значения, пока takeWhile не вернет false, после чего отпишется от потока.
const { Observable } = Rx;
const { debounceTime, takeWhile } = RxOperators;
Observable.create((observer) => {
let i = 1;
observer.next(i++);
// Испускаем значение раз в 1000мс
setInterval(() => {
observer.next(i++)
}, 1000);
}).pipe(
takeWhile( producer => producer < 5 )
);
Operators — combineLatest
Комбинированный оператор combineLatest чем-то похож на promise.all. Он объединяет несколько потоков в один. После того как каждый поток сделает хотя бы один эмит, мы получаем последние значения от каждого в виде массива. Далее, после любого эмита из объединённых потоков он будет отдавать новые значения.
const { combineLatest, Observable } = Rx;
const { take } = RxOperators;
const observer_1 = Observable.create((observer) => {
let i = 1;
// Испускаем значение раз в 1000мс
setInterval(() => {
observer.next('a: ' + i++);
}, 1000);
});
const observer_2 = Observable.create((observer) => {
let i = 1;
// Испускаем значение раз в 750мс
setInterval(() => {
observer.next('b: ' + i++);
}, 750);
});
combineLatest(observer_1, observer_2).pipe(take(5));
Operators — zip
Zip — ждет значение из каждого потока и формирует массив на основе этих значений. Если значение не придет из какого-либо потока, то группа не будет сформирована.
const { zip, Observable } = Rx;
const { take } = RxOperators;
const observer_1 = Observable.create((observer) => {
let i = 1;
// Испускаем значение раз в 1000мс
setInterval(() => {
observer.next('a: ' + i++);
}, 1000);
});
const observer_2 = Observable.create((observer) => {
let i = 1;
// Испускаем значение раз в 750
setInterval(() => {
observer.next('b: ' + i++);
}, 750);
});
const observer_3 = Observable.create((observer) => {
let i = 1;
// Испускаем значение раз в 500
setInterval(() => {
observer.next('c: ' + i++);
}, 500);
});
zip(observer_1, observer_2, observer_3).pipe(take(5));
Operators — forkJoin
forkJoin также объединяет потоки, но он емитнит значение только когда все потоки будут завершены (complete).
const { forkJoin, Observable } = Rx;
const { take } = RxOperators;
const observer_1 = Observable.create((observer) => {
let i = 1;
// Испускаем значение раз в 1000мс
setInterval(() => {
observer.next('a: ' + i++);
}, 1000);
}).pipe(take(3));
const observer_2 = Observable.create((observer) => {
let i = 1;
// Испускаем значение раз в 750
setInterval(() => {
observer.next('b: ' + i++);
}, 750);
}).pipe(take(5));
const observer_3 = Observable.create((observer) => {
let i = 1;
// Испускаем значение раз в 500
setInterval(() => {
observer.next('c: ' + i++);
}, 500);
}).pipe(take(4));
forkJoin(observer_1, observer_2, observer_3);
Operators — map
Оператор трансформации map преобразует значение эмита в новое.
const { Observable } = Rx;
const { take, map } = RxOperators;
Observable.create((observer) => {
let i = 1;
// Испускаем значение раз в 1000мс
setInterval(() => {
observer.next(i++);
}, 1000);
}).pipe(
map(x => x * 10),
take(3)
);
Operators – share, tap
Оператор tap - позволяет делать side эффекты, то есть какие-либо действия, которые не влияют на последовательность.
Утилитный оператор share способен из холодного потока сделать горячим.
С операторами закончили. Перейдем к Subject.
И тут я пошел чаек пить. Утомили меня эти примеры :D
Семейство subject-ов
Семейство subject-ов являются ярким примером горячих потоков. Эти классы являются неким гибридом, которые выступают одновременно в роли observable и observer. Так как subject является горячим потоком, то от него необходимо отписываться. Если говорить по основным методам, то это:
- next – передача новых данных в поток
- error – ошибка и завершение потока
- complete – завершение потока
- subscribe – подписаться на поток
- unsubscribe – отписаться от потока
- asObservable – трансформируем в наблюдателя
- toPromise – трансформирует в промис
Выделяют 4 5 типов subject-ов.
На стриме говорил 4, а оказалось они еще один добавили. Как говорится век живи век учись.
Простой Subject new Subject()
– самый простой вид subject-ов. Создается без параметров. Передает значения пришедшие только после подписки.
BehaviorSubject new BehaviorSubject( defaultData<T> )
– на мой взгляд самый распространённый вид subject-ов. На вход принимает значение по умолчанию. Всегда сохраняет данные последнего эмита, которые передает при подписке. Данный класс имеет так же полезный метод value, который возвращает текущее значение потока.
ReplaySubject new ReplaySubject(bufferSize?: number, windowTime?: number)
— На вход опционально может принять первым аргументом размер буфера значений, которые он будет в себе хранить, а вторым время в течении которого нам нужны изменения.
AsyncSubject new AsyncSubject()
— при подписке ничего не происходит, и значение будет возвращено только при complete. Будет возвращено только последнее значение потока.
WebSocketSubject new WebSocketSubject(urlConfigOrSource: string | WebSocketSubjectConfig<T> | Observable<T>, destination?: Observer<T>)
— О нем документация молчит и я сам его в первый раз вижу. Кто знает что он делает пишите, дополним.
Фуф. Ну вот мы и рассмотрели все, что я хотел сегодня рассказать. Надеюсь данная информация была полезной. Самостоятельно ознакомиться со списком литературы можно во вкладке полезная информация.
Полезная информация
- Ссылка на стрим
- Как работает JS: обзор движка, механизмов времени выполнения, стека вызовов
- Как работает JS: цикл событий, асинхронность и пять способов улучшения кода с помощью async / await
- Как работает Event Loop в JavaSript
- Эволюция асинхронного JavaScript
- Что такое RxJS и почему о нём полезно знать
- Практическое применение RxJS
- RxJS Observables Tutorial — Creating & Subscribing to Observables
- RXJS: Hot and Cold Observables
- Классы, функции для создания Observable. Операторы.
- RxJS Operators By Example
- API List
- Разновидности Subject и Расписания в RxJS
Комментарии (50)
alexs0ff
03.08.2019 19:35+1В знакомство с реактивным программированием, на мой взгляд, нужно добавлять сравнение Pull и Push моделей получения данных.
DmitryKoterov
03.08.2019 22:34+31. Есть какие-то best practices по скрещиванию RxJS-подхода с async/await-подходом? Например, «у меня есть observable, а нужен async generator» или «у меня есть async generator, нужен observable, а потом обратно async generator» — и все на TypeScript. Вот на эту тему статья не помешала бы.
2. Каков глубинный смысл в том, что http.get — observable, если он возвращает ровно одно значение? Чем здесь концептуально промис плох, который можно await?
Вопросы выше скорее в контексте серверной разработки, чем клиентской, кстати.denis-isaev
04.08.2019 02:00+12) Просто для унификации интерфейса, чтобы без лишнего кода можно было сделать что-нибудь вида
race(http.get(proxy1), http.get(proxy2)).map(...convert response...).subscribe(...)
race — передаст в map первый пришедший ответ (от быстрейшего прокси).
elepner
04.08.2019 11:55+1Каков глубинный смысл в том, что http.get — observable, если он возвращает ровно одно значение?
Смыслов тут несколько. Во-первых, с fetch api можно получать и прогресс запроса, т.е. ваше утверждение не совсем верно. Во-вторых, и на мой взгляд самое главное, Promise нельзя отменить, а от Observable можно отписаться, что ведет к отмене запроса.
lehkap
04.08.2019 12:19В RxJs есть оператор fromPromise для превращения Promise в Observable. Так же у observable есть метод toPromise делает обратно из observable promise.
Когда начинал во второй Angular, подумал нафиг observable, в каждом методе вызывал .toPromise() в конце. Потом понял что ошибся, теперь возвращаю Observable, при необходимости вешаю операторы или скрещиваю несколько Observable и далее
await myFinalObservable.toPromise().
Тогда и callbackHell не возникает и всю мощь и лаконичность RxJs можно использоватьunma
04.08.2019 13:17Тоже начинал с toPromise))) думал зачем это все. Теперь честно делаю subscribe, но не из-за функционала больше, а потому что уж лучше что-то одно выбрать, везде использовать и не грузить зря мозг. Просто человеческий фактор.
DmitryKoterov
05.08.2019 09:52Ага, интересно, давайте поговорим про toPromise(). Например, у меня есть observable, который выдает 1, 2, 3 и потом заканчивается. Если я делаю:
await observable.subscribe(val => process(val)).toPromise();
// кажется, не так — но как, кстати?
то тогда process() вызовется по разу для 1, 2, 3, а вызывающий поток управления будет await-ать окончания этого дела и потом продолжится. Великолепно.
Внимание — вопрос. Что, если process() — сама по себе async-функция, да еще довольно запутанная, и внутри нее может произойти исключение? Что в этом случае произойдет?
Ведь unhandled exception же скорее всего, нет?
Как через observable «замкнуть» все эти исключения на основном потоке управления, чтобы он не расползся, и чтобы гарантированно все исключения в зависимом коде не вышли за скоуп этого основного потока управления? Вот в чем вопрос.
(Под «скоупом основного потока управления» я понимаю такое свойство асинхронного куска кода, что его можно обернуть в try-catch, и из него ничего не «вылетит» неожиданного. Ведь это одно из самых полезных свойств async-await кода.)vintage
05.08.2019 10:13// кажется, не так — но как, кстати?
await observable.pipe(map(val => process(val))).toPromise()
то тогда process() вызовется по разу для 1, 2, 3, а вызывающий поток управления будет await-ать окончания этого дела и потом продолжится.
Нет, промис зарезолвится последним значением — 3. И то, если стрим будет закрыт. Пока стрим не закроется промис не зарезолвится.
Что в этом случае произойдет?
Стрим закроектся, а промис зареджектится. Закрытие стримов при ошибках и невозможность их переоткрытия — та ещё боль. Приходится создавать дерево стримов заново.
Как через observable «замкнуть» все эти исключения на основном потоке управления
Да так же как с промисами — оператор catch. Логически промисы — частный случай обсерваблов.
DmitryKoterov
05.08.2019 10:40+1Стрим закроектся, а промис зареджектится.
Точно? Напоминаю, process — async-функция. Если в map() передается коллбэк, возвращающий промис, то разве observable будет дожидаться резолва этого промиса перед тем, как выплевывать следующее значение?
В идеале что бы я хотел сделать, это сконвертить observable не в промис, а в async generator. Чтобы можно было написать типа такого:
try { for await (const v of observable) { await process(v); } } catch (e) { ... }
mayorovp
05.08.2019 10:52В асинхронном случае надо вот так делать:
await observable.concatMap(val => from(process(val))).toPromise();
Также можно использовать mergeMap если порядок обработки не важен, а параллельность — напротив, как раз важна.
DmitryKoterov
05.08.2019 11:14Вот же ж адище. Но спасибо, логика понятна!
И последнее еще — предложенный вами вариант с mergeMap (т.е. параллельно), но так, чтобы (три разных независимых варианта):
А) одновременно выполнялось не более N функций process() а остальные ждали своей очереди
Б) все разбивалось бы на чанки по N элементов, для каждого чанка process() выполнялись бы параллельно, а сами чанки — последовательно;
В) все развивалось бы на чанки, но не фиксированного размера, а до тех пор, пока не выполнится некоторое условие насчет элементов текущего чанка (такой write barrier как бы).
Как?mayorovp
05.08.2019 11:33Пункт А — это штатная функциональность mergeMap.
Пункты Б и В — это уже вам нужен buffer и его вариации:
await observable.pipe( bufferCount(10), concatMap(buf => from(Promise.all(buf.map(process)))), concatAll() ).toPromise()
funca
05.08.2019 13:26Для единообразия есть IxJS от той же команды, что и RxJS. Но толку мало, т.к. попытки совместить оба подхода, например в реплизации backpreasure, превращаются в научную проблему. Все лучше там, где дуплексное взаимодействие заложено изначально. Например streams из nodejs или callbaGs.
Sirion
03.08.2019 23:27Странно, почему здесь до сих пор нет комментариев товарища vintage
vintage
04.08.2019 00:12+2Жду особого приглашения. Впрочем, а что тут можно сказать? Могу разве что предложить заинтересовавшихся темой, почитать эти две статьи:
https://habr.com/ru/post/307288/ — про асинхронное
https://habr.com/ru/post/317360/ — про реактивное
Paxest
03.08.2019 23:36Первая часть повторяет это видео. www.youtube.com/watch?v=8cV4ZvHXQL4
Очень понятно и доступно
LeusMaximus
04.08.2019 02:50Еще пара ссылок к полезной информации:
- пример самописного Observable — видео
- раздел по RxJS в скринкасте по Angular на learn.javascript.ru
JustDont
04.08.2019 05:21+2Вообще конечно не очень правильно говорить про реактивное программирование, и тут же уходить на RxJS. RxJS реактивен, но RxJS не единственен, и абстракции RxJS вовсе не о «простой» реактивности а-ля реакция на изменения, а о куда более конкретной реактивности (потоки событий или же push-коллекции). Архитектура RxJS чрезмерно конкретизирована — это реактивность в специфической обертке.
PS: А второй большой камень в огород RxJS — это вот эти самые магические методы на тему «как сделать с потоком то, что мне нужно». Без документации это абсолютно неподъемно, надо, наверное, писать конкретно на RxJS 24/7, чтоб помнить хотя бы треть методов. Я отлично помню времена, когда команде RxJS вздумалось переделать документацию, и какое-то время всё просто лежало, а потом еще какое-то время не было адекватного поиска. И разработка на эти дни просто встала.unma
04.08.2019 08:26+1О да, перевели внезапно на пайпы, а документация настолько убогая была, что приходилось в исходник лезть.
Sergamers Автор
04.08.2019 12:25Я бы хотел рассказать предысторию появления этого доклада. У меня есть группа, которую я активно веду. Цель у меня прийти от базовых вещей к разработке spa на ангуляре. Поэтому я рассказываю про реактивность и делаю уклон в сторону rxjs т.к. ангуляр его активно использует. Поэтому на стриме и здесь я рассказал о том, что мы будем в дальнейшем изучать.
DmitryKoterov
05.08.2019 11:36Еще один момент, по поводу того, что промисы нельзя отменять/прерывать, а observable можно.
Тут правильнее было бы, наверное, сравнивать observable не с промисами, а с async generator-ами. И вот их «отменять», кажется, можно — самой семантикой языка (промисы-то что отменять, они либо случились, либо еще нет):
async function something() { ... } async function* getStream() { for (let i = 0; i < 10; i++) { const v = await something(i); // или еще что-то, не важно yield v; } } for await (const v of getStream()) { if (v == “some”) break; ... }
Так вот, если цикл выскочит на break, работа асинк-генератора getStream() тоже закончится. И потом сборщик мусора прибьет оставшиеся от него кишки. Отмена? Отмена.
Я, правда, не до конца понимаю, почему это работает. Наверное, потому что после break-а “for await” перестает «впрыгивать» обратно в генератор, поэтому он подвисает в воздухе (раз в него никто не впрыгивает), а потом сам генератор выходит из области видимости, и сборщик мусора вычищает все зависимые вещи прямо в середине того for-а внутри getStream().mayorovp
05.08.2019 11:42Этот способ позволяет прерывать генератор только в yield-позициях, а await-позиции куда важнее.
MaZaAa
05.08.2019 12:52-1Не забивай голову ерундой, не надо ничего отменять, просто проверяешь после асинхронных вызовов, актуальны ли они ещё и если нет, то просто ничего не делаешь, а если да, то уже дальше делаешь манипуляции с данными.
Вот тут посмотри накидал на коленке банальных примеров stackblitz.com/edit/rxjs-hnlgh2mayorovp
05.08.2019 12:54+1не надо ничего отменять
Ну да, и DDOS-атака на свой же бакенд своим же штатным фронтэндом — это совершенно нормально, не надо тут ничего менять!
MaZaAa
05.08.2019 13:13-1Если это вы называете DDOS-атакой, то у меня плохие новости для ваших бэкендов. Есть такое понятие как здравый смысл, который гласит — не надо ради мелочи которая на самом деле ни на что не влияет превращать код в убожество. А если ваш бэкенд можно положить отправив на него ещё один лишний запрос, то извините, любой школьник его положит за секунды причем прямо из консоли браузера и хана вашему проекту/бизнесу.
mayorovp
05.08.2019 13:21Не "один лишний запрос", а "сотню лишних запросов с каждой открытой страницы".
Так-то понятно, что на один лишний запрос внимания можно не обращать...
MaZaAa
05.08.2019 14:03Для таких кейсов тоже можно достаточно легко обойтись обвязкой для запросов к АПИ и вспомогательногьного класса который будет проверять актуальны ли ещё запросы или нет, и если нет, то просто их не выполнять и выбросить специальное исключение, а те что в pending отменить. И никаких RxJs не надо тут даже близко. Просто пишешь как обычно, понятный линейный код сверху вниз и при этом эти нюансы будут учтены уже в этой обвязке и хэлпер классе
mayorovp
05.08.2019 14:07То, что rx.js не обязательно нужна — это да, но начинали-то вы с того, что отмена запросов не нужна.
MaZaAa
05.08.2019 14:14-1Она и не нужна в 99.9% случаев и проектов, но если у вас условно каждый клик генерирует по 100 запросов, то это уже индивидуальный случай и тут это актуально и опять же достаточно легко реализуется нативными средствами, чтобы код оставался чистым и красивым, а не лапшой. Но это актуально именно для единичных случаев, для подавляющего большинства отмена запросов не нужна и не актуальна. Но если заняться больше нечем, то можно и имплементировать, но так, чтобы код из-за это не страдал
alexs0ff
05.08.2019 14:21-1понятный линейный код
декларативный код (к которому относится RxJS) на порядок легче поддерживать, чем императивный.mayorovp
05.08.2019 14:28Так себе декларативный. В сложных случаях нужно постоянно держать в уме какой оператор сколько подписок и в какие моменты держит...
vintage
05.08.2019 15:09+1декларативный код (к которому относится RxJS)
Не относится он ни к декларативному, ни к функциональному. Это скорее железнодорожное программирование.
на порядок легче поддерживать
Ну конечно. Любая не описанная в документации задача превращается в railroad tycoon. А описанная — в копипасту без глубокого понимания, что происходит.
unma
А мне обычно достаточно промисов, а Rx считаю излишним усложнением. Ок, нельзя отменять, но зачем? Чаще всего результат идёт в локальный стейт компонента, ну и пусть пишет сколько хочет. Дебоунс нужен, но он просто вешается на внешнюю функцию/метод и все. А остальные Rx операторы если более менее серьезно использовать превращаются в кашу, которую ещё надо постараться отдалить. Не против Rx в целом, но против его пиара как универсального средства на замену промисам. Потому что ангуляр решил выпендриться. Почему observable? Давайте сразу каналы и возможность напихивать с двух сторон, но кому это надо?
На мой взгляд лучше взять простейшие промисы, async/await и допилить где надо, чем сразу брать Rx, потому что он как бы лишён недостатков промисов. Не надо забывать что главная проблема разработки — это сложность ПО, а ее недостаток фич. Ну я про свой CRUD все конечно, может у кого проблемы поглобальнее.
Sergamers Автор
Спасибо за ваш комментарий. Где ж он был когда я стирм вел, а то там все такие стесняшки были)
Согласен, что для простых проектов, когда у вас обычный запрос/ответ Rx избыточен. Да и технологии выбираются из расчета задач и компетенции команды.
А в целом RxJs хорош. Подписку не сделал — поток не работает, память не кушает; для работы с данными можно использовать единый интерфейс будь то клиентское событие или web-socket; связь один-к-многим вообще огонь, когда ты создаешь свой горячий поток.
elepner
Как же мне понятны ваши аргументы и как же вы неправы :) Был на вашем месте пару лет назад, когда начинал работать с Ангуляром. Считал observable каким-то выпендрежем без цели. Сейчас мне больно смотреть на спагетти с async/await.
Потому что выбор правильных абстракций — это половина успеха, а весь наш мир — это один большой поток событий, который движется только вперед (кстати, поэтому двусторонний канал уже излишество) и именно поэтому CQRS шагает по планете вместе с функциональным программированием.
Чтобы прочувствовать rxjs попробуйте написать свой triple/quad click в качестве упражнения классически на таймаутах, а потом с rxjs. Я даю такое задание своим джуниорам. Тут можете подсмотреть ответ. У меня получилось уложиться в 5 строчек.
А чтобы его полюбить, напишите свой хороший typeahead с отменой HTTP запросов на промисах и с rxjs, почувствуйте разницу.
unma
Не уверен что вы поняли мой посыл. Абстракция на то и абстракция, что берет только необходимое из реального мира. И в моем случае, также как мне кажется в большинстве, как минимум на данный момент то что я вижу на рынке и скорее всего останется — промисов вполне достаточно. А значит и не нужно пенеусложнять, придумывая себе проблемы на пустом месте. Я это про себя, а не про вас. Возможно у вас такие задачи составляют основную часть, но у меня — это такой минимум, что мне проще завернуть один хелпер, чем тащить Rx, потому что это однозначно усложнение, лишние абстракции, лишний вес, и пока нет нативной поддержки. Так что нет абсолют правильных абстракций, есть уместные. Таймауты прекрасно оборачиваются в промисы и я не понял о каком спагетти идёт речь, прекрасный линейный код на async/await.
elepner
Я вам дал несколько простых пирмеров, где промисов уже недостаточно. Неужели вы никогда не писали свой typeahead? Или, например, 3 зависимых селекта Страна -> Регион -> Город. На каждом шаге нужно сделать запрос, чтобы загрузить список регионов и городов. Если меняем страну нужно сбросить регион и город. Поверх этого неплохо было бы еще навернуть нормальный retry policy. При ошибке запроса делаем еще один, потом через 500мс, потом 2000мс, потом кидаем ошибку. Хотел бы я посмотреть, как это смотрится без реактивности.
unma
Выберите один и разберём если хотите. С вас реализация и с меня. Хотя скорее всего это уже миллион раз было написано, может проще поискать? То что промисы не умеют это из коробки, не значит что нельзя вообще. Правильно? Вопрос в сложности реализации. Ок, можем померить. Только как будем оценивать?
Что касается retry policy ну такое себе. Даже если оно и есть, то уж наверно в одном месте завернуто и по большому счету какая разница как оно реализовано.
Мне кажется вы упорно пытаетесь игнорировать мой аргумент, что всему свое место. Допустим даже все ваши примеры будут в реальном приложении. Это ещё совсем не говорит о том, что теперь нужен rx. В этих случаях возможно красиво, но как насчёт всего остального? Концепция промисов проста и линейна, в ней проще разобраться, тогда как в Rx есть миллион операторов, которые ещё надо найти/запомнить. В целом то код усложнится. Вопрос ради чего? Ради красоты в трёх местах? Ок, у вас таких мест много — берите Rx, я бы и сам взял. Но это далеко не мейнстрим как мне кажется. И да, потом ещё отлаживать этот винегрет.
alexs0ff
Если есть что-то «из коробки», то тогда лучше его выбрать, чем городить очередной свой велосипед, с которым другим людям нужно будет разбираться. Когда приходишь на новый проект и видишь применение стандартных механизмов предназначенных именно для решения конкретных проблем, как-то лучше работается, чем очередной раз смотреть «на полет мысли предыдущего автора».
MaZaAa
Это элементарно сделать, но боюсь представить какое это убожество будет с Rx
MaZaAa
stackblitz.com/edit/rxjs-hnlgh2
Всё элементарно и просто, не надо никакой лапши из RxJs
vintage
В случае async/await поток исполнения линеен, так что он не является спагетти по определению. В отличие от нагромождения замыканий в случае rxjs.
Это, мягко выражаясь, не так. Всё, что есть в нашем мире — это его состояние. Событие — весьма искусственная абстракция. Событиями мы называем изменения тех или иных состояний в определённый момент времени.
Вы получили документ по ссылке
http://example.org/foo/bar
. По какой ссылке нужно его сохранять, чтобы не нарваться на двусторонний канал?http://example.org/foo/bar?write
— отдельный канал для записи ресурса, но роутер направит запрос на один и тот же обработчик, который получается будет двусторонним каналом.http://example.org/write?foo/bar
— разные обработчики, но сервер-то один. Опять двусторонний канал.http://write.example.org/foo/bar
— разные сервера, но протокол-то единый, выступающий как двусторонний канал в интернет.http-write://example.org/foo/bar
— разные протоколы, но через одну точку доступа, которая выступает двусторонним каналом связи с сетью.Значит должны быть различные сетевые подключения для чтения и записи данных. Но вы используете один комп для ввода-вывода. Значит у вас должен быть отдельно монитор, показывающий файл из интернета, подключённый к одной сети. И клавиатура с мышью, изменяющие что-то в интернете, подключённые к другой сети. Но вы используете одно и то же тело для передачи и получения информации.
Это всё к вопросу о "правильных" абстракциях, соответствующих "всему нашему миру".
К сожалению, вы ни нативный код написать нормально не смогли, ни, что забавно, RXJS. Я поправил: https://stackblitz.com/edit/rxjs-bxmh4n?file=index.ts
Обратите внимание на
share
, чтобы не было 100500 подписок в доме. На копипасту, чтобы повесить разные обработчики на разное число кликов. На потерю и восстановление объекта события. На обработку дефолтной ветки логики. И, конечно, на то, что кода на RxJS получилось в полтора раза больше. Причём куда более сложного, чем нативный.Вы лучше сами попробуйте на правильной абстракции и почувствуйте разницу:
elepner
В перемере, который я выслал, надо было оставить комментарии, конечно. Он был для внутреннего пользования.
Кода на rxjs получилось 6 строчек: с 55 линии по 61. Все что выше, это описание naive подхода, чтобы можно было показать ход мыслей от простого подхода к продвинотому.
Не понял, что вы имели в виду. Ресурс по этой ссылке изменяется через PUT/PATCH на example.org/foo/bar. При чем тут вообще каналы?
Про спагетти я неверно выразился. Имелось в виду неявное состояние, которое является бичем любой системы и async/await этому активно способствует.
timeout2x
Зачем вы сравниваете ваниллу яваскрипт с rxjs?
С тем же lodash можно вполне уложиться в 3 строчки: stackblitz.com/edit/rxjs-pm9jc8?file=index.ts
Хотя пример и неплох в качестве джуниор-фильтра, к статье он подходит плохо в качестве иллюстрации.
elepner
Ваш пример не работает, если я сделаю 4 клика, то он сработает как тройной. И у вас опять же присутсвует неявное состояние count.
timeout2x
Не учел таких деталей, да собственно и не вдумывался особо.
Ну тогда проверку перенести в `_.debounce` функцию чтобы по окончанию она смотрела сколько кликов было. Тогда можно будет посчитать и сравнить.
Переменная `count` инициализирована выше. Заверните в closure если вы не хотите ее выдавать наружу, не вижу проблемы.
Опять же, это просто иллюстрация примера натягивания совы на глобус. В данном примере rxjs ну не нужен от слова совсем. Да, его можно притянуть если вся остальная разработка на rxjs. В иных случая ни потоки, ни промисы для решения этой задачи не нужны.
Таким же образом решаются задачи с typeahead — посмотрите их реализацию в куче других библиотек. Да, rxjs тут более оправдан (чем подсчет кликов по кнопке). Но опять же, в большинстве случаев просто берется готовая библиотека. Если у вас есть сильное желание написать «свое» — пожалуйста. rxjs или любой другой подход будут работать ± одинаково.