Всем привет. На связи Омельницкий Сергей. Не так давно я вел стрим по реактивному программированию, где рассказывал про асинхронность в JavaScript. Сегодня я бы хотел законспектировать этот материал.



Но перед тем как начать основной материал нам нужно сделать вводную. Итак, давайте начнем с определений: что такое стек и очередь?


Стек — это коллекция, элементы которой получают по принципу «последний вошел, первый вышел» LIFO


Очередь — это коллекция, элементы которой получают по принципу («первый вошел, первый вышел» FIFO


Окей, продолжим.



JavaScript — это однопоточный язык программирования. Это значит, что он имеется только один поток выполнения и один стек, в который помещаются функции в очередь на выполнение. Следовательно в один момент времени JavaScript может выполнить только одну операцию, другие операции при этом будут ждать своей очереди в стеке, пока их не вызовут.


Стек вызовов — это структура данных, которая, упрощенно говоря, записывает сведения о месте в программе, где мы находимся. Если мы переходим в функцию, мы помещаем запись о ней в верхнюю часть стека. Когда мы из функции возвращаемся, мы вытаскиваем из стека самый верхний элемент и оказываемся там, откуда вызывали эту функцию. Это — всё, что умеет стек. А теперь крайне интересный вопрос. Как тогда работает асинхронность в JavasScript?



На самом деле помимо стека в браузерах присутствует особая очередь для работы с так называемым WebAPI. Функции из этой очереди выполнятся по порядку только после того, как стек будет полностью очищен. Только после этого они помещаются из очереди в стек на выполнение. Если в стеке в данный момент находится хотя бы один элемент, то они в стек попасть не могут. Как раз именно из-за этого вызов функций по таймаута часто бывает не точным по времени, так как функция не может попасть из очереди в стек, пока он заполнен.


Рассмотрим следующий пример и займёмся его пошаговым «выполнением». Также посмотрим, что при этом происходит в системе.


console.log('Hi');
setTimeout(function cb1() {
    console.log('cb1');
}, 5000);
console.log('Bye');


1) Пока ничего не происходит. Консоль браузера чиста, стек вызовов пуст.



2) Потом команда console.log('Hi') добавляется в стек вызовов.



3) И она выполняется



4) Затем console.log('Hi') удаляется из стека вызовов.



5) Теперь переходим к команде setTimeout(function cb1() {… }). Она добавляется в стек вызовов.



6) Команда setTimeout(function cb1() {… }) выполняется. Браузер создаёт таймер, являющийся частью Web API. Он будет выполнять обратный отсчёт времени.



7) Команда setTimeout(function cb1() {… }) завершила работу и удаляется из стека вызовов.



8) Команда console.log('Bye') добавляется в стек вызовов.



9) Команда console.log('Bye') выполняется.



10) Команда console.log('Bye') удаляется из стека вызовов.



11) После того, как пройдут, как минимум, 5000 мс., таймер завершает работу и помещает коллбэк cb1 в очередь коллбэков.



12) Цикл событий берёт c функцию cb1 из очереди коллбэков и помещает её в стек вызовов.



13) Функция cb1 выполняется и добавляет console.log('cb1') в стек вызовов.



14) Команда console.log('cb1') выполняется.



15) Команда console.log('cb1') удаляется из стека вызовов.



16) Функция cb1 удаляется из стека вызовов.


Взглянем на пример в динамике:



Ну вот мы и рассмотрели как в JavaScript реализована асинхронность. Теперь давайте кратко поговорим об эволюции асинхронного кода.


Эволюция асинхронного кода.


a(function (resultsFromA) {
    b(resultsFromA, function (resultsFromB) {
        c(resultsFromB, function (resultsFromC) {
            d(resultsFromC, function (resultsFromD) {
                e(resultsFromD, function (resultsFromE) {
                    f(resultsFromE, function (resultsFromF) {
                        console.log(resultsFromF);
                    })
                })
            })
        })
    })
});

Асинхронное программирование, каким мы его знаем в JavaScript, может быть реализовано только функциями. Они могут быть переданы как любая другая переменная другим функциям. Так родились коллбэки. И это прикольно, весело и задорно, пока не превращается в грусть, тоску и печаль. Почему? Да все просто:


  • С ростом сложности кода, проект быстро превращается в малопонятные многократно вложенные блоки — «callback hell».
  • Обработку ошибок можно легко упустить.
  • Нельзя возвращать выражения с return.

С появлением Promise обстановка стала чуть лучше.


new Promise(function(resolve, reject) {
    setTimeout(() => resolve(1), 2000);

}).then((result) => {
    alert(result);
    return result + 2;

}).then((result) => {
    throw new Error('FAILED HERE');
    alert(result);
    return result + 2;

}).then((result) => {
    alert(result);
    return result + 2;

}).catch((e) => {
    console.log('error: ', e);
});

  • Появились цепочки промисов, что улучшило читаемость кода
  • Появился отдельный метод перехвата ошибок
  • Появилась возможность параллельного выполнения с помощью Promise.all
  • Вложенную асинхронность мы можем решить с помощью async/await

Но у промиса есть свои ограничения. К примеру промис, без танцев с бубном, нельзя отменить, а что самое главное — работает с одним значением.


Ну вот мы и плавно подошли к реактивному программированию. Устали? Ну благо дело можно можно пойти заварить чаек, обмозговать и вернуться читать далее. А я продолжу.


Основы реактивного программирования


Реактивное программирование?—?парадигма программирования, ориентированная на потоки данных и распространение изменений. Давайте более детально разберем что такое поток данных.


// Получаем ссылку на элемент
const input = ducument.querySelector('input');

const eventsArray = [];

// Пушим каждое событие в массив eventsArray
input.addEventListener('keyup',
    event => eventsArray.push(event)
);

Представим, что у нас есть поле ввода. Мы создаем массив, и на каждый keyup события input мы будем сохранять событие в нашем массиве. При этом хотелось бы отметить, что наш массив отсортирован по времени? т.е. ?индекс более поздних событий больше, чем индекс более ранних. Такой массив представляет собой упрощенную модель потока данных, но это еще не поток. Для того чтоб этот массив можно было смело назвать потоком он должен уметь каким-то образом сообщать подписчикам, что в него поступили новые данные. Таким образом мы подошли к определению потока.


Поток данных


const { interval } = Rx;
const { take } = RxOperators;

interval(1000).pipe(
    take(4)
)


Поток?—?это массив данных, отсортированных по времени, который может сообщать о том, что данные изменились. А теперь представьте как удобно становится писать код, в котором на одно действие потребуется вызывать несколько событий в разных участках кода. Мы просто делаем подписку на поток и он нам сам сообщит когда произойдут изменения. И это умеет делать библиотека RxJs.



RxJS — это библиотека для работы с асинхронными и основанными на событиях программами с использованием наблюдаемых последовательностей. Библиотека предоставляет основной тип Observable, несколько вспомогательных типов (Observer, Schedulers, Subjects) и операторы работы с событиями как с коллекциями (map, filter, reduce, every и подобные из JavaScript Array).


Давайте разберемся с основными понятиями этой библиотеки.


Observable, Observer, Producer


Observable — первый базовый тип, который мы рассмотрим. Этот класс содержит в себе основную часть реализации RxJs. Он связан с наблюдаемым потоком, на который можно как подписаться с помощью метода subscribe.


В Observable реализуется вспомогательный механизм для создания обновлений, так называемый Observer. Источником значений для Observer называется Producer. Это может быть массив, итератор, web socket, какое-то событие и т.п. Так что можно сказать, что observable является проводником между Producer и Observer.


Observable обрабатывает три вида событий Observer:


  • next – новые данные
  • error – ошибку, если последовательность завершилась по причине исключительной ситуации. это событие так же предполагает завершение последовательности.
  • complete — сигнал о завершении последовательности. Это означает, что новых данных больше не будет

Посмотрим демо:



В начале мы обработаем значения 1, 2, 3, а спустя 1 сек. мы получим 4 и завершим наш поток.


Мысли в слух

И тут я понял, что рассказывать было интересней чем писать об этом. :D


Subscription


Когда мы делаем подписку на поток, мы создаем новый класс subscription, который дает нам возможность отменить подписку с помощью метода unsubscribe. Так же мы можем сгруппировать подписки с помощью метода add. Ну и логично, что мы можем разгруппировать потоки с помощью remove. Методы add и remove на вход принимают другую подписку. Хотелось бы отметить, что когда мы делаем отписку, то мы отписываемся от всех дочерних подписок как будто бы и у них вызывали метод unsubscribe. Идем дальше.


Виды потоков


HOT COLD
Producer создается снаружи observable Producer создается внутри observable
Данные передаются в момент создания observable Данные сообщаются в момент подписки
Нужна дополнительная логика для отписки Поток завершается самостоятельно
Использует связь один-к-многим Использует связь вида один-к-одному
Все подписки имеют единое значение Подписки независимы
Данные можно потерять, если нет подписки Переиздает все значения потока для новой подписки

Если приводить аналогию, то я бы представил горячий поток как фильм в кинотеатре. В какой момент времени ты пришел, с того момента и начал просмотр. Холодный поток я бы сравнил со звонком в тех. поддержку. Любой позвонивший слушает запись автоответчика от начала до конца, но ты можешь бросить трубку с помощью unsubscribe.


Хотелось бы отметить, что существуют еще так называемые warm потоки ( такое определение я встречал крайне редко и только на зарубежных сообществах ) — это поток, который трансформируется из холодного потока в горячий. Возникает вопрос — где использовать)) Приведу пример из практики.


Я работаю с ангуляром. Он активно использует rxjs. Для получения данных на сервер я ожидаю холодный поток и этот поток использую в шаблоне с помощью asyncPipe. Если я использую этот пайп несколько раз, то, возвращаясь к определению холодного потока, каждый pipe будет запрашивать данные с сервера, что мягко говоря странно. А если я преобразую холодный поток в теплый, то запрос произойдет единожды.


Вообще понимание вида потоков достаточно сложна для начинающих, но важна.


Operators


return this.http.get(`${environment.apiUrl}/${this.apiUrl}/trade_companies`)
    .pipe(
        tap(({ data }: TradeCompanyList) => this.companies$$.next(cloneDeep(data))),
        map(({ data }: TradeCompanyList) => data)
    );

Расширить возможность работы с потоками нам предоставляют операторы. Они помогают контролировать события, протекающие в Observable. Мы рассмотрим парочку наиболее популярных, а подробнее с операторами можно ознакомиться по ссылкам в полезной информации.


Operators — of


Начнем со вспомогательного оператора of. Он создает Observable на основе простого значения.



Operators — filter



Оператор фильтрации filter, как можно понять по названию, фильтрует сигнал потока. Если оператор возвращает истину, то пропускает далее.


Operators — take



take — Принимает значение кол-ва эмитов, после которого завершает поток.


Operators — debounceTime



debounceTime — отбрасывает испускаемые значения, которые попадают в указанный промежуток времени между выходными данными — по прошествии временного интервала эмитит последнее значение.


const { Observable } = Rx;
const { debounceTime, take } = RxOperators;

Observable.create((observer) => {
  let i = 1;
  observer.next(i++);
  // Испускаем значение раз в 1000мс
  setInterval(() => {
    observer.next(i++)
  }, 1000);

 // Испускаем значение раз в 1500мс
  setInterval(() => {
    observer.next(i++)
  }, 1500);
}).pipe(
  debounceTime(700),  // Ожидаем 700мс значения прежде чем обработать
  take(3)
);  


Operators — takeWhile



Эмитит значения, пока takeWhile не вернет false, после чего отпишется от потока.


const { Observable } = Rx;
const { debounceTime, takeWhile } = RxOperators;

Observable.create((observer) => {
  let i = 1;
  observer.next(i++);
  // Испускаем значение раз в 1000мс
  setInterval(() => {
    observer.next(i++)
  }, 1000);
}).pipe(
  takeWhile( producer =>  producer < 5 )
);  


Operators — combineLatest


Комбинированный оператор combineLatest чем-то похож на promise.all. Он объединяет несколько потоков в один. После того как каждый поток сделает хотя бы один эмит, мы получаем последние значения от каждого в виде массива. Далее, после любого эмита из объединённых потоков он будет отдавать новые значения.



const { combineLatest, Observable } = Rx;
const { take } = RxOperators;

const observer_1 = Observable.create((observer) => {
  let i = 1;
  // Испускаем значение раз в 1000мс
  setInterval(() => {
    observer.next('a: ' + i++);
  }, 1000);
});

const observer_2 = Observable.create((observer) => {
  let i = 1;
  // Испускаем значение раз в 750мс
  setInterval(() => {
    observer.next('b: ' + i++);
  }, 750);
});

combineLatest(observer_1, observer_2).pipe(take(5));


Operators — zip


Zip — ждет значение из каждого потока и формирует массив на основе этих значений. Если значение не придет из какого-либо потока, то группа не будет сформирована.



const { zip, Observable } = Rx;
const { take } = RxOperators;

const observer_1 = Observable.create((observer) => {
  let i = 1;
  // Испускаем значение раз в 1000мс
  setInterval(() => {
    observer.next('a: ' + i++);
  }, 1000);
});

const observer_2 = Observable.create((observer) => {
  let i = 1;
  // Испускаем значение раз в 750
  setInterval(() => {
    observer.next('b: ' + i++);
  }, 750);
});

const observer_3 = Observable.create((observer) => {
  let i = 1;
  // Испускаем значение раз в 500
  setInterval(() => {
    observer.next('c: ' + i++);
  }, 500);
});

zip(observer_1, observer_2, observer_3).pipe(take(5));


Operators — forkJoin


forkJoin также объединяет потоки, но он емитнит значение только когда все потоки будут завершены (complete).



const { forkJoin, Observable } = Rx;
const { take } = RxOperators;

const observer_1 = Observable.create((observer) => {
  let i = 1;
  // Испускаем значение раз в 1000мс
  setInterval(() => {
    observer.next('a: ' + i++);
  }, 1000);
}).pipe(take(3));

const observer_2 = Observable.create((observer) => {
  let i = 1;
  // Испускаем значение раз в 750
  setInterval(() => {
    observer.next('b: ' + i++);
  }, 750);
}).pipe(take(5));

const observer_3 = Observable.create((observer) => {
  let i = 1;
  // Испускаем значение раз в 500
  setInterval(() => {
    observer.next('c: ' + i++);
  }, 500);
}).pipe(take(4));

forkJoin(observer_1, observer_2, observer_3);


Operators — map


Оператор трансформации map преобразует значение эмита в новое.



const {  Observable } = Rx;
const { take, map } = RxOperators;

Observable.create((observer) => {
  let i = 1;
  // Испускаем значение раз в 1000мс
  setInterval(() => {
    observer.next(i++);
  }, 1000);
}).pipe(
  map(x => x * 10),
  take(3)
);


Operators – share, tap


Оператор tap - позволяет делать side эффекты, то есть какие-либо действия, которые не влияют на последовательность.


Утилитный оператор share способен из холодного потока сделать горячим.



С операторами закончили. Перейдем к Subject.


Мысли в слух

И тут я пошел чаек пить. Утомили меня эти примеры :D


Семейство subject-ов


Семейство subject-ов являются ярким примером горячих потоков. Эти классы являются неким гибридом, которые выступают одновременно в роли observable и observer. Так как subject является горячим потоком, то от него необходимо отписываться. Если говорить по основным методам, то это:


  • next – передача новых данных в поток
  • error – ошибка и завершение потока
  • complete – завершение потока
  • subscribe – подписаться на поток
  • unsubscribe – отписаться от потока
  • asObservable – трансформируем в наблюдателя
  • toPromise – трансформирует в промис

Выделяют 4 5 типов subject-ов.


Мысли в слух

На стриме говорил 4, а оказалось они еще один добавили. Как говорится век живи век учись.


Простой Subject new Subject()– самый простой вид subject-ов. Создается без параметров. Передает значения пришедшие только после подписки.


BehaviorSubject new BehaviorSubject( defaultData<T> ) – на мой взгляд самый распространённый вид subject-ов. На вход принимает значение по умолчанию. Всегда сохраняет данные последнего эмита, которые передает при подписке. Данный класс имеет так же полезный метод value, который возвращает текущее значение потока.


ReplaySubject new ReplaySubject(bufferSize?: number, windowTime?: number) — На вход опционально может принять первым аргументом размер буфера значений, которые он будет в себе хранить, а вторым время в течении которого нам нужны изменения.


AsyncSubject new AsyncSubject() — при подписке ничего не происходит, и значение будет возвращено только при complete. Будет возвращено только последнее значение потока.


WebSocketSubject new WebSocketSubject(urlConfigOrSource: string | WebSocketSubjectConfig<T> | Observable<T>, destination?: Observer<T>) — О нем документация молчит и я сам его в первый раз вижу. Кто знает что он делает пишите, дополним.


Фуф. Ну вот мы и рассмотрели все, что я хотел сегодня рассказать. Надеюсь данная информация была полезной. Самостоятельно ознакомиться со списком литературы можно во вкладке полезная информация.


Полезная информация


Комментарии (50)


  1. unma
    03.08.2019 19:19
    +3

    А мне обычно достаточно промисов, а Rx считаю излишним усложнением. Ок, нельзя отменять, но зачем? Чаще всего результат идёт в локальный стейт компонента, ну и пусть пишет сколько хочет. Дебоунс нужен, но он просто вешается на внешнюю функцию/метод и все. А остальные Rx операторы если более менее серьезно использовать превращаются в кашу, которую ещё надо постараться отдалить. Не против Rx в целом, но против его пиара как универсального средства на замену промисам. Потому что ангуляр решил выпендриться. Почему observable? Давайте сразу каналы и возможность напихивать с двух сторон, но кому это надо?
    На мой взгляд лучше взять простейшие промисы, async/await и допилить где надо, чем сразу брать Rx, потому что он как бы лишён недостатков промисов. Не надо забывать что главная проблема разработки — это сложность ПО, а ее недостаток фич. Ну я про свой CRUD все конечно, может у кого проблемы поглобальнее.


    1. Sergamers Автор
      03.08.2019 20:05
      -1

      Спасибо за ваш комментарий. Где ж он был когда я стирм вел, а то там все такие стесняшки были)

      Согласен, что для простых проектов, когда у вас обычный запрос/ответ Rx избыточен. Да и технологии выбираются из расчета задач и компетенции команды.

      А в целом RxJs хорош. Подписку не сделал — поток не работает, память не кушает; для работы с данными можно использовать единый интерфейс будь то клиентское событие или web-socket; связь один-к-многим вообще огонь, когда ты создаешь свой горячий поток.


    1. elepner
      04.08.2019 01:35

      Как же мне понятны ваши аргументы и как же вы неправы :) Был на вашем месте пару лет назад, когда начинал работать с Ангуляром. Считал observable каким-то выпендрежем без цели. Сейчас мне больно смотреть на спагетти с async/await.

      Почему observable?

      Потому что выбор правильных абстракций — это половина успеха, а весь наш мир — это один большой поток событий, который движется только вперед (кстати, поэтому двусторонний канал уже излишество) и именно поэтому CQRS шагает по планете вместе с функциональным программированием.
      Чтобы прочувствовать rxjs попробуйте написать свой triple/quad click в качестве упражнения классически на таймаутах, а потом с rxjs. Я даю такое задание своим джуниорам. Тут можете подсмотреть ответ. У меня получилось уложиться в 5 строчек.
      А чтобы его полюбить, напишите свой хороший typeahead с отменой HTTP запросов на промисах и с rxjs, почувствуйте разницу.


      1. unma
        04.08.2019 07:14
        +1

        Не уверен что вы поняли мой посыл. Абстракция на то и абстракция, что берет только необходимое из реального мира. И в моем случае, также как мне кажется в большинстве, как минимум на данный момент то что я вижу на рынке и скорее всего останется — промисов вполне достаточно. А значит и не нужно пенеусложнять, придумывая себе проблемы на пустом месте. Я это про себя, а не про вас. Возможно у вас такие задачи составляют основную часть, но у меня — это такой минимум, что мне проще завернуть один хелпер, чем тащить Rx, потому что это однозначно усложнение, лишние абстракции, лишний вес, и пока нет нативной поддержки. Так что нет абсолют правильных абстракций, есть уместные. Таймауты прекрасно оборачиваются в промисы и я не понял о каком спагетти идёт речь, прекрасный линейный код на async/await.


        1. elepner
          04.08.2019 11:38
          +1

          Я вам дал несколько простых пирмеров, где промисов уже недостаточно. Неужели вы никогда не писали свой typeahead? Или, например, 3 зависимых селекта Страна -> Регион -> Город. На каждом шаге нужно сделать запрос, чтобы загрузить список регионов и городов. Если меняем страну нужно сбросить регион и город. Поверх этого неплохо было бы еще навернуть нормальный retry policy. При ошибке запроса делаем еще один, потом через 500мс, потом 2000мс, потом кидаем ошибку. Хотел бы я посмотреть, как это смотрится без реактивности.


          1. unma
            04.08.2019 13:03
            +1

            Выберите один и разберём если хотите. С вас реализация и с меня. Хотя скорее всего это уже миллион раз было написано, может проще поискать? То что промисы не умеют это из коробки, не значит что нельзя вообще. Правильно? Вопрос в сложности реализации. Ок, можем померить. Только как будем оценивать?
            Что касается retry policy ну такое себе. Даже если оно и есть, то уж наверно в одном месте завернуто и по большому счету какая разница как оно реализовано.
            Мне кажется вы упорно пытаетесь игнорировать мой аргумент, что всему свое место. Допустим даже все ваши примеры будут в реальном приложении. Это ещё совсем не говорит о том, что теперь нужен rx. В этих случаях возможно красиво, но как насчёт всего остального? Концепция промисов проста и линейна, в ней проще разобраться, тогда как в Rx есть миллион операторов, которые ещё надо найти/запомнить. В целом то код усложнится. Вопрос ради чего? Ради красоты в трёх местах? Ок, у вас таких мест много — берите Rx, я бы и сам взял. Но это далеко не мейнстрим как мне кажется. И да, потом ещё отлаживать этот винегрет.


            1. alexs0ff
              04.08.2019 21:57

              То что промисы не умеют это из коробки, не значит что нельзя вообщ

              Если есть что-то «из коробки», то тогда лучше его выбрать, чем городить очередной свой велосипед, с которым другим людям нужно будет разбираться. Когда приходишь на новый проект и видишь применение стандартных механизмов предназначенных именно для решения конкретных проблем, как-то лучше работается, чем очередной раз смотреть «на полет мысли предыдущего автора».


          1. MaZaAa
            04.08.2019 20:52

            Это элементарно сделать, но боюсь представить какое это убожество будет с Rx


          1. MaZaAa
            05.08.2019 11:03
            -1

            stackblitz.com/edit/rxjs-hnlgh2

            Всё элементарно и просто, не надо никакой лапши из RxJs


      1. vintage
        04.08.2019 14:14

        Сейчас мне больно смотреть на спагетти с async/await.

        В случае async/await поток исполнения линеен, так что он не является спагетти по определению. В отличие от нагромождения замыканий в случае rxjs.


        весь наш мир — это один большой поток событий

        Это, мягко выражаясь, не так. Всё, что есть в нашем мире — это его состояние. Событие — весьма искусственная абстракция. Событиями мы называем изменения тех или иных состояний в определённый момент времени.


        поток событий, который движется только вперед (кстати, поэтому двусторонний канал уже излишество)

        Вы получили документ по ссылке http://example.org/foo/bar. По какой ссылке нужно его сохранять, чтобы не нарваться на двусторонний канал?


        • http://example.org/foo/bar?write — отдельный канал для записи ресурса, но роутер направит запрос на один и тот же обработчик, который получается будет двусторонним каналом.
        • http://example.org/write?foo/bar — разные обработчики, но сервер-то один. Опять двусторонний канал.
        • http://write.example.org/foo/bar — разные сервера, но протокол-то единый, выступающий как двусторонний канал в интернет.
        • http-write://example.org/foo/bar — разные протоколы, но через одну точку доступа, которая выступает двусторонним каналом связи с сетью.

        Значит должны быть различные сетевые подключения для чтения и записи данных. Но вы используете один комп для ввода-вывода. Значит у вас должен быть отдельно монитор, показывающий файл из интернета, подключённый к одной сети. И клавиатура с мышью, изменяющие что-то в интернете, подключённые к другой сети. Но вы используете одно и то же тело для передачи и получения информации.


        Это всё к вопросу о "правильных" абстракциях, соответствующих "всему нашему миру".


        Я даю такое задание своим джуниорам. Тут можете подсмотреть ответ. У меня получилось уложиться в 5 строчек.

        К сожалению, вы ни нативный код написать нормально не смогли, ни, что забавно, RXJS. Я поправил: https://stackblitz.com/edit/rxjs-bxmh4n?file=index.ts


        Обратите внимание на share, чтобы не было 100500 подписок в доме. На копипасту, чтобы повесить разные обработчики на разное число кликов. На потерю и восстановление объекта события. На обработку дефолтной ветки логики. И, конечно, на то, что кода на RxJS получилось в полтора раза больше. Причём куда более сложного, чем нативный.


        А чтобы его полюбить, напишите свой хороший typeahead с отменой HTTP запросов на промисах и с rxjs, почувствуйте разницу.

        Вы лучше сами попробуйте на правильной абстракции и почувствуйте разницу:


        @ $mol_mem
        typed( next? : string ) { return next || '' }
        
        @ $mol_mem
        suggestions() : string[] {
            const uri = `/suggestions?prefix=${ this.typed() }`
            return $mol_fetch.json( uri )
        }


        1. elepner
          04.08.2019 14:55

          В перемере, который я выслал, надо было оставить комментарии, конечно. Он был для внутреннего пользования.
          Кода на rxjs получилось 6 строчек: с 55 линии по 61. Все что выше, это описание naive подхода, чтобы можно было показать ход мыслей от простого подхода к продвинотому.

          Вы получили документ по ссылке example.org/foo/bar. По какой ссылке нужно его сохранять, чтобы не нарваться на двусторонний канал?

          Не понял, что вы имели в виду. Ресурс по этой ссылке изменяется через PUT/PATCH на example.org/foo/bar. При чем тут вообще каналы?
          Про спагетти я неверно выразился. Имелось в виду неявное состояние, которое является бичем любой системы и async/await этому активно способствует.


      1. timeout2x
        04.08.2019 14:44

        Зачем вы сравниваете ваниллу яваскрипт с rxjs?
        С тем же lodash можно вполне уложиться в 3 строчки: stackblitz.com/edit/rxjs-pm9jc8?file=index.ts

        Хотя пример и неплох в качестве джуниор-фильтра, к статье он подходит плохо в качестве иллюстрации.


        1. elepner
          04.08.2019 15:03

          Ваш пример не работает, если я сделаю 4 клика, то он сработает как тройной. И у вас опять же присутсвует неявное состояние count.


          1. timeout2x
            04.08.2019 15:21

            Не учел таких деталей, да собственно и не вдумывался особо.
            Ну тогда проверку перенести в `_.debounce` функцию чтобы по окончанию она смотрела сколько кликов было. Тогда можно будет посчитать и сравнить.
            Переменная `count` инициализирована выше. Заверните в closure если вы не хотите ее выдавать наружу, не вижу проблемы.

            Опять же, это просто иллюстрация примера натягивания совы на глобус. В данном примере rxjs ну не нужен от слова совсем. Да, его можно притянуть если вся остальная разработка на rxjs. В иных случая ни потоки, ни промисы для решения этой задачи не нужны.

            Таким же образом решаются задачи с typeahead — посмотрите их реализацию в куче других библиотек. Да, rxjs тут более оправдан (чем подсчет кликов по кнопке). Но опять же, в большинстве случаев просто берется готовая библиотека. Если у вас есть сильное желание написать «свое» — пожалуйста. rxjs или любой другой подход будут работать ± одинаково.


  1. alexs0ff
    03.08.2019 19:35
    +1

    В знакомство с реактивным программированием, на мой взгляд, нужно добавлять сравнение Pull и Push моделей получения данных.


  1. Timuch
    03.08.2019 20:34

    Спасибо, с удовольствием прочёл)


  1. DmitryKoterov
    03.08.2019 22:34
    +3

    1. Есть какие-то best practices по скрещиванию RxJS-подхода с async/await-подходом? Например, «у меня есть observable, а нужен async generator» или «у меня есть async generator, нужен observable, а потом обратно async generator» — и все на TypeScript. Вот на эту тему статья не помешала бы.
    2. Каков глубинный смысл в том, что http.get — observable, если он возвращает ровно одно значение? Чем здесь концептуально промис плох, который можно await?

    Вопросы выше скорее в контексте серверной разработки, чем клиентской, кстати.


    1. denis-isaev
      04.08.2019 02:00
      +1

      2) Просто для унификации интерфейса, чтобы без лишнего кода можно было сделать что-нибудь вида


      race(http.get(proxy1), http.get(proxy2)).map(...convert response...).subscribe(...)

      race — передаст в map первый пришедший ответ (от быстрейшего прокси).


    1. elepner
      04.08.2019 11:55
      +1

      Каков глубинный смысл в том, что http.get — observable, если он возвращает ровно одно значение?

      Смыслов тут несколько. Во-первых, с fetch api можно получать и прогресс запроса, т.е. ваше утверждение не совсем верно. Во-вторых, и на мой взгляд самое главное, Promise нельзя отменить, а от Observable можно отписаться, что ведет к отмене запроса.


    1. lehkap
      04.08.2019 12:19

      В RxJs есть оператор fromPromise для превращения Promise в Observable. Так же у observable есть метод toPromise делает обратно из observable promise.
      Когда начинал во второй Angular, подумал нафиг observable, в каждом методе вызывал .toPromise() в конце. Потом понял что ошибся, теперь возвращаю Observable, при необходимости вешаю операторы или скрещиваю несколько Observable и далее
      await myFinalObservable.toPromise().
      Тогда и callbackHell не возникает и всю мощь и лаконичность RxJs можно использовать


      1. unma
        04.08.2019 13:17

        Тоже начинал с toPromise))) думал зачем это все. Теперь честно делаю subscribe, но не из-за функционала больше, а потому что уж лучше что-то одно выбрать, везде использовать и не грузить зря мозг. Просто человеческий фактор.


      1. DmitryKoterov
        05.08.2019 09:52

        Ага, интересно, давайте поговорим про toPromise(). Например, у меня есть observable, который выдает 1, 2, 3 и потом заканчивается. Если я делаю:

        await observable.subscribe(val => process(val)).toPromise();
        // кажется, не так — но как, кстати?

        то тогда process() вызовется по разу для 1, 2, 3, а вызывающий поток управления будет await-ать окончания этого дела и потом продолжится. Великолепно.

        Внимание — вопрос. Что, если process() — сама по себе async-функция, да еще довольно запутанная, и внутри нее может произойти исключение? Что в этом случае произойдет?

        Ведь unhandled exception же скорее всего, нет?

        Как через observable «замкнуть» все эти исключения на основном потоке управления, чтобы он не расползся, и чтобы гарантированно все исключения в зависимом коде не вышли за скоуп этого основного потока управления? Вот в чем вопрос.

        (Под «скоупом основного потока управления» я понимаю такое свойство асинхронного куска кода, что его можно обернуть в try-catch, и из него ничего не «вылетит» неожиданного. Ведь это одно из самых полезных свойств async-await кода.)


        1. vintage
          05.08.2019 10:13

          // кажется, не так — но как, кстати?

          await observable.pipe(map(val => process(val))).toPromise()

          то тогда process() вызовется по разу для 1, 2, 3, а вызывающий поток управления будет await-ать окончания этого дела и потом продолжится.

          Нет, промис зарезолвится последним значением — 3. И то, если стрим будет закрыт. Пока стрим не закроется промис не зарезолвится.


          Что в этом случае произойдет?

          Стрим закроектся, а промис зареджектится. Закрытие стримов при ошибках и невозможность их переоткрытия — та ещё боль. Приходится создавать дерево стримов заново.


          Как через observable «замкнуть» все эти исключения на основном потоке управления

          Да так же как с промисами — оператор catch. Логически промисы — частный случай обсерваблов.


          1. DmitryKoterov
            05.08.2019 10:40
            +1

            Стрим закроектся, а промис зареджектится.

            Точно? Напоминаю, process — async-функция. Если в map() передается коллбэк, возвращающий промис, то разве observable будет дожидаться резолва этого промиса перед тем, как выплевывать следующее значение?


            В идеале что бы я хотел сделать, это сконвертить observable не в промис, а в async generator. Чтобы можно было написать типа такого:


            try {
              for await (const v of observable) {
                await process(v);
              }
            } catch (e) { ... }
            


        1. mayorovp
          05.08.2019 10:52

          В асинхронном случае надо вот так делать:


          await observable.concatMap(val => from(process(val))).toPromise();

          Также можно использовать mergeMap если порядок обработки не важен, а параллельность — напротив, как раз важна.


          1. DmitryKoterov
            05.08.2019 11:14

            Вот же ж адище. Но спасибо, логика понятна!

            И последнее еще — предложенный вами вариант с mergeMap (т.е. параллельно), но так, чтобы (три разных независимых варианта):
            А) одновременно выполнялось не более N функций process() а остальные ждали своей очереди
            Б) все разбивалось бы на чанки по N элементов, для каждого чанка process() выполнялись бы параллельно, а сами чанки — последовательно;
            В) все развивалось бы на чанки, но не фиксированного размера, а до тех пор, пока не выполнится некоторое условие насчет элементов текущего чанка (такой write barrier как бы).

            Как?


            1. mayorovp
              05.08.2019 11:33

              Пункт А — это штатная функциональность mergeMap.


              Пункты Б и В — это уже вам нужен buffer и его вариации:


              await observable.pipe(
                  bufferCount(10),
                  concatMap(buf => from(Promise.all(buf.map(process)))),
                  concatAll()
              ).toPromise()


    1. funca
      05.08.2019 13:26

      Для единообразия есть IxJS от той же команды, что и RxJS. Но толку мало, т.к. попытки совместить оба подхода, например в реплизации backpreasure, превращаются в научную проблему. Все лучше там, где дуплексное взаимодействие заложено изначально. Например streams из nodejs или callbaGs.


  1. Sirion
    03.08.2019 23:27

    Странно, почему здесь до сих пор нет комментариев товарища vintage


    1. vintage
      04.08.2019 00:12
      +2

      Жду особого приглашения. Впрочем, а что тут можно сказать? Могу разве что предложить заинтересовавшихся темой, почитать эти две статьи:
      https://habr.com/ru/post/307288/ — про асинхронное
      https://habr.com/ru/post/317360/ — про реактивное


  1. Paxest
    03.08.2019 23:36

    Первая часть повторяет это видео. www.youtube.com/watch?v=8cV4ZvHXQL4
    Очень понятно и доступно


  1. LeusMaximus
    04.08.2019 02:50

    Еще пара ссылок к полезной информации:


    • пример самописного Observable — видео
    • раздел по RxJS в скринкасте по Angular на learn.javascript.ru


  1. JustDont
    04.08.2019 05:21
    +2

    Вообще конечно не очень правильно говорить про реактивное программирование, и тут же уходить на RxJS. RxJS реактивен, но RxJS не единственен, и абстракции RxJS вовсе не о «простой» реактивности а-ля реакция на изменения, а о куда более конкретной реактивности (потоки событий или же push-коллекции). Архитектура RxJS чрезмерно конкретизирована — это реактивность в специфической обертке.

    PS: А второй большой камень в огород RxJS — это вот эти самые магические методы на тему «как сделать с потоком то, что мне нужно». Без документации это абсолютно неподъемно, надо, наверное, писать конкретно на RxJS 24/7, чтоб помнить хотя бы треть методов. Я отлично помню времена, когда команде RxJS вздумалось переделать документацию, и какое-то время всё просто лежало, а потом еще какое-то время не было адекватного поиска. И разработка на эти дни просто встала.


    1. unma
      04.08.2019 08:26
      +1

      О да, перевели внезапно на пайпы, а документация настолько убогая была, что приходилось в исходник лезть.


    1. Sergamers Автор
      04.08.2019 12:25

      Я бы хотел рассказать предысторию появления этого доклада. У меня есть группа, которую я активно веду. Цель у меня прийти от базовых вещей к разработке spa на ангуляре. Поэтому я рассказываю про реактивность и делаю уклон в сторону rxjs т.к. ангуляр его активно использует. Поэтому на стриме и здесь я рассказал о том, что мы будем в дальнейшем изучать.


  1. DmitryKoterov
    05.08.2019 11:36

    Еще один момент, по поводу того, что промисы нельзя отменять/прерывать, а observable можно.

    Тут правильнее было бы, наверное, сравнивать observable не с промисами, а с async generator-ами. И вот их «отменять», кажется, можно — самой семантикой языка (промисы-то что отменять, они либо случились, либо еще нет):

    async function something() {
      ...
    }
    
    async function* getStream() {
      for (let i = 0; i < 10; i++) {
        const v = await something(i); // или еще что-то, не важно
        yield v;
      }
    }
    
    for await (const v of getStream()) {
      if (v == “some”) break;
      ...
    }
    


    Так вот, если цикл выскочит на break, работа асинк-генератора getStream() тоже закончится. И потом сборщик мусора прибьет оставшиеся от него кишки. Отмена? Отмена.

    Я, правда, не до конца понимаю, почему это работает. Наверное, потому что после break-а “for await” перестает «впрыгивать» обратно в генератор, поэтому он подвисает в воздухе (раз в него никто не впрыгивает), а потом сам генератор выходит из области видимости, и сборщик мусора вычищает все зависимые вещи прямо в середине того for-а внутри getStream().


    1. mayorovp
      05.08.2019 11:42

      Этот способ позволяет прерывать генератор только в yield-позициях, а await-позиции куда важнее.


    1. MaZaAa
      05.08.2019 12:52
      -1

      Не забивай голову ерундой, не надо ничего отменять, просто проверяешь после асинхронных вызовов, актуальны ли они ещё и если нет, то просто ничего не делаешь, а если да, то уже дальше делаешь манипуляции с данными.

      Вот тут посмотри накидал на коленке банальных примеров stackblitz.com/edit/rxjs-hnlgh2


      1. mayorovp
        05.08.2019 12:54
        +1

        не надо ничего отменять

        Ну да, и DDOS-атака на свой же бакенд своим же штатным фронтэндом — это совершенно нормально, не надо тут ничего менять!


        1. MaZaAa
          05.08.2019 13:13
          -1

          Если это вы называете DDOS-атакой, то у меня плохие новости для ваших бэкендов. Есть такое понятие как здравый смысл, который гласит — не надо ради мелочи которая на самом деле ни на что не влияет превращать код в убожество. А если ваш бэкенд можно положить отправив на него ещё один лишний запрос, то извините, любой школьник его положит за секунды причем прямо из консоли браузера и хана вашему проекту/бизнесу.


          1. mayorovp
            05.08.2019 13:21

            Не "один лишний запрос", а "сотню лишних запросов с каждой открытой страницы".


            Так-то понятно, что на один лишний запрос внимания можно не обращать...


            1. MaZaAa
              05.08.2019 14:03

              Для таких кейсов тоже можно достаточно легко обойтись обвязкой для запросов к АПИ и вспомогательногьного класса который будет проверять актуальны ли ещё запросы или нет, и если нет, то просто их не выполнять и выбросить специальное исключение, а те что в pending отменить. И никаких RxJs не надо тут даже близко. Просто пишешь как обычно, понятный линейный код сверху вниз и при этом эти нюансы будут учтены уже в этой обвязке и хэлпер классе


              1. mayorovp
                05.08.2019 14:07

                То, что rx.js не обязательно нужна — это да, но начинали-то вы с того, что отмена запросов не нужна.


                1. MaZaAa
                  05.08.2019 14:14
                  -1

                  Она и не нужна в 99.9% случаев и проектов, но если у вас условно каждый клик генерирует по 100 запросов, то это уже индивидуальный случай и тут это актуально и опять же достаточно легко реализуется нативными средствами, чтобы код оставался чистым и красивым, а не лапшой. Но это актуально именно для единичных случаев, для подавляющего большинства отмена запросов не нужна и не актуальна. Но если заняться больше нечем, то можно и имплементировать, но так, чтобы код из-за это не страдал


              1. alexs0ff
                05.08.2019 14:21
                -1

                понятный линейный код

                декларативный код (к которому относится RxJS) на порядок легче поддерживать, чем императивный.


                1. MaZaAa
                  05.08.2019 14:27

                  Я надеюсь вы же шутите да?


                1. mayorovp
                  05.08.2019 14:28

                  Так себе декларативный. В сложных случаях нужно постоянно держать в уме какой оператор сколько подписок и в какие моменты держит...


                  1. alexs0ff
                    05.08.2019 14:30

                    В сложных случаях

                    А как себя поведет императивный подход в этих же случаях? Точно не лучше — «мешанина» будет на порядок сложнее.


                    1. mayorovp
                      05.08.2019 14:39

                      Вот в rx.js она и оказывается сложнее...


                1. vintage
                  05.08.2019 15:09
                  +1

                  декларативный код (к которому относится RxJS)

                  Не относится он ни к декларативному, ни к функциональному. Это скорее железнодорожное программирование.


                  на порядок легче поддерживать

                  Ну конечно. Любая не описанная в документации задача превращается в railroad tycoon. А описанная — в копипасту без глубокого понимания, что происходит.