В последние годы реактивное программирование в целом, а технология ReactiveX в частности, обретает всё большую популярность среди разработчиков. Одни уже активно используют все преимущества этого подхода, а другие только “что-то слышали”. Со своей стороны я постараюсь помочь вам представить, насколько некоторые концепции реактивного программирования способны изменить взгляд на привычные, казалось бы, вещи.

Существует два принципиально различных способа организации больших систем: в соответствии с объектами и состояниями, которые живут в системе, и в соответствии с потоками данных, которые проходят через неё. Парадигма реактивного программирования предполагает легкость выражения потоков данных, а также распространение изменений благодаря этим потокам. Например, в императивном программировании операция присваивания означает конечность результата, тогда как в реактивном значение будет пересчитано при получении новых входных данных. Поток значений проходит в системе ряд трансформаций, которые необходимы для решения определенной задачи. Оперирование потоками позволяет системе быть расширяемой и асинхронной, а правильная реакция на возникающие ошибки – отказоустойчивой.

ReactiveX – библиотека, позволяющая создавать асинхронные и событийно-ориентированные программы, использующие наблюдаемые последовательности. Она расширяет шаблон Наблюдателя для поддержки последовательностей данных, добавляет операторы для их декларативного соединения, избавляя от необходимости заботиться о синхронизации и безопасности потоков, разделяемых структурах данных и неблокирующего I/O.

Одним из основных отличий библиотеки ReactiveX от функционального реактивного программирования является то, что она оперирует не непрерывно изменяющимися, а дискретными значениями, которые “испускаются” в течении длительного времени.

Стоит немного рассказать о том, что такое Observer, Observable, Subject. Модель Observable является источником данных и позволяет обрабатывать потоки асинхронных событий похожим образом с тем, который вы используете для коллекций данных, таких как массивы. И всё это вместо колбэков, а значит, код является более читабельным и менее склонным к ошибкам.

В ReactiveX наблюдатель (Observer) подписывается на Observable и впоследствии реагирует на элемент или последовательность элементов, которые тот отправляет. У каждого Observer, подписанного на Observable, вызывается метод Observer.on_next() на каждый элемент потока данных, после которого может быть вызван как Observer.on_complete(), так и Observer.on_error(). Часто Observable применяется таким образом, что он не начинает отдавать данные до тех пор, пока кто-нибудь не подписывается на него. Это так называемые “ленивые вычисления” – значения вычисляются только тогда, когда в них возникает потребность.

observer

Бывают задачи, для решения которых нужно соединить Observer и Observable, чтобы принимать сообщения о событиях и сообщать о них своим подписчикам. Для этого существует Subject, имеющий, кроме стандартной, ещё несколько реализаций:

  • ReplaySubject имеет возможность кэшировать все поступившие в него данные, а при появлении нового подписчика – отдавать всю эту последовательность сначала, работая далее в обычном режиме.

  • BehaviorSubject хранит последнее значение, по аналогии с ReplaySubject отдавая его появившемуся подписчику. При создании он получает значение по умолчанию, которое будет получать каждый новый подписчик, если последнего значения еще не было.

  • AsyncSubject также хранит последнее значение, но не отдает данные, пока не завершится вся последовательность.

Observable и Observer – только начало ReactiveX. Они не несут в себе всю мощь, которую являют собой операторы, позволяющие трансформировать, объединять, манипулировать последовательностями элементов, которые отдают Observable.

В документации ReactiveX описание операторов включает в себя использование Marble Diagram. К примеру, вот как эти диаграммы представляют Observable и их трансформации:

observable

Глядя на диаграмму ниже, легко понять, что оператор map трансформирует элементы, отдаваемые Observable, путем применения функции к каждому из них.

map

Хорошей иллюстрацией возможностей ReactiveX является приложение RSS-агрегатора. Здесь возникает необходимость асинхронной загрузки данных, фильтрации и трансформации значений, поддержания актуального состояния путем периодического обновления.

В этой статье примеры для представления основных принципов ReactiveX написаны с использованием библиотеки rx для языка программирования Python. Вот так, например, выглядит абстрактная реализация наблюдателя:

class Observer(metaclass=ABCMeta):
    @abstractmethod
    def on_next(self, value):
        return NotImplemented

    @abstractmethod
    def on_error(self, error):
        return NotImplemented

    @abstractmethod
    def on_completed(self):
        return NotImplemented

Наше приложение в режиме реального времени будет обмениваться сообщениями с браузером посредством веб-сокетов. Возможность легко реализовать это предоставляет Tornado.

Работа программы начинается с запуска сервера. При обращении браузера к серверу открывается веб-сокет.

Код

import json
import os

import feedparser
from rx import config, Observable
from rx.subjects import Subject
from tornado.escape import json_decode
from tornado.httpclient import AsyncHTTPClient
from tornado.platform.asyncio import AsyncIOMainLoop
from tornado.web import Application, RequestHandler, StaticFileHandler, url
from tornado.websocket import WebSocketHandler

asyncio = config['asyncio']

class WSHandler(WebSocketHandler):
    urls = ['https://lenta.ru/rss/top7',
            'http://wsrss.bbc.co.uk/russian/index.xml']
    def open(self):
        print("WebSocket opened")
        # здесь будет основная логика нашего приложения

    def on_message(self, message):
        obj = json_decode(message)
        # Отправляет сообщение, которое получает user_input
        self.subject.on_next(obj['term'])

    def on_close(self):
        # Отписаться от Observable; по цепочке остановит работу всех observable
        self.combine_latest_sbs.dispose()
        print("WebSocket closed")

class MainHandler(RequestHandler):
    def get(self):
        self.render("index.html")

def main():
    AsyncIOMainLoop().install()

    port = os.environ.get("PORT", 8080)
    app = Application([
        url(r"/", MainHandler),
        (r'/ws', WSHandler),
        (r'/static/(.*)', StaticFileHandler, {'path': "."})
    ])
    print("Starting server at port: %s" % port)
    app.listen(port)
    asyncio.get_event_loop().run_forever()


Для обработки введенного пользователем запроса создается Subject, при подписке на который он отправляет значение по умолчанию (в нашем случае — пустую строку), а затем раз в секунду отправляет то, что введено пользователем и удовлетворяет условиям: длина 0 или больше 2, значение изменилось.

   # Subject одновременно и Observable, и Observer
        self.subject = Subject()
        user_input = self.subject.throttle_last(
            1000  # На заданном временном промежутке получать последнее значение
        ).start_with(
            ''  # Сразу же после подписки отправляет значение по умолчанию
        ).filter(
            lambda text: len(text) == 0 or len(text) > 2
        ).distinct_until_changed()  # Только если значение изменилось

Также для периодического обновления новостей предусмотрен Observable, который раз в 60с отдает значение.


 interval_obs = Observable.interval(
            60000  # Отдает значение раз в 60с (для периодического обновления)
        ).start_with(0)

Два этих потока соединяются оператором combine_latest, в цепочку встраивается Observable для получения списка новостей. После чего на этот Observable создается подписка, вся цепочка начинает работать только в этот момент.


        # combine_latest собирает 2 потока из запросов пользователя и временных
        # интервалов, срабатывает на любое сообщение из каждого потока
        self.combine_latest_sbs = user_input.combine_latest(
            interval_obs, lambda input_val, i: input_val
        ).do_action(  # Срабатывает на каждый выпущенный элемент
            # Отправляет сообщение для очистки списка на фронтенд
            lambda x: send_response('clear')
        ).flat_map(
            # В цепочку встраивается Observable для получения списка
            self.get_data
        ).subscribe(send_response, on_error)
        # Создается подписка; вся цепочка начинает работать только в этот момент

Следует подробнее остановиться на том, что такое “Observable для получения списка новостей”. Из списка url для получения новостей мы создаем поток данных, элементы которого приходят в функцию, где при помощи HTTP-клиента Tornado AsyncHTTPClient происходит асинхронная загрузка данных для каждого элемента списка urls. Из них также создается поток данных, который фильтруется по запросу, введенному пользователем. Из каждого потока мы берем по 5 новостей, которые приводим к нужному формату для отправки на фронтенд.

Код

    def get_rss(self, rss_url):
        http_client = AsyncHTTPClient()
        return http_client.fetch(rss_url, method='GET')

    def get_data(self, query):
        # Observable создается из списка url
        return Observable.from_list(
            self.urls
        ).flat_map(
            # Для каждого url создается Observable, который загружает данные
            lambda url: Observable.from_future(self.get_rss(url))
        ).flat_map(
            # Полученные данные парсятся, из них создается Observable
            lambda x: Observable.from_list(
                feedparser.parse(x.body)['entries']
            ).filter(
                # Фильтрует по вхождению запроса в заголовок или текст новости
                lambda val, i: query in val.title or query in val.summary
            ).take(5)  # Берем только по 5 новостей по каждому url
        ).map(lambda x: {'title': x.title, 'link': x.link,
                         'published': x.published, 'summary': x.summary})
        # Преобразует данные для отправки на фронтенд


После того, как поток выходных данных сформирован, его подписчик начинает поэлементно получать данные. Функция send_response отправляет полученные значения во фронтенд, который добавляет новость в список.


        def send_response(x):
            self.write_message(json.dumps(x))

        def on_error(ex):
            print(ex)

В файле feeder.js

Код
        ws.onmessage = function(msg) {
            var value = JSON.parse(msg.data);
            if (value === "clear") {$results.empty(); return;}

            // Append the results
            $('<li><a tabindex="-1" href="' + value.link +
                '">' + value.title +'</a> <p>' + value.published +
                '</p><p>' + value.summary + '</p></li>'
            ).appendTo($results);
            $results.show();
        }


Таким образом, реализуется push-технология, в которой данные поступают от сервера к фронтенду, который лишь отправляет введенный пользователем запрос для поиска по новостям.

В качестве заключения предлагаю задуматься о том, какая реализация получилась бы при привычном подходе с использованием колбэков вместо Observable, без возможности легко объединить потоки данных, без возможности мгновенной отправки данных потребителю-фронтенду и с необходимостью отслеживать изменения в строке запроса. Среди Python-разработчиков технология пока что практически не распространена, однако я вижу уже несколько возможностей её применить на текущих проектах.

Пример использования ReactiveX для Python вы можете найти в github репозитории с демо-проектом RSS-агрегатора.
Поделиться с друзьями
-->

Комментарии (0)