Привет, Хабр!

Сегодня поговорим о хорошей библиотеке для управления потоками данных в Python – RxPY — реализации Reactive Extensions для нашего любимого языка. В версии 4.0.4 эта библиотека получила ряд улучшений, и сегодня мы разберем её основной функционал.

Основы RxPY

RxPY — это библиотека, реализующая принципы функционального реактивного программирования в Python. Она позволяет создавать и управлять асинхронными потоками данных, объединяя их, фильтруя и трансформируя с помощью цепочек операторов. Основные компоненты RxPY:

  • Observable: источник данных, который может выдавать события.

  • Observer: подписчик, который реагирует на события от Observable.

  • Операторы: функции, позволяющие трансформировать, фильтровать и комбинировать Observable.

Установить RxPY проще простого:

pip install reactivex

В версии 4.0.4 RxPY произошли значительные изменения:

  • Переименование модуля: теперь импортируем reactivex, а не rx.

  • Улучшенная типизация: добавлены аннотации типов для лучшей поддержки IDE.

  • Обновленная работа с операторами: использование метода pipe для цепочки операторов.

Если ты работал с RxPY v3, вот как все это дело интегрировать:

  • Изменение импорта: import rx -> import reactivex as rx.

  • Операторы: вместо observable.map() теперь используем observable.pipe(ops.map()).

  • Удалены устаревшие функции: некоторые старые операторы и методы были удалены или переименованы.

Создание Observable и работа с операторами

Создание Observable

just()

Создает Observable, который выдает единственное значение.

import reactivex as rx

observable = rx.just(42)
observable.subscribe(lambda x: print(f"Значение: {x}"))

from_()

Преобразует итерабельный объект в Observable.

observable = rx.from_([1, 2, 3, 4, 5])
observable.subscribe(lambda x: print(f"Элемент: {x}"))

interval()

Выдает последовательность чисел с заданным интервалом времени.

import time
from reactivex import interval

observable = interval(1)  # каждую секунду
subscription = observable.subscribe(lambda x: print(f"Тик: {x}"))

time.sleep(5)
subscription.dispose()

timer()

Выдает значение после заданной задержки.

from reactivex import timer

observable = timer(3)  # через 3 секунды
observable.subscribe(lambda x: print("Таймер сработал!"))

Трансформация Observable

map()

Применяет функцию к каждому элементу.

from reactivex import operators as ops

observable.pipe(
    ops.map(lambda x: x * x)
).subscribe(lambda x: print(f"Квадрат: {x}"))

flat_map()

Разворачивает вложенные Observable.

def duplicate(x):
    return rx.from_([x, x*2, x*3])

observable.pipe(
    ops.flat_map(duplicate)
).subscribe(lambda x: print(f"Значение: {x}"))

scan()

Аналог reduce, но выдает накопленный результат на каждой итерации.

observable.pipe(
    ops.scan(lambda acc, x: acc + x, seed=0)
).subscribe(lambda x: print(f"Сумма: {x}"))

Фильтрация данных

filter()

Отбирает элементы, удовлетворяющие условию.

observable.pipe(
    ops.filter(lambda x: x % 2 == 0)
).subscribe(lambda x: print(f"Четное число: {x}"))

debounce()

Игнорирует значения, если они поступают слишком быстро.

observable.pipe(
    ops.debounce(0.5)
).subscribe(lambda x: print(f"Получено: {x}"))

distinct()

Пропускает только уникальные значения.

observable.pipe(
    ops.distinct()
).subscribe(lambda x: print(f"Уникальное значение: {x}"))

Комбинирование Observable

merge()

Объединяет несколько Observable в один поток.

obs1 = rx.from_([1, 2, 3])
obs2 = rx.from_([4, 5, 6])

rx.merge(obs1, obs2).subscribe(lambda x: print(f"Элемент: {x}"))

zip()

Объединяет элементы нескольких Observable в кортежи.

obs1 = rx.from_([1, 2, 3])
obs2 = rx.from_(['a', 'b', 'c'])

rx.zip(obs1, obs2).subscribe(lambda x: print(f"Сочетание: {x}"))

combine_latest()

Выдает комбинацию последних элементов из каждого Observable.

obs1 = rx.interval(1)
obs2 = rx.interval(1.5)

rx.combine_latest(obs1, obs2).subscribe(lambda x: print(f"Комбинация: {x}"))

Тестирование потоков данных

Существуют горячие и холодные Observable

  • Холодные Observable начинают выдавать данные с момента подписки.

  • Горячие Observable уже генерируют данные, независимо от подписчиков.

Пример холодного Observable:

def create_cold_observable(scheduler):
    return rx.from_([1, 2, 3], scheduler=scheduler)

scheduler = reactivex.testing.TestScheduler()
observable = create_cold_observable(scheduler)

Пример горячего Observable:

def create_hot_observable(scheduler):
    return scheduler.create_hot_observable(
        reactivex.testing.ReactiveTest.on_next(150, 1),
        reactivex.testing.ReactiveTest.on_next(210, 2),
    )

scheduler = reactivex.testing.TestScheduler()
observable = create_hot_observable(scheduler)

Использование TestScheduler:

from reactivex.testing import TestScheduler, ReactiveTest

def test_map_operator():
    scheduler = TestScheduler()
    xs = scheduler.create_hot_observable(
        ReactiveTest.on_next(150, 1),
        ReactiveTest.on_next(210, 2),
        ReactiveTest.on_completed(300)
    )

    def create():
        return xs.pipe(ops.map(lambda x: x * 10))

    results = scheduler.start(create)
    assert results.messages == [
        ReactiveTest.on_next(210, 20),
        ReactiveTest.on_completed(300)
    ]

Тестирование с использованием Marbles

Marble-диаграммы позволяют визуализировать потоки данных.

from reactivex.testing import marbles_testing

def test_filter_operator():
    with marbles_testing() as (start, cold, hot, exp):
        source = cold('--1-2-3-4-5-|')
        expected = exp('----2---4---|')

        result = start(source.pipe(
            ops.filter(lambda x: int(x) % 2 == 0)
        ))

        assert result == expected

Пару примеров применения RxPY

Интеграция с asyncio

RxPY хорошо сочетается с asyncio:

import asyncio

async def main():
    loop = asyncio.get_event_loop()
    observable = rx.interval(1).pipe(
        ops.take(5)
    )

    observable.subscribe(
        on_next=lambda x: print(f"Tick: {x}"),
        on_error=lambda e: print(f"Error: {e}"),
        on_completed=lambda: print("Completed"),
        scheduler=rx.scheduler.AsyncIOScheduler(loop)
    )

    await asyncio.sleep(6)

asyncio.run(main())

Также RxPY может помочь при работе с очередями сообщений и кэшем в Redis:

import redis
from reactivex import Subject

r = redis.Redis()

def listen_to_channel(channel):
    pubsub = r.pubsub()
    pubsub.subscribe(channel)
    for message in pubsub.listen():
        if message['type'] == 'message':
            yield message['data']

channel_observable = rx.from_(listen_to_channel('my_channel'))

channel_observable.subscribe(lambda msg: print(f"Received: {msg}"))

Обработка событий в event-driven архитектуре:

event_subject = Subject()

def handle_event(event):
    print(f"Handling event: {event}")

event_subject.pipe(
    ops.filter(lambda e: e['type'] == 'click'),
    ops.map(lambda e: e['payload'])
).subscribe(handle_event)

# Где-то в коде
event_subject.on_next({'type': 'click', 'payload': {'x': 100, 'y': 200}})
event_subject.on_next({'type': 'hover', 'payload': {'x': 150, 'y': 250}})

Заключение

RxPY — это отличная находка для тех, кто хочет управлять асинхронными потоками данных по новому. Моя рекомендация: используй RxPY, если работаешь с большими объемами асинхронных данных или строишь event-driven системы. В таких проектах она раскроет весь свой потенциал.

У RxPY есть и свой порог вхождения. Если тебе нужно решить простую задачу с минимальным уровнем асинхронности, возможно, дефолтные библиотеки asyncio или потоки будут проще и быстрее в освоении. Но когда дело доходит до сложных и динамических систем — RxPY может уже понадобиться.

Подробнее с RxPY можете ознакомиться в их гите.


В завершение напоминаем про открытые уроки, которые пройдут в октябре в рамках курса "Microservice Architecture":

  • 23 октября. Метрики и Prometheus: обсудим, как собирать и использовать метрики с помощью Prometheus в Kubernetes для мониторинга приложений. Запись по ссылке

  • 24 октября. Брокеры сообщений: RabbitMQ и Kafka — узнаете, как использовать RabbitMQ и Kafka для организации асинхронной связи между микросервисами. Запись по ссылке

Комментарии (0)