Цель данной статьи – показать на примере зачем нужно reactive programming, как оно связано с функциональным программированием, и как с его помощью можно писать декларативный код, который легко адаптировать к новым требованиям. Кроме того, хочется сделать это максимально кратко и просто на примере приближенном к реальному.


Возьмем такую задачу:
Есть некий сервис c REST API и endpointом /people. При POST-запросе на этот endpoint'a создается новая сущность. Написать функцию которая принимает массив объектов вида { name: 'Max' } и создают набор сущностей посредством API(по-английски, это называется batch-операция).


Давайте решим эту задачу в императивном стиле:


const request = require('superagent')

function batchCreate(bodies) {
  const calls = []
  for (let body of bodies) {
    calls.push(
      request
        .post('/people')
        .send(body)
        .then(r => r.status)
    )
  }
  return Promise.all(calls)
}

Давайте, для сравнения, перепишем этот кусочек кода в функциональном стиле. Для простоты, под функциональным стилем мы будем понимать:


  1. Применение функциональных примитивов(.map, .filter, .reduce) вместо императивных циклов(for, while)
  2. Код организован в "чистые" функции – они зависят только от своих аргументов и не зависят от состояния системы

Код в функциональном стиле:


const request = require('superagent')

function batchCreate(bodies) {
  const calls = bodies.map(body =>
    request
      .post('/people')
      .send(body)
      .then(r => r.status)
  )
  return Promise.all(calls)
}

Мы получили кусок кода такого же размера и стоит признаться что не понятно чем этот кусок лучше предыдущего.
Для того чтобы понять чем второй кусок кода лучше – нужно начать менять код, представим что к оригинальной задаче появилось новое требование:
У сервиса который мы вызываем появилось ограничение на количество запросов в промежуток времени: за секунду один клиент может выполнить не более пяти запросов. Выполнение большего количества запросов приведет к тому что сервис будет возвращать 429 HTTP ошибку(too many requests).


В этом месте, пожалуй, стоит остановиться и попробовать решить задачу самому, %username%


Возьмем за основу наш функциональный код и попробуем его изменить. Основная проблема "чистого" функционального программирования состоит в том, что оно ничего "не знает" — о среде выполнения и вводе-выводе(в английском для этого есть выражение side effects), но на практике мы постоянно с ними работаем.
Чтобы заполнить этот пробел на помощь приходит Reactive Programming — набор подходов пытающихся решить проблему side effects. Самой известной реализацией этой парадигмы является библиотека Rx, использующая концепцию reactive streams


Что такое reactive streams? Если очень кратко, то это подход позволяющий применить функциональные примитивы(.map, .filter, .reduce) к чему-то распределенному по времени.


Например, мы передаем по сети некий набор комманд – нам не нужно дожидаться пока мы получим весь набор, мы представляем его как reactive stream и можем с ним работать. Тут возникают еще два важных концепта:


  • поток может быть бесконечным или как угодно долго распределенным по времени
  • передающая сторона передает команду только в том случае, если принимающая готова ее обработать(backpressure)

Целью этой статьи является поиск легких путей, поэтому, мы возьмем библиотеку Highland, которая старается решить ту же задачу что и Rx, но намного проще в освоении. Идея лежащая внутри проста: давайте возьмем за основу Node.js streams и будем “переливать” данные из одного Stream в другой.


Приступим: начнем с простого — сделаем наш код "реактивным" без добавления нового функционала


const request = require('superagent')
const H = require(‘highland’)

function batchCreate(bodies) {
   return H(bodies)
    .flatMap(body =>
      H(request
        .post('localhost:3000/people')
        .send(body)
        .then(r => r.status)
      )
    )
    .collect()
    .toPromise(Promise)
}

На что стоит обратить внимание:


  • H(bodies) – мы создаем stream из массива
  • .flatMap и callback который он принимает. Идея довольно проста — мы заворачиваем Promise в конструктор потока чтобы получить поток с одним значением(или ошибкой. важно понимать что это именно значение, а не Promise).
    В результате это нам дает поток потоков — при помощи flatMap мы сглаживаем это в один поток значений которым мы можем оперировать(кто сказал монада?)
  • .collect – нам нужен для того чтобы собрать все значения в одной "точке" в массив
  • .toPromise – вернет нам Promise, который будет fulfilled в момент когда у нас будет значение из потока

Теперь давайте попробуем реализовать наше требование:


const request = require('superagent')
const H = require('highland')

function batchCreate(bodies) {
   return H(bodies)
    .flatMap(body =>
      H(request
        .post('localhost:3000/people')
        .send(body)
        .then(r => r.status)
      )
    )
    .ratelimit(5, 1000)
    .collect()
    .toPromise(Promise)
}

Благодаря концепту backpressure – это всего лишь одна строчка .ratelimit в данной парадигме. В Rx это занимает приблизительно столько же места.


Ну вот и все, интересно ваше мнение:


  • получилось ли у меня достичь декларируемого в начале статьи результата?
  • можно ли достичь аналогичного результата используя императивный подход?
  • заинтересовались ли вы Reactive programming?

P.S.: вот тут можно найти еще одну мою статью про Reactive Programming

Комментарии (40)


  1. NLO
    00.00.0000 00:00

    НЛО прилетело и опубликовало эту надпись здесь


  1. Riim
    24.10.2018 19:03

    Реактивное программирование — парадигма программирования, ориентированная на потоки данных и распространение изменений.

    где здесь потоки данных и распространение изменений?
    Статья демонстрирует возможности метода ratelimit, такой метод можно и в какой-нибудь lodash запихать, работать будет ничуть не хуже.


    1. HaMI Автор
      24.10.2018 19:06

      а можно попросить вас показать пример метода, который вы «запихаете» в lodash? Это не комментарий в стиле – сами напишите статью. Просто попробуйте мысленно реализовать этот метод, и, мне кажется, вы поймете «где здесь потоки данных и распространение изменений?»


    1. HaMI Автор
      24.10.2018 19:23

      И прошу не забывать о том что request – асинхронный и мы не знаем сколько времени может занять выполненние запроса(может быть больше 1 секунды)


      1. Riim
        24.10.2018 20:25

        Набросал:


        <script>
        
        function LimitQueue(limit, timeout) {
            this.limit = limit;
            this.timeout = timeout;
        
            this.jobs = [];
            this.pending = 0;
        
            this.results = [];
        
            this._run = this._run.bind(this);
        }
        
        LimitQueue.prototype.push = function(jobs) {
            Array.prototype.push.apply(this.jobs, jobs);
        };
        
        LimitQueue.prototype._run = function(results) {
            if (results) {
                this.pending = 0;
                Array.prototype.push.apply(this.results, results);
        
                setTimeout(this._run, this.timeout);
        
                if (!this.jobs.length) {
                    this.callback(this.results);
                }
        
                return;
            }
        
            let promises = [];
        
            for (;;) {
                if (this.pending == this.limit || !this.jobs.length) {
                    Promise.all(promises).then(this._run);
                    break;
                }
        
                promises.push(this.jobs.shift()());
                this.pending++;
            }
        };
        
        LimitQueue.prototype.onDone = function(cb) {
            this.callback = cb;
            this._run();
        };
        
        // ========
        
        let q = new LimitQueue(3, 1000);
        
        q.push(
            [{ name: 'Max1' }, { name: 'Max2' }, { name: 'Max3' }, { name: 'Max4' }, { name: 'Max5' }].map(
                data => () => {
                    console.log(1, data.name);
        
                    return new Promise((resolve, reject) => {
                        console.log(2, data.name);
        
                        setTimeout(() => {
                            console.log(3, data.name);
                            resolve(data.name);
                        }, Math.ceil(Math.random() * 3000));
                    });
                }
            )
        );
        
        q.onDone(results => {
            console.log(4, results);
        });
        
        </script>

        В консоли следующее:


        1 "Max1"
        2 "Max1"
        1 "Max2"
        2 "Max2"
        1 "Max3"
        2 "Max3"
        [задержка]
        3 "Max3"
        [задержка]
        3 "Max1"
        [задержка]
        3 "Max2"
        [задержка 1000ms]
        1 "Max4"
        2 "Max4"
        1 "Max5"
        2 "Max5"
        [задержка]
        3 "Max5"
        [задержка]
        3 "Max4"
        4 (5) ["Max1", "Max2", "Max3", "Max4", "Max5"]

        Завернуть всё это в красивый интерфейс уже не проблема и как видите нет никаких потоков.


        UPD: накосячил конечно немного, но суть я думаю понятна.


        1. HaMI Автор
          24.10.2018 20:41

          вот тут есть код условного сервер. Почему если я меняю код вашего емулятора промисов на реальный реквест, то вижу только 5 запросов начинает происходить что-то странное?


          1. Riim
            24.10.2018 20:51

            Потому что их там всего 5 и они разделены на пачки по 3. Сперва отправляются 3 запроса, после ответа на все (там случайная задержка подставляется) отправляются ещё 2, после ответа на оба выводятся все результаты. В общем, в выводе всё видно.


            1. HaMI Автор
              24.10.2018 20:57

              спасибо. Меня сбило с толку что почему-то зацикливается вывод по окончании
              4 [ 201, 201, 201, 201, 201 ]
              4 [ 201, 201, 201, 201, 201 ]
              4 [ 201, 201, 201, 201, 201 ]
              4 [ 201, 201, 201, 201, 201 ]
              4 [ 201, 201, 201, 201, 201 ]


              1. Riim
                24.10.2018 21:00

                да)), про это я и написал:


                UPD: накосячил конечно немного, но суть я думаю понятна.

                Поотлаживать конечно не помешает, считайте, что это псевдокод демонстрирующий как это делать без потоков.


                1. HaMI Автор
                  24.10.2018 21:06

                  что будет с вашим кодом, если произойдет следующее:
                  1 вызов займет 1с
                  2 вызов займет 5с
                  3 вызов займет 1с
                  через сколько времени начнется:
                  4-й вызов?
                  5-й вызов?


                  1. Riim
                    24.10.2018 21:13

                    Через 5, там Promise.all внутри, почитайте код, там же мало совсем, 3 минуты разобраться ;)


                    1. HaMI Автор
                      24.10.2018 21:20

                      я спросил как раз потому что почитал. а как по вашему – через сколько должен? если у нас рейтлимит на 1 секунду.


                      1. Riim
                        24.10.2018 21:25

                        Ошибся в комментарии, через 6 конечно, в коде этот момент правильно сделан.


                        1. HaMI Автор
                          24.10.2018 21:31

                          Вы меня не правильно поняли, для того чтобы вызывать Апи с максимально возможной скоростью – через сколько должен происходить 4-й вызов при условии что первые два прошли за секунду?


                          1. Riim
                            24.10.2018 21:35

                            В общем, самый долгий запрос из пачки плюс задержка. Если самый долгий 1 сек. и задержка 1 сек., то 2 секунды.


                            1. HaMI Автор
                              24.10.2018 22:03

                              ок, смотрите – мы ушли в некоторые интересные дебри. С моей точки зрения – эта стратегия далеко не оптимальна. вам интересно продлжать этот разговор? я спрашиваю чтобы у вас не сложилось впечатление что я к вам «прикапываюсь»


                              1. Riim
                                24.10.2018 22:16

                                Вы считаете, что складывать задержку и самый долгий запрос не нужно, а просто использовать самое долгое из них? Да, так оптимальнее. Доработать пример до этого минутное дело.


                                1. HaMI Автор
                                  24.10.2018 22:44

                                  Я считаю что в каждое окно времени у вас должно быть пять «активных» запросов, тоесть не нужно дожидаться окончания самого длинного запроса, если началось новое «окно»
                                  вот сервер с реальным тротлингом, там же код batchCreate и логи для сервера и create

                                  Вот как выглядит кусочек лога(это нет тоже лог что в гисте!)
                                  app finished 3 +53ms
                                  app finished 2 +572ms
                                  app finished 1 +11ms
                                  app finished 0 +1s
                                  app start 5 +1ms
                                  app start 6 +1ms
                                  app start 7 +0ms
                                  app start 8 +0ms
                                  app finished 4 +76ms
                                  app start 9 +0ms < — свободное окно и пошел вызов, не дожидаясь окончания всех долгоиграющих запросов
                                  app finished 6 +188ms
                                  app finished 8 +1s
                                  app finished 5 +74ms
                                  app finished 9 +244ms
                                  app finished 7 +121ms

                                  понимайте о чем я? сможете так доизменить свой код? прошу заметить в коде batchCreate измененно две строчки(имею ввиду по сути, а не добавлен логгинг)


                                  1. Riim
                                    25.10.2018 00:13

                                    понимайте о чем я?

                                    понимаю, так ещё оптимальнее, но не вижу проблемы написать и это без потоков, навскидку, мне кажется, даже кода меньше получится (чем сейчас в примере выше). Только давайте по очереди? Мне просто не особо интересно такие задачки решать. Попробуйте сами, а я уже попробую сделать ещё проще.


                                    1. HaMI Автор
                                      25.10.2018 00:36

                                      я уже сделал. пример по ссылке именно так работает – просто в следствии рандомного времени задержки, это не всегда видно. теперь интересно посмотреть на ваш код


                                      1. Riim
                                        25.10.2018 01:05

                                        Вы использовали готовый метод сделанный именно под вашу задачу, на это не нужно много времени и умения, я же предлагаю вам решить эту интереснейшею задачу на чистом js. Мне кажется будет честно, если мы будем тратить одинаковое время впустую, а иначе вы начнёте перечислять другие возможности highland или конкретно этого метода, а мне сидеть и переписывать всё это на чистом js? Нет уж), либо признайте, что потоки в данном примере не обязательны, либо давайте по очереди.


                                        1. HaMI Автор
                                          25.10.2018 01:18

                                          извините, но по-моему, это ваше утвержение

                                          но не вижу проблемы написать и это без потоков, навскидку, мне кажется, даже кода меньше получится

                                          Я как раз тут проблему вижу. По-этому, если вам нечего больше сказать – давайте не будем занимать время друг-друга. Спасибо


                                          1. Riim
                                            25.10.2018 02:31

                                            всё правильно, это мои слова, для меня это действительно не сложная задача, но я же не говорил, что хочу потратить ещё пол часа своего времени.


                                            Я как раз тут проблему вижу

                                            в чём проблема, в задаче? Вы действительно считаете её нерешаемой без потоков, серьёзно? Что там в потоках по вашему, божественный эфир? Или проблема в вас? Тогда это тем более в ваших интересах, потренируетесь. А то на собеседованиях уже страшно просить что-то простейшее написать.


                                            1. HaMI Автор
                                              25.10.2018 03:37

                                              Извините, вы похоже вместо кода начали генерить холивар. Доброй ночи, не растраивайтесь, конечно можете


                                              1. Riim
                                                25.10.2018 21:20

                                                Код я генерирую обычно за ЗП, здесь я вам ничего не должен, а холивар здесь только от вашего нежелания признать очевидное.


        1. HaMI Автор
          24.10.2018 20:43

          и это на самом деле очень круто, что вы это написали. Вы согласны с тем что эта задача не настолько тривиальна, как кажется на первый взгляд?
          Завернуть всё это в красивый интерфейс – давайте исходить из того что в красивый интерефейс можно что угодно завернуть


          1. Riim
            24.10.2018 20:56

            Вы согласны с тем что эта задача не настолько тривиальна, как кажется на первый взгляд?

            честно, нет, для меня сложность сразу ясна.


            давайте исходить из того что в красивый интерефейс можно что угодно завернуть

            вы же не хотите сказать, что внутри highland что-то более простое чем я предложил?


            1. HaMI Автор
              24.10.2018 21:05

              нет, конечно. Но Highland(Rx, lodash, orm, whatever) – позволяет решать много задач. вы предлагаете написать штуку которая решает одну задачу и завернуть ее в красивый интерфейс.


              1. Riim
                24.10.2018 21:26

                вы предлагаете написать штуку которая решает одну задачу

                почему нет?


                Хотя вопрос риторический, мне не нужно объяснять зачем объединять что-то в библиотеки, речь не про это, я говорю, что приведённый вами пример в статье никак не показывает плюсов реактивного программирования, а его ключевая часть легко переносится в другие библиотеки никак не связанные с РП. Если бы мне так объясняли РП я бы сказал "круто", но никогда бы не стал его использовать для себя.


                1. HaMI Автор
                  24.10.2018 22:09

                  буду рад если вы укажите ссылки на лучшие примеры. Большой курсеровский курс прошу не предлагать – в силу того что в он напорядок больше размера данной статьи


                1. HaMI Автор
                  24.10.2018 22:55

                  если честно, я не совсем понимаю что тут еще показывать
                  в вашем коде навскидку 70 cтрок
                  в моем 10
                  это если еще опустить факт того что там есть пассажи вида:
                  for (;;) {
                  if (this.pending == this.limit || !this.jobs.length) {
                  которые нужно вдумчиво читать

                  С Аргументом «вы же не хотите сказать, что внутри highland что-то более простое чем я предложил?» я не знаю как спорить – ним можно отбросить любую парадигму программирования и заменить ее на на набор функций


                  1. Riim
                    25.10.2018 00:35

                    в вашем коде навскидку 70 cтрок
                    в моем 10

                    причём тут количество строк? Ок, представьте другую библиотеку внутри которой ratelimit на основе моего кода. Представьте в lodash завтра появляется ratelimit реализованный без потоков. Как теперь докажете необходимость РП?


                    буду рад если вы укажите ссылки на лучшие примеры

                    я делал несколько докладов про РП и как-то начинал писать статью на эту тему. И каждый раз основной проблемой был именно хороший пример, глядя на который сразу становилось бы ясно, что это и зачем. Фишка РП в том, что плюсы становятся особенно видны именно в сложных ситуациях, а такое в статью или доклад не запихаешь. Мне нечего вам предложить в виде одного примера, но я похоже в скором времени ещё раз попробую написать такую статью. Может со второй попытки что-то получится.


                    1. HaMI Автор
                      25.10.2018 00:39

                      >Как теперь докажете необходимость РП?
                      исходя из вашей логики, можно отменить «все» аргументируя тем, что это можно на машином коде написать.

                      Эта ветка ушла в холивар – это не интересно. предлагаю закончить. продолжить там где код


                      1. Riim
                        25.10.2018 01:12

                        можно отменить «все»

                        я как раз не предлагаю отменять никаких абстракций, я предлагаю представить, что такой метод есть ещё в какой-то библиотеке, но реализован без потоков. Выше мы как раз обсуждаем насколько возможно сделать его без потоков.


  1. alexandersh123
    25.10.2018 01:02

    Вопрос «в лоб»: а как при таком подходе контролировать ratelimit в условиях, когда сервис api одновременно запущен в 2..n экземплярах? Отказ от масштабируемости?)


    1. HaMI Автор
      25.10.2018 01:16

      throtlling в простейшем случае выполняется по апи — вам это в принципе не нужно.
      В более сложном – вам не обойтись без какой-то мастер ноды которая будет раздавать джобы слейвам, но это выходит за рамки этой статьи.


      1. HaMI Автор
        25.10.2018 03:44

        в комментарии выше: вместо апи — айпи.


  1. faiwer
    25.10.2018 10:07
    +1

    Мне кажется в статье было куда больше смысла, если бы:


    .ratelimit(5, 1000)

    был написан "руками" с пояснениями. Тогда это была бы демонстрация "на пальцах", как можно удобно работать со stream-ми асинхронных данных. Вместо этого мы видим статью, которая сводится к "в highland для этого есть готовый удобный метод". Зачем? :)


    И мне кажется в статье под названием "Самое краткое введение в Reactive Programming" должно быть простейшее описание паттерна Observer. А не реактивные библиотечные потоки с тыщей мутных методов :)


    1. HaMI Автор
      25.10.2018 12:09

      >простейшее описание паттерна Observer
      мне так не кажется. Проще думать об этом как именно о потоках – отсюда «качаю», сюда «заливаю». Потом человек уже сам почитает про все остальное. Оставить статью краткой и рассказать о всем – заведомо не достижимая цель.


      1. faiwer
        25.10.2018 22:13
        +1

        Ну вот смотрите. Приходит человек читать про реактивное программирование. Он не знает что такое stream-ы и протокола их работы. Он не знает где эти странные штуки могут быть удобными. Он не знает паттерна наблюдатель. Он весь во внимании. Ведь это краткое введение, ему сейчас всё объяснят. Что в итоге он видит в этой статье? Хм… Ну что есть какие-то сторонние библиотеки, которые позволяют записать привычные вещи в странном виде. Он хочет пояснений — а зачем так? Что тут под капотом происходит? А вы ему пример с rateLimit. Он думает — вот, интересный пример, сейчас мне всё расскажут и покажут. Вы говорите ему, мол есть готовый метод в 1 строку. Почувствуй силу rxjshighland, о юный падаван! Он в ответ — эээээ, что? На этом статья заканчивается. Что нового для себя подчерпнул человек? В чём был "message"? :) Смотрите, как я могу?


        Даже зная как работают nodejs-streams, observer-ы и пр. штуки, может быть совершенно не очевидно, в каких реальных жизненных обстоятельствах, эти непривычные подходы могут оказаться к месту. Тут ведь как с каррированием. Раз в неделю кто-нибудь пишет статью про то, что это такое. Однако людей, которые нашли ему применение в своих реальных задачах можно пересчитать по пальцам одной руки (ладно, тут я несколько утрирую).