Цель данной статьи – показать на примере зачем нужно reactive programming, как оно связано с функциональным программированием, и как с его помощью можно писать декларативный код, который легко адаптировать к новым требованиям. Кроме того, хочется сделать это максимально кратко и просто на примере приближенном к реальному.
Возьмем такую задачу:
Есть некий сервис c REST API и endpointом /people
. При POST-запросе на этот endpoint'a создается новая сущность. Написать функцию которая принимает массив объектов вида { name: 'Max' }
и создают набор сущностей посредством API(по-английски, это называется batch-операция).
Давайте решим эту задачу в императивном стиле:
const request = require('superagent')
function batchCreate(bodies) {
const calls = []
for (let body of bodies) {
calls.push(
request
.post('/people')
.send(body)
.then(r => r.status)
)
}
return Promise.all(calls)
}
Давайте, для сравнения, перепишем этот кусочек кода в функциональном стиле. Для простоты, под функциональным стилем мы будем понимать:
- Применение функциональных примитивов(.map, .filter, .reduce) вместо императивных циклов(for, while)
- Код организован в "чистые" функции – они зависят только от своих аргументов и не зависят от состояния системы
Код в функциональном стиле:
const request = require('superagent')
function batchCreate(bodies) {
const calls = bodies.map(body =>
request
.post('/people')
.send(body)
.then(r => r.status)
)
return Promise.all(calls)
}
Мы получили кусок кода такого же размера и стоит признаться что не понятно чем этот кусок лучше предыдущего.
Для того чтобы понять чем второй кусок кода лучше – нужно начать менять код, представим что к оригинальной задаче появилось новое требование:
У сервиса который мы вызываем появилось ограничение на количество запросов в промежуток времени: за секунду один клиент может выполнить не более пяти запросов. Выполнение большего количества запросов приведет к тому что сервис будет возвращать 429 HTTP ошибку(too many requests).
В этом месте, пожалуй, стоит остановиться и попробовать решить задачу самому, %username%
Возьмем за основу наш функциональный код и попробуем его изменить. Основная проблема "чистого" функционального программирования состоит в том, что оно ничего "не знает" — о среде выполнения и вводе-выводе(в английском для этого есть выражение side effects), но на практике мы постоянно с ними работаем.
Чтобы заполнить этот пробел на помощь приходит Reactive Programming — набор подходов пытающихся решить проблему side effects. Самой известной реализацией этой парадигмы является библиотека Rx, использующая концепцию reactive streams
Что такое reactive streams? Если очень кратко, то это подход позволяющий применить функциональные примитивы(.map, .filter, .reduce) к чему-то распределенному по времени.
Например, мы передаем по сети некий набор комманд – нам не нужно дожидаться пока мы получим весь набор, мы представляем его как reactive stream и можем с ним работать. Тут возникают еще два важных концепта:
- поток может быть бесконечным или как угодно долго распределенным по времени
- передающая сторона передает команду только в том случае, если принимающая готова ее обработать(backpressure)
Целью этой статьи является поиск легких путей, поэтому, мы возьмем библиотеку Highland, которая старается решить ту же задачу что и Rx, но намного проще в освоении. Идея лежащая внутри проста: давайте возьмем за основу Node.js streams и будем “переливать” данные из одного Stream в другой.
Приступим: начнем с простого — сделаем наш код "реактивным" без добавления нового функционала
const request = require('superagent')
const H = require(‘highland’)
function batchCreate(bodies) {
return H(bodies)
.flatMap(body =>
H(request
.post('localhost:3000/people')
.send(body)
.then(r => r.status)
)
)
.collect()
.toPromise(Promise)
}
На что стоит обратить внимание:
- H(bodies) – мы создаем stream из массива
- .flatMap и callback который он принимает. Идея довольно проста — мы заворачиваем Promise в конструктор потока чтобы получить поток с одним значением(или ошибкой. важно понимать что это именно значение, а не Promise).
В результате это нам дает поток потоков — при помощи flatMap мы сглаживаем это в один поток значений которым мы можем оперировать(кто сказал монада?) - .collect – нам нужен для того чтобы собрать все значения в одной "точке" в массив
- .toPromise – вернет нам Promise, который будет fulfilled в момент когда у нас будет значение из потока
Теперь давайте попробуем реализовать наше требование:
const request = require('superagent')
const H = require('highland')
function batchCreate(bodies) {
return H(bodies)
.flatMap(body =>
H(request
.post('localhost:3000/people')
.send(body)
.then(r => r.status)
)
)
.ratelimit(5, 1000)
.collect()
.toPromise(Promise)
}
Благодаря концепту backpressure – это всего лишь одна строчка .ratelimit в данной парадигме. В Rx это занимает приблизительно столько же места.
Ну вот и все, интересно ваше мнение:
- получилось ли у меня достичь декларируемого в начале статьи результата?
- можно ли достичь аналогичного результата используя императивный подход?
- заинтересовались ли вы Reactive programming?
P.S.: вот тут можно найти еще одну мою статью про Reactive Programming
Комментарии (40)
Riim
24.10.2018 19:03Реактивное программирование — парадигма программирования, ориентированная на потоки данных и распространение изменений.
где здесь потоки данных и распространение изменений?
Статья демонстрирует возможности метода ratelimit, такой метод можно и в какой-нибудь lodash запихать, работать будет ничуть не хуже.HaMI Автор
24.10.2018 19:06а можно попросить вас показать пример метода, который вы «запихаете» в lodash? Это не комментарий в стиле – сами напишите статью. Просто попробуйте мысленно реализовать этот метод, и, мне кажется, вы поймете «где здесь потоки данных и распространение изменений?»
HaMI Автор
24.10.2018 19:23И прошу не забывать о том что request – асинхронный и мы не знаем сколько времени может занять выполненние запроса(может быть больше 1 секунды)
Riim
24.10.2018 20:25Набросал:
<script> function LimitQueue(limit, timeout) { this.limit = limit; this.timeout = timeout; this.jobs = []; this.pending = 0; this.results = []; this._run = this._run.bind(this); } LimitQueue.prototype.push = function(jobs) { Array.prototype.push.apply(this.jobs, jobs); }; LimitQueue.prototype._run = function(results) { if (results) { this.pending = 0; Array.prototype.push.apply(this.results, results); setTimeout(this._run, this.timeout); if (!this.jobs.length) { this.callback(this.results); } return; } let promises = []; for (;;) { if (this.pending == this.limit || !this.jobs.length) { Promise.all(promises).then(this._run); break; } promises.push(this.jobs.shift()()); this.pending++; } }; LimitQueue.prototype.onDone = function(cb) { this.callback = cb; this._run(); }; // ======== let q = new LimitQueue(3, 1000); q.push( [{ name: 'Max1' }, { name: 'Max2' }, { name: 'Max3' }, { name: 'Max4' }, { name: 'Max5' }].map( data => () => { console.log(1, data.name); return new Promise((resolve, reject) => { console.log(2, data.name); setTimeout(() => { console.log(3, data.name); resolve(data.name); }, Math.ceil(Math.random() * 3000)); }); } ) ); q.onDone(results => { console.log(4, results); }); </script>
В консоли следующее:
1 "Max1" 2 "Max1" 1 "Max2" 2 "Max2" 1 "Max3" 2 "Max3" [задержка] 3 "Max3" [задержка] 3 "Max1" [задержка] 3 "Max2" [задержка 1000ms] 1 "Max4" 2 "Max4" 1 "Max5" 2 "Max5" [задержка] 3 "Max5" [задержка] 3 "Max4" 4 (5) ["Max1", "Max2", "Max3", "Max4", "Max5"]
Завернуть всё это в красивый интерфейс уже не проблема и как видите нет никаких потоков.
UPD: накосячил конечно немного, но суть я думаю понятна.
HaMI Автор
24.10.2018 20:41вот тут есть код условного сервер. Почему если я меняю код вашего емулятора промисов на реальный реквест,
то вижу только 5 запросовначинает происходить что-то странное?Riim
24.10.2018 20:51Потому что их там всего 5 и они разделены на пачки по 3. Сперва отправляются 3 запроса, после ответа на все (там случайная задержка подставляется) отправляются ещё 2, после ответа на оба выводятся все результаты. В общем, в выводе всё видно.
HaMI Автор
24.10.2018 20:57спасибо. Меня сбило с толку что почему-то зацикливается вывод по окончании
4 [ 201, 201, 201, 201, 201 ]
4 [ 201, 201, 201, 201, 201 ]
4 [ 201, 201, 201, 201, 201 ]
4 [ 201, 201, 201, 201, 201 ]
4 [ 201, 201, 201, 201, 201 ]Riim
24.10.2018 21:00да)), про это я и написал:
UPD: накосячил конечно немного, но суть я думаю понятна.
Поотлаживать конечно не помешает, считайте, что это псевдокод демонстрирующий как это делать без потоков.
HaMI Автор
24.10.2018 21:06что будет с вашим кодом, если произойдет следующее:
1 вызов займет 1с
2 вызов займет 5с
3 вызов займет 1с
через сколько времени начнется:
4-й вызов?
5-й вызов?Riim
24.10.2018 21:13Через 5, там Promise.all внутри, почитайте код, там же мало совсем, 3 минуты разобраться ;)
HaMI Автор
24.10.2018 21:20я спросил как раз потому что почитал. а как по вашему – через сколько должен? если у нас рейтлимит на 1 секунду.
Riim
24.10.2018 21:25Ошибся в комментарии, через 6 конечно, в коде этот момент правильно сделан.
HaMI Автор
24.10.2018 21:31Вы меня не правильно поняли, для того чтобы вызывать Апи с максимально возможной скоростью – через сколько должен происходить 4-й вызов при условии что первые два прошли за секунду?
Riim
24.10.2018 21:35В общем, самый долгий запрос из пачки плюс задержка. Если самый долгий 1 сек. и задержка 1 сек., то 2 секунды.
HaMI Автор
24.10.2018 22:03ок, смотрите – мы ушли в некоторые интересные дебри. С моей точки зрения – эта стратегия далеко не оптимальна. вам интересно продлжать этот разговор? я спрашиваю чтобы у вас не сложилось впечатление что я к вам «прикапываюсь»
Riim
24.10.2018 22:16Вы считаете, что складывать задержку и самый долгий запрос не нужно, а просто использовать самое долгое из них? Да, так оптимальнее. Доработать пример до этого минутное дело.
HaMI Автор
24.10.2018 22:44Я считаю что в каждое окно времени у вас должно быть пять «активных» запросов, тоесть не нужно дожидаться окончания самого длинного запроса, если началось новое «окно»
вот сервер с реальным тротлингом, там же код batchCreate и логи для сервера и create
Вот как выглядит кусочек лога(это нет тоже лог что в гисте!)
app finished 3 +53ms
app finished 2 +572ms
app finished 1 +11ms
app finished 0 +1s
app start 5 +1ms
app start 6 +1ms
app start 7 +0ms
app start 8 +0ms
app finished 4 +76ms
app start 9 +0ms < — свободное окно и пошел вызов, не дожидаясь окончания всех долгоиграющих запросов
app finished 6 +188ms
app finished 8 +1s
app finished 5 +74ms
app finished 9 +244ms
app finished 7 +121ms
понимайте о чем я? сможете так доизменить свой код? прошу заметить в коде batchCreate измененно две строчки(имею ввиду по сути, а не добавлен логгинг)Riim
25.10.2018 00:13понимайте о чем я?
понимаю, так ещё оптимальнее, но не вижу проблемы написать и это без потоков, навскидку, мне кажется, даже кода меньше получится (чем сейчас в примере выше). Только давайте по очереди? Мне просто не особо интересно такие задачки решать. Попробуйте сами, а я уже попробую сделать ещё проще.
HaMI Автор
25.10.2018 00:36я уже сделал. пример по ссылке именно так работает – просто в следствии рандомного времени задержки, это не всегда видно. теперь интересно посмотреть на ваш код
Riim
25.10.2018 01:05Вы использовали готовый метод сделанный именно под вашу задачу, на это не нужно много времени и умения, я же предлагаю вам решить эту интереснейшею задачу на чистом js. Мне кажется будет честно, если мы будем тратить одинаковое время впустую, а иначе вы начнёте перечислять другие возможности highland или конкретно этого метода, а мне сидеть и переписывать всё это на чистом js? Нет уж), либо признайте, что потоки в данном примере не обязательны, либо давайте по очереди.
HaMI Автор
25.10.2018 01:18извините, но по-моему, это ваше утвержение
но не вижу проблемы написать и это без потоков, навскидку, мне кажется, даже кода меньше получится
Я как раз тут проблему вижу. По-этому, если вам нечего больше сказать – давайте не будем занимать время друг-друга. СпасибоRiim
25.10.2018 02:31всё правильно, это мои слова, для меня это действительно не сложная задача, но я же не говорил, что хочу потратить ещё пол часа своего времени.
Я как раз тут проблему вижу
в чём проблема, в задаче? Вы действительно считаете её нерешаемой без потоков, серьёзно? Что там в потоках по вашему, божественный эфир? Или проблема в вас? Тогда это тем более в ваших интересах, потренируетесь. А то на собеседованиях уже страшно просить что-то простейшее написать.
HaMI Автор
24.10.2018 20:43и это на самом деле очень круто, что вы это написали. Вы согласны с тем что эта задача не настолько тривиальна, как кажется на первый взгляд?
Завернуть всё это в красивый интерфейс – давайте исходить из того что в красивый интерефейс можно что угодно завернутьRiim
24.10.2018 20:56Вы согласны с тем что эта задача не настолько тривиальна, как кажется на первый взгляд?
честно, нет, для меня сложность сразу ясна.
давайте исходить из того что в красивый интерефейс можно что угодно завернуть
вы же не хотите сказать, что внутри highland что-то более простое чем я предложил?
HaMI Автор
24.10.2018 21:05нет, конечно. Но Highland(Rx, lodash, orm, whatever) – позволяет решать много задач. вы предлагаете написать штуку которая решает одну задачу и завернуть ее в красивый интерфейс.
Riim
24.10.2018 21:26вы предлагаете написать штуку которая решает одну задачу
почему нет?
Хотя вопрос риторический, мне не нужно объяснять зачем объединять что-то в библиотеки, речь не про это, я говорю, что приведённый вами пример в статье никак не показывает плюсов реактивного программирования, а его ключевая часть легко переносится в другие библиотеки никак не связанные с РП. Если бы мне так объясняли РП я бы сказал "круто", но никогда бы не стал его использовать для себя.
HaMI Автор
24.10.2018 22:09буду рад если вы укажите ссылки на лучшие примеры. Большой курсеровский курс прошу не предлагать – в силу того что в он напорядок больше размера данной статьи
HaMI Автор
24.10.2018 22:55если честно, я не совсем понимаю что тут еще показывать
в вашем коде навскидку 70 cтрок
в моем 10
это если еще опустить факт того что там есть пассажи вида:
for (;;) {
if (this.pending == this.limit || !this.jobs.length) {
которые нужно вдумчиво читать
С Аргументом «вы же не хотите сказать, что внутри highland что-то более простое чем я предложил?» я не знаю как спорить – ним можно отбросить любую парадигму программирования и заменить ее на на набор функцийRiim
25.10.2018 00:35в вашем коде навскидку 70 cтрок
в моем 10причём тут количество строк? Ок, представьте другую библиотеку внутри которой ratelimit на основе моего кода. Представьте в lodash завтра появляется ratelimit реализованный без потоков. Как теперь докажете необходимость РП?
буду рад если вы укажите ссылки на лучшие примеры
я делал несколько докладов про РП и как-то начинал писать статью на эту тему. И каждый раз основной проблемой был именно хороший пример, глядя на который сразу становилось бы ясно, что это и зачем. Фишка РП в том, что плюсы становятся особенно видны именно в сложных ситуациях, а такое в статью или доклад не запихаешь. Мне нечего вам предложить в виде одного примера, но я похоже в скором времени ещё раз попробую написать такую статью. Может со второй попытки что-то получится.
HaMI Автор
25.10.2018 00:39>Как теперь докажете необходимость РП?
исходя из вашей логики, можно отменить «все» аргументируя тем, что это можно на машином коде написать.
Эта ветка ушла в холивар – это не интересно. предлагаю закончить. продолжить там где кодRiim
25.10.2018 01:12можно отменить «все»
я как раз не предлагаю отменять никаких абстракций, я предлагаю представить, что такой метод есть ещё в какой-то библиотеке, но реализован без потоков. Выше мы как раз обсуждаем насколько возможно сделать его без потоков.
alexandersh123
25.10.2018 01:02Вопрос «в лоб»: а как при таком подходе контролировать ratelimit в условиях, когда сервис api одновременно запущен в 2..n экземплярах? Отказ от масштабируемости?)
HaMI Автор
25.10.2018 01:16throtlling в простейшем случае выполняется по апи — вам это в принципе не нужно.
В более сложном – вам не обойтись без какой-то мастер ноды которая будет раздавать джобы слейвам, но это выходит за рамки этой статьи.
faiwer
25.10.2018 10:07+1Мне кажется в статье было куда больше смысла, если бы:
.ratelimit(5, 1000)
был написан "руками" с пояснениями. Тогда это была бы демонстрация "на пальцах", как можно удобно работать со stream-ми асинхронных данных. Вместо этого мы видим статью, которая сводится к "в highland для этого есть готовый удобный метод". Зачем? :)
И мне кажется в статье под названием "Самое краткое введение в Reactive Programming" должно быть простейшее описание паттерна Observer. А не реактивные библиотечные потоки с тыщей мутных методов :)
HaMI Автор
25.10.2018 12:09>простейшее описание паттерна Observer
мне так не кажется. Проще думать об этом как именно о потоках – отсюда «качаю», сюда «заливаю». Потом человек уже сам почитает про все остальное. Оставить статью краткой и рассказать о всем – заведомо не достижимая цель.faiwer
25.10.2018 22:13+1Ну вот смотрите. Приходит человек читать про реактивное программирование. Он не знает что такое stream-ы и протокола их работы. Он не знает где эти странные штуки могут быть удобными. Он не знает паттерна наблюдатель. Он весь во внимании. Ведь это краткое введение, ему сейчас всё объяснят. Что в итоге он видит в этой статье? Хм… Ну что есть какие-то сторонние библиотеки, которые позволяют записать привычные вещи в странном виде. Он хочет пояснений — а зачем так? Что тут под капотом происходит? А вы ему пример с rateLimit. Он думает — вот, интересный пример, сейчас мне всё расскажут и покажут. Вы говорите ему, мол есть готовый метод в 1 строку. Почувствуй силу
rxjshighland, о юный падаван! Он в ответ — эээээ, что? На этом статья заканчивается. Что нового для себя подчерпнул человек? В чём был "message"? :) Смотрите, как я могу?
Даже зная как работают nodejs-streams, observer-ы и пр. штуки, может быть совершенно не очевидно, в каких реальных жизненных обстоятельствах, эти непривычные подходы могут оказаться к месту. Тут ведь как с каррированием. Раз в неделю кто-нибудь пишет статью про то, что это такое. Однако людей, которые нашли ему применение в своих реальных задачах можно пересчитать по пальцам одной руки (ладно, тут я несколько утрирую).
NLO
НЛО прилетело и опубликовало эту надпись здесь