Так исторически сложилось, что последние 5 лет своей продуктовой разработки я работаю с микросервисами вокруг брокеров сообщений (преимущественно RabbitMQ и Kafka).
И все это время меня не покидало чувство некой неудовлетворенности и неполноценности инструментария, который был мне доступен.
Приходя из мира HTTP фреймворков, ты чувствуешь себя как будто на костылях - ни тебе hotreload'а, который есть практически в любом wsgi-asgi сервере, хочешь тестировать - поднимай контейнеры окружения или мокай зависимости (особенно удобно в CI, ага), не забудь о реконнектах, логировании, трассировке и тд и тп.
И вот, таская от сервиса к сервису ворох всех этих проблем (и код который эти проблемы решает), до меня дошла гениальная идея: оформить весь однотипный код, общий для всех сервисов в единый пакет!
Так появился на свет фреймворк Propan.
Зачем вам использовать Propan
Фреймворк слеплен по образу и подобию FastAPI, но для работы с брокерами сообщений и с учетом лично моих болей, которые возникали при работе с этим HTTP-фреймворком. Propan в большей степени открыт для расширения и не диктует вам, как именно его использовать.
От FastAPI мы имеем:
Валидацию и приведение типов входящих сообщений с помощью Pydantic
Систему внедрения зависимостей
Максимально простой и понятный всем и каждому способ написания приложения
Особенности, которые выделяют Propan из ряда других фреймворков и нативных библиотек для работы с брокерами сообщений:
Независимость от брокеров - код, который вы пишете, не зависит от используемого брокера. Вы можете легко мигрировать с RabbitMQ на Kafka или Nats при росте нагрузки.
RPC поверх MQ - вам не нужно думать, как превратить Messaging в RPC. Если запрос ожидает ответ - фреймворк сделает все за вас.
Тестируемость - фреймворк позволяет эмулировать поведение брокеров и тестировать ваши сервисы без необходимости подключения к внешним зависимостям.
Собственный CLI - позволяет управлять настройками приложения, количеством запущенных инстансов, генерирует шаблоны проекта, а также перезагружает ваш проект при изменениях в коде (мне этого очень не хаватало при локальной разработке).
На текущий момент фрейморк поддерживает работу с RabbitMQ, Redis Pub/Sub, Nats. Поддержка Kafka ожидается в ближайший месяц, затем - работа над генерацией схемы в соответсвии с AsyncAPI.
Пример
Давайте немного опустим все эти сухие слова и перейдем к практическому примеру.
Например, вот код "приложения" для работы с RabbitMQ с использование aio-pika
import asyncio
import aio_pika
async def main():
connection = await aio_pika.connect_robust("amqp://guest:guest@127.0.0.1/")
async with connection:
channel = await connection.channel()
queue = await channel.declare_queue("test_queue")
async with queue.iterator() as queue_iter:
async for message in queue_iter:
async with message.process():
print(message.body)
asyncio.run(main())
А вот, тот же пример с использование Propan
from propan import PropanApp, RabbitBroker
broker = RabbitBroker("amqp://guest:guest@localhost:5672/")
app = PropanApp(broker)
@broker.handle("test_queue")
async def base_handler(body: str): # тело сообщения будет приведено к `str`
print(body)
В нем вам не нужно самостоятельно объявлять все подключения, очереди, заниматься обработкой сообщения: вы просто пишете код, а фреймворк делает все за вас. Однако, при необходимости, у вас остается возможность управлять этим поведением вручную.
При этом, пример с использованием Propan дает вам все преимущества, описанные в предыдущем разделе.
Пример с использованием Redis или Nats идентичен с точностью до аргументов функций.
from propan import PropanApp
from propan import RabbitBroker
# from propan import RedisBroker
# from propan import NatsBroker
broker = RabbitBroker("amqp://guest:guest@localhost:5672/")
# broker = NatsBroker("nats://localhost:4222")
# broker = RedisBroker("redis://localhost:6379")
app = PropanApp(broker)
@broker.handle("test")
async def base_handler(body: str):
print(body)
Использование в HTTP сервисах
Очень часто нам нужно использовать логику работы с брокерами сообщений в рамках приложений, которые обрабатывает и HTTP-запросы.
Без проблем! Просто запустите и остановите Propan вместе с вашим приложением.
Пример с aiohttp
from aiohttp import web
from propan import RabbitBroker
broker = RabbitBroker("amqp://guest:guest@localhost:5672/")
@broker.handle("test")
async def base_handler(body):
print(body)
async def start_broker(app):
await broker.start()
async def stop_broker(app):
await broker.close()
app = web.Application()
app.on_startup.append(start_broker) # запускаем Propan при старте
app.on_cleanup.append(stop_broker) # и останавливаем вместе с приложением
web.run_app(app)
Кроме этого, если вы используете FastAPI, вы можете использовать Propan напрямую - как часть вашего приложения FastAPI.
from fastapi import FastAPI
from pydantic import BaseModel
from propan.fastapi import RabbitRouter
app = FastAPI()
router = RabbitRouter("amqp://guest:guest@localhost:5672")
class Incoming(BaseModel):
...
@router.event("test")
async def hello(body: Incoming):
print(body)
app.include_router(router)
Заключение
Не вижу смысла пересказывать всю документацию фреймворка, вы можете найти ее здесь. Если по ходу чтения статьи у вас возникли какие-то вопросы, возможно, ответы вы также найдете там.
Сейчас фреймворк активно развивается только моими силами. Если вы заинтересованы в его дальнейшем развитии, я буду крайне благодарен за любую помощь: указание на неточности/непонятные моменты в документации, написание кода, тестирование.
В любом случае, я буду рад любому фидбеку: критике, предложениям для реализации. Если вы решите использовать фреймворк в своих проектах - буду рад услышать о вашем опыте.
Комментарии (17)
asarkhipov
25.05.2023 11:33Давно хотел поработать над чем-то опенсорсным, проект выглядит перспективно. Если найду свободное время - может чем-то помогу.
Сейчас посмотрел глазом на код кафки, вот эта конструкция может генерировать исключенияasync for msg in handler.consumer
Я по этой причине используюgetone()
Какие планы по интеграции MQTT?Propan671 Автор
25.05.2023 11:33Буду рад, если вы присоединитесь к работе над проектом: мне пригодилась бы помощь с той же кафкой. Одному рук катастрофически не хватает.
Реализацию MQTT я вижу пока поверх paho. С этим не должно быть много проблем, но сейчас это не приоритетная задача. Пока приоритет следующий: реализация SQS, добить Kafka, NatsJS, Redis Streams, затем - документация AsyncAPI. Задач еще много...
pomponchik
25.05.2023 11:33А как быть, если сообщения не в формате json?
Propan671 Автор
25.05.2023 11:33Оно и не обязано быть json: можно послать все примитивные типы python, а сериализовать их при приеме с помощью pydantic. На крайний случай - можно принимать сырые
bytes
и сериализовать их самостотельно: например, вDepends
, или прямо в теле функции.pomponchik
25.05.2023 11:33+1То есть pydantic умеет работать, к примеру, с protobuf? Не слышал про такое.
Propan671 Автор
25.05.2023 11:33Ну, если речь о protobuf, то нет, не умеет. В перспективе - будет. Но вопрос был о json. Нет, можно посылать не только json. И нет, protobuf не умеем. Хотя, технически, вы можете легко его интегрировать в свой проект самостоятельно способом, который я описал выше.
pomponchik
25.05.2023 11:33А как еще могут сериализоваться примитивные типы с помощью pydantic, кроме как через json?
Propan671 Автор
25.05.2023 11:33Из строки, например. Сообщение же может быть просто строкой/числом/bool, etc.
pomponchik
25.05.2023 11:33Допустим, в сообщении число. Как оно будет сериализовано?
Propan671 Автор
25.05.2023 11:33Отправится как строка, на входе будет преобразовано в int в соответствии с аннотацией функции - потребителя
pomponchik
25.05.2023 11:33А чем это отличается от json?
Propan671 Автор
25.05.2023 11:33Тем, что это уже не является форматом JSON (согласно его спецификации)
Если честно, не вижу смысла развивать этот диалог в рамках статьи
Propan671 Автор
25.05.2023 11:33Хорошо, правда за вами: RFC 8259 утверждает, что просто значения тоже могут быть JSON, хотя и ссылается на то, что "в предыдущих стандартах JSON опеределялся только либо как object, либо как array"
Видимо, я почему-то руководствовался всю жизнь однажды прочитанным старым стандартом 2013 года.
kield
Я не знаю сюда ли я пишу, но на сколько я смог понять из документации aio-pika, когда мы используем
channel.set_qos(prefetch_count=<int>)
то мы задаём количество операций, которое за раз может взять на себя consumer. Но я нашел у вас в коде это (propan/brokers/rabbit/rabbit_broker.py:69
):max_consumers, это переменная которая отвечает за количество инстансов consumer'ов которые будут крутиться (судя по названию). В таком случае получается что каждый из них будет брать столько задач, сколько всего instance'ов. Просто я например ставлю это значение на 50-100, зная что у меня все задачи IO-bound, и в таком случае всё ок работает. Поправьте если я ошибаюсь.
Propan671 Автор
Добрый день! В целом, вы правы: prefetch_count в aio-pika говорит о том, сколько сообщений за раз может обрабатывать consumer. Этакий MessagePool.
Просто для меня, когда я первый раз взялся за работу с RabbitMQ смысловое значение этого аргумента вызвало некоторый ступор и необходимость лезть в документацию самого Rabbit'а. Поэтому в рамках Propan я немного пересмотрел эту концепцию: в рамках приложения в качестве потребителя рассматривается 1 задача обработки одного конкретного сообщения. Поэтому max_consumers говорит о том, что у нас будет обрабатываться не более N сообщений одновременно: тот же MessagePool, вид сбоку.
Мне кажется, что в таком ключе эта концепция легче укладывается в головах у джунов, которые первый раз видят RabbitMQ. Впрочем, я готов изменить свое мнение и переименовать данный параметр.
Количество инстансов в Propan - это workers, которые мы запускаем из CLI.