Так исторически сложилось, что последние 5 лет своей продуктовой разработки я работаю с микросервисами вокруг брокеров сообщений (преимущественно RabbitMQ и Kafka).

И все это время меня не покидало чувство некой неудовлетворенности и неполноценности инструментария, который был мне доступен.

Приходя из мира HTTP фреймворков, ты чувствуешь себя как будто на костылях - ни тебе hotreload'а, который есть практически в любом wsgi-asgi сервере, хочешь тестировать - поднимай контейнеры окружения или мокай зависимости (особенно удобно в CI, ага), не забудь о реконнектах, логировании, трассировке и тд и тп.

И вот, таская от сервиса к сервису ворох всех этих проблем (и код который эти проблемы решает), до меня дошла гениальная идея: оформить весь однотипный код, общий для всех сервисов в единый пакет!

Так появился на свет фреймворк Propan.

Зачем вам использовать Propan

Фреймворк слеплен по образу и подобию FastAPI, но для работы с брокерами сообщений и с учетом лично моих болей, которые возникали при работе с этим HTTP-фреймворком. Propan в большей степени открыт для расширения и не диктует вам, как именно его использовать.

От FastAPI мы имеем:

  • Валидацию и приведение типов входящих сообщений с помощью Pydantic

  • Систему внедрения зависимостей

  • Максимально простой и понятный всем и каждому способ написания приложения

Особенности, которые выделяют Propan из ряда других фреймворков и нативных библиотек для работы с брокерами сообщений:

  • Независимость от брокеров - код, который вы пишете, не зависит от используемого брокера. Вы можете легко мигрировать с RabbitMQ на Kafka или Nats при росте нагрузки.

  • RPC поверх MQ - вам не нужно думать, как превратить Messaging в RPC. Если запрос ожидает ответ - фреймворк сделает все за вас.

  • Тестируемость - фреймворк позволяет эмулировать поведение брокеров и тестировать ваши сервисы без необходимости подключения к внешним зависимостям.

  • Собственный CLI - позволяет управлять настройками приложения, количеством запущенных инстансов, генерирует шаблоны проекта, а также перезагружает ваш проект при изменениях в коде (мне этого очень не хаватало при локальной разработке).

На текущий момент фрейморк поддерживает работу с RabbitMQ, Redis Pub/Sub, Nats. Поддержка Kafka ожидается в ближайший месяц, затем - работа над генерацией схемы в соответсвии с AsyncAPI.

Пример

Давайте немного опустим все эти сухие слова и перейдем к практическому примеру.

Например, вот код "приложения" для работы с RabbitMQ с использование aio-pika

import asyncio
import aio_pika

async def main():
	connection = await aio_pika.connect_robust("amqp://guest:guest@127.0.0.1/")

	async with connection:
		channel = await connection.channel()

		queue = await channel.declare_queue("test_queue")
		async with queue.iterator() as queue_iter:
			async for message in queue_iter:
				async with message.process():
					print(message.body)

asyncio.run(main())

А вот, тот же пример с использование Propan

from propan import PropanApp, RabbitBroker

broker = RabbitBroker("amqp://guest:guest@localhost:5672/")
app = PropanApp(broker)

@broker.handle("test_queue")
async def base_handler(body: str):  # тело сообщения будет приведено к `str`
	print(body)

В нем вам не нужно самостоятельно объявлять все подключения, очереди, заниматься обработкой сообщения: вы просто пишете код, а фреймворк делает все за вас. Однако, при необходимости, у вас остается возможность управлять этим поведением вручную.

При этом, пример с использованием Propan дает вам все преимущества, описанные в предыдущем разделе.

Пример с использованием Redis или Nats идентичен с точностью до аргументов функций.

from propan import PropanApp
from propan import RabbitBroker
# from propan import RedisBroker
# from propan import NatsBroker

broker = RabbitBroker("amqp://guest:guest@localhost:5672/")
# broker = NatsBroker("nats://localhost:4222")
# broker = RedisBroker("redis://localhost:6379")

app = PropanApp(broker)

@broker.handle("test")
async def base_handler(body: str):
	print(body)

Использование в HTTP сервисах

Очень часто нам нужно использовать логику работы с брокерами сообщений в рамках приложений, которые обрабатывает и HTTP-запросы.

Без проблем! Просто запустите и остановите Propan вместе с вашим приложением.

Пример с aiohttp

from aiohttp import web
from propan import RabbitBroker

broker = RabbitBroker("amqp://guest:guest@localhost:5672/")

@broker.handle("test")
async def base_handler(body):
	print(body)

async def start_broker(app):
	await broker.start()

async def stop_broker(app):
	await broker.close()

app = web.Application()
app.on_startup.append(start_broker)  # запускаем Propan при старте
app.on_cleanup.append(stop_broker)   # и останавливаем вместе с приложением
web.run_app(app)

Кроме этого, если вы используете FastAPI, вы можете использовать Propan напрямую - как часть вашего приложения FastAPI.

from fastapi import FastAPI
from pydantic import BaseModel
from propan.fastapi import RabbitRouter

app = FastAPI()
router = RabbitRouter("amqp://guest:guest@localhost:5672")

class Incoming(BaseModel):
	...
  
@router.event("test")
async def hello(body: Incoming):
	print(body)

app.include_router(router)

Заключение

Не вижу смысла пересказывать всю документацию фреймворка, вы можете найти ее здесь. Если по ходу чтения статьи у вас возникли какие-то вопросы, возможно, ответы вы также найдете там.

Сейчас фреймворк активно развивается только моими силами. Если вы заинтересованы в его дальнейшем развитии, я буду крайне благодарен за любую помощь: указание на неточности/непонятные моменты в документации, написание кода, тестирование.

В любом случае, я буду рад любому фидбеку: критике, предложениям для реализации. Если вы решите использовать фреймворк в своих проектах - буду рад услышать о вашем опыте.

Комментарии (17)


  1. kield
    25.05.2023 11:33
    +1

    Я не знаю сюда ли я пишу, но на сколько я смог понять из документации aio-pika, когда мы используем channel.set_qos(prefetch_count=<int>) то мы задаём количество операций, которое за раз может взять на себя consumer. Но я нашел у вас в коде это (propan/brokers/rabbit/rabbit_broker.py:69):

    await self._channel.set_qos(prefetch_count=int(self._max_consumers))

    max_consumers, это переменная которая отвечает за количество инстансов consumer'ов которые будут крутиться (судя по названию). В таком случае получается что каждый из них будет брать столько задач, сколько всего instance'ов. Просто я например ставлю это значение на 50-100, зная что у меня все задачи IO-bound, и в таком случае всё ок работает. Поправьте если я ошибаюсь.


    1. Propan671 Автор
      25.05.2023 11:33

      Добрый день! В целом, вы правы: prefetch_count в aio-pika говорит о том, сколько сообщений за раз может обрабатывать consumer. Этакий MessagePool.
      Просто для меня, когда я первый раз взялся за работу с RabbitMQ смысловое значение этого аргумента вызвало некоторый ступор и необходимость лезть в документацию самого Rabbit'а. Поэтому в рамках Propan я немного пересмотрел эту концепцию: в рамках приложения в качестве потребителя рассматривается 1 задача обработки одного конкретного сообщения. Поэтому max_consumers говорит о том, что у нас будет обрабатываться не более N сообщений одновременно: тот же MessagePool, вид сбоку.
      Мне кажется, что в таком ключе эта концепция легче укладывается в головах у джунов, которые первый раз видят RabbitMQ. Впрочем, я готов изменить свое мнение и переименовать данный параметр.
      Количество инстансов в Propan - это workers, которые мы запускаем из CLI.


  1. WhiteApfel
    25.05.2023 11:33
    -4

    Наконец! Пилить каждый раз новый велосипед — это мы могём.


  1. asarkhipov
    25.05.2023 11:33

    Давно хотел поработать над чем-то опенсорсным, проект выглядит перспективно. Если найду свободное время - может чем-то помогу.
    Сейчас посмотрел глазом на код кафки, вот эта конструкция может генерировать исключения
    async for msg in handler.consumer
    Я по этой причине использую getone()
    Какие планы по интеграции MQTT?


    1. Propan671 Автор
      25.05.2023 11:33

      Буду рад, если вы присоединитесь к работе над проектом: мне пригодилась бы помощь с той же кафкой. Одному рук катастрофически не хватает.
      Реализацию MQTT я вижу пока поверх paho. С этим не должно быть много проблем, но сейчас это не приоритетная задача. Пока приоритет следующий: реализация SQS, добить Kafka, NatsJS, Redis Streams, затем - документация AsyncAPI. Задач еще много...


  1. pomponchik
    25.05.2023 11:33

    А как быть, если сообщения не в формате json?


    1. Propan671 Автор
      25.05.2023 11:33

      Оно и не обязано быть json: можно послать все примитивные типы python, а сериализовать их при приеме с помощью pydantic. На крайний случай - можно принимать сырые bytes и сериализовать их самостотельно: например, в Depends, или прямо в теле функции.


      1. pomponchik
        25.05.2023 11:33
        +1

        То есть pydantic умеет работать, к примеру, с protobuf? Не слышал про такое.


        1. Propan671 Автор
          25.05.2023 11:33

          Ну, если речь о protobuf, то нет, не умеет. В перспективе - будет. Но вопрос был о json. Нет, можно посылать не только json. И нет, protobuf не умеем. Хотя, технически, вы можете легко его интегрировать в свой проект самостоятельно способом, который я описал выше.


          1. pomponchik
            25.05.2023 11:33

            А как еще могут сериализоваться примитивные типы с помощью pydantic, кроме как через json?


            1. Propan671 Автор
              25.05.2023 11:33

              Из строки, например. Сообщение же может быть просто строкой/числом/bool, etc.


              1. pomponchik
                25.05.2023 11:33

                Допустим, в сообщении число. Как оно будет сериализовано?


                1. Propan671 Автор
                  25.05.2023 11:33

                  Отправится как строка, на входе будет преобразовано в int в соответствии с аннотацией функции - потребителя


                  1. pomponchik
                    25.05.2023 11:33

                    А чем это отличается от json?


                    1. Propan671 Автор
                      25.05.2023 11:33

                      Тем, что это уже не является форматом JSON (согласно его спецификации)

                      Если честно, не вижу смысла развивать этот диалог в рамках статьи


                      1. dynamica
                        25.05.2023 11:33

                        А как по-вашему выглядит число 123 в формате JSON, согласно спецификации?


                      1. Propan671 Автор
                        25.05.2023 11:33

                        Хорошо, правда за вами: RFC 8259 утверждает, что просто значения тоже могут быть JSON, хотя и ссылается на то, что "в предыдущих стандартах JSON опеределялся только либо как object, либо как array"
                        Видимо, я почему-то руководствовался всю жизнь однажды прочитанным старым стандартом 2013 года.