Привет, Хабр!
Сегодня поговорим о хорошей библиотеке для управления потоками данных в Python – RxPY — реализации Reactive Extensions для нашего любимого языка. В версии 4.0.4 эта библиотека получила ряд улучшений, и сегодня мы разберем её основной функционал.
Основы RxPY
RxPY — это библиотека, реализующая принципы функционального реактивного программирования в Python. Она позволяет создавать и управлять асинхронными потоками данных, объединяя их, фильтруя и трансформируя с помощью цепочек операторов. Основные компоненты RxPY:
Observable: источник данных, который может выдавать события.
Observer: подписчик, который реагирует на события от Observable.
Операторы: функции, позволяющие трансформировать, фильтровать и комбинировать Observable.
Установить RxPY проще простого:
pip install reactivex
В версии 4.0.4 RxPY произошли значительные изменения:
Переименование модуля: теперь импортируем
reactivex
, а неrx
.Улучшенная типизация: добавлены аннотации типов для лучшей поддержки IDE.
Обновленная работа с операторами: использование метода
pipe
для цепочки операторов.
Если ты работал с RxPY v3, вот как все это дело интегрировать:
Изменение импорта:
import rx
->import reactivex as rx
.Операторы: вместо
observable.map()
теперь используемobservable.pipe(ops.map())
.Удалены устаревшие функции: некоторые старые операторы и методы были удалены или переименованы.
Создание Observable и работа с операторами
Создание Observable
just()
Создает Observable, который выдает единственное значение.
import reactivex as rx
observable = rx.just(42)
observable.subscribe(lambda x: print(f"Значение: {x}"))
from_()
Преобразует итерабельный объект в Observable.
observable = rx.from_([1, 2, 3, 4, 5])
observable.subscribe(lambda x: print(f"Элемент: {x}"))
interval()
Выдает последовательность чисел с заданным интервалом времени.
import time
from reactivex import interval
observable = interval(1) # каждую секунду
subscription = observable.subscribe(lambda x: print(f"Тик: {x}"))
time.sleep(5)
subscription.dispose()
timer()
Выдает значение после заданной задержки.
from reactivex import timer
observable = timer(3) # через 3 секунды
observable.subscribe(lambda x: print("Таймер сработал!"))
Трансформация Observable
map()
Применяет функцию к каждому элементу.
from reactivex import operators as ops
observable.pipe(
ops.map(lambda x: x * x)
).subscribe(lambda x: print(f"Квадрат: {x}"))
flat_map()
Разворачивает вложенные Observable.
def duplicate(x):
return rx.from_([x, x*2, x*3])
observable.pipe(
ops.flat_map(duplicate)
).subscribe(lambda x: print(f"Значение: {x}"))
scan()
Аналог reduce
, но выдает накопленный результат на каждой итерации.
observable.pipe(
ops.scan(lambda acc, x: acc + x, seed=0)
).subscribe(lambda x: print(f"Сумма: {x}"))
Фильтрация данных
filter()
Отбирает элементы, удовлетворяющие условию.
observable.pipe(
ops.filter(lambda x: x % 2 == 0)
).subscribe(lambda x: print(f"Четное число: {x}"))
debounce()
Игнорирует значения, если они поступают слишком быстро.
observable.pipe(
ops.debounce(0.5)
).subscribe(lambda x: print(f"Получено: {x}"))
distinct()
Пропускает только уникальные значения.
observable.pipe(
ops.distinct()
).subscribe(lambda x: print(f"Уникальное значение: {x}"))
Комбинирование Observable
merge()
Объединяет несколько Observable в один поток.
obs1 = rx.from_([1, 2, 3])
obs2 = rx.from_([4, 5, 6])
rx.merge(obs1, obs2).subscribe(lambda x: print(f"Элемент: {x}"))
zip()
Объединяет элементы нескольких Observable в кортежи.
obs1 = rx.from_([1, 2, 3])
obs2 = rx.from_(['a', 'b', 'c'])
rx.zip(obs1, obs2).subscribe(lambda x: print(f"Сочетание: {x}"))
combine_latest()
Выдает комбинацию последних элементов из каждого Observable.
obs1 = rx.interval(1)
obs2 = rx.interval(1.5)
rx.combine_latest(obs1, obs2).subscribe(lambda x: print(f"Комбинация: {x}"))
Тестирование потоков данных
Существуют горячие и холодные Observable
Холодные Observable начинают выдавать данные с момента подписки.
Горячие Observable уже генерируют данные, независимо от подписчиков.
Пример холодного Observable:
def create_cold_observable(scheduler):
return rx.from_([1, 2, 3], scheduler=scheduler)
scheduler = reactivex.testing.TestScheduler()
observable = create_cold_observable(scheduler)
Пример горячего Observable:
def create_hot_observable(scheduler):
return scheduler.create_hot_observable(
reactivex.testing.ReactiveTest.on_next(150, 1),
reactivex.testing.ReactiveTest.on_next(210, 2),
)
scheduler = reactivex.testing.TestScheduler()
observable = create_hot_observable(scheduler)
Использование TestScheduler:
from reactivex.testing import TestScheduler, ReactiveTest
def test_map_operator():
scheduler = TestScheduler()
xs = scheduler.create_hot_observable(
ReactiveTest.on_next(150, 1),
ReactiveTest.on_next(210, 2),
ReactiveTest.on_completed(300)
)
def create():
return xs.pipe(ops.map(lambda x: x * 10))
results = scheduler.start(create)
assert results.messages == [
ReactiveTest.on_next(210, 20),
ReactiveTest.on_completed(300)
]
Тестирование с использованием Marbles
Marble-диаграммы позволяют визуализировать потоки данных.
from reactivex.testing import marbles_testing
def test_filter_operator():
with marbles_testing() as (start, cold, hot, exp):
source = cold('--1-2-3-4-5-|')
expected = exp('----2---4---|')
result = start(source.pipe(
ops.filter(lambda x: int(x) % 2 == 0)
))
assert result == expected
Пару примеров применения RxPY
Интеграция с asyncio
RxPY хорошо сочетается с asyncio
:
import asyncio
async def main():
loop = asyncio.get_event_loop()
observable = rx.interval(1).pipe(
ops.take(5)
)
observable.subscribe(
on_next=lambda x: print(f"Tick: {x}"),
on_error=lambda e: print(f"Error: {e}"),
on_completed=lambda: print("Completed"),
scheduler=rx.scheduler.AsyncIOScheduler(loop)
)
await asyncio.sleep(6)
asyncio.run(main())
Также RxPY может помочь при работе с очередями сообщений и кэшем в Redis:
import redis
from reactivex import Subject
r = redis.Redis()
def listen_to_channel(channel):
pubsub = r.pubsub()
pubsub.subscribe(channel)
for message in pubsub.listen():
if message['type'] == 'message':
yield message['data']
channel_observable = rx.from_(listen_to_channel('my_channel'))
channel_observable.subscribe(lambda msg: print(f"Received: {msg}"))
Обработка событий в event-driven архитектуре:
event_subject = Subject()
def handle_event(event):
print(f"Handling event: {event}")
event_subject.pipe(
ops.filter(lambda e: e['type'] == 'click'),
ops.map(lambda e: e['payload'])
).subscribe(handle_event)
# Где-то в коде
event_subject.on_next({'type': 'click', 'payload': {'x': 100, 'y': 200}})
event_subject.on_next({'type': 'hover', 'payload': {'x': 150, 'y': 250}})
Заключение
RxPY — это отличная находка для тех, кто хочет управлять асинхронными потоками данных по новому. Моя рекомендация: используй RxPY, если работаешь с большими объемами асинхронных данных или строишь event-driven системы. В таких проектах она раскроет весь свой потенциал.
У RxPY есть и свой порог вхождения. Если тебе нужно решить простую задачу с минимальным уровнем асинхронности, возможно, дефолтные библиотеки asyncio
или потоки будут проще и быстрее в освоении. Но когда дело доходит до сложных и динамических систем — RxPY может уже понадобиться.
Подробнее с RxPY можете ознакомиться в их гите.
В завершение напоминаем про открытые уроки, которые пройдут в октябре в рамках курса "Microservice Architecture":
23 октября. Метрики и Prometheus: обсудим, как собирать и использовать метрики с помощью Prometheus в Kubernetes для мониторинга приложений. Запись по ссылке
24 октября. Брокеры сообщений: RabbitMQ и Kafka — узнаете, как использовать RabbitMQ и Kafka для организации асинхронной связи между микросервисами. Запись по ссылке