WARNING: длинная вступительная часть. Если хотите перейти сразу к делу - листайте до Getting Started.

Вступление

В 2023 году писать сервисы, взаимодействующие друг с другом через RabbitMQ, всё ещё неоправданно сложно. Ещё больше сложностей возникает с тестированием бизнес-логики в них, с согласованием контрактов между ними, с организацией монорепозиториев.

В недрах компании Alem Research, ещё года с 19-го, я начал писать легковесный фреймворк, который по задумке должен был стать чем-то вроде Flask или FastAPI для работы с RabbitMQ.

Удалось ли? Пожалуй, да. При помощи этого фреймворка наши дата-саентисты смогли самостоятельно писать и упаковывать в докер сервисы, которые лёгким движением руки и парой перебиндов встраиваются в трубу.

Впрочем, первая версия работала ещё с kombu, а позже ядро было переписано на aio-pika, и фреймворк стал ещё и асинхронным. Что даёт асинхронность? Например, можно написать фетчер HTTP-страниц, который при увеличении prefetch_count будет фетчить параллельно огромное количество страниц, и даже сможет утилизировать сеть практически полностью, чего без асинхронности было бы довольно сложно добиться.

Кроме того, асинхронность позволяет легче лёгкого реализовать паттерн RPC через RabbitMQ в рамках одного процесса, что довольно удобно для тестирования. Да и интеграция с FastAPI становится очень удобной. А скорость обработки растёт в десятки раз.

В 2020-м мы совместно с фаундером и тогдашним генеральным директором решили, что фреймворк смело можно выкладывать в open-source. И в целом, казалось бы, всё хорошо.

Однако, есть и проблемы. Главная из них - это нехватка живых пользователей и контрибьюторов. Я ушёл из Alem Research, и в компании продолжать работать над фреймворком в данный момент некому. Иногда кто-то из компании пишет мне запросы по фреймворку, но мне очень лень выкатывать обновления для одной-единственной компании: это получается своего рода работа на работодателя, который тебе больше не платит :)

Я давно уже собирался написать питчинг-статью про Mela, чтобы привлечь новых пользователей, а возможно даже и контрибьюторов, но не решался, потому что код был очень далёк от идеала. Да-да, меня просто жрал внутренний перфекционизм и стыд за то, чтобы выкатить что-то не очень красивое. Но сегодня я понял, что стыдиться мне особо нечего: да, архитектура ядра не идеальна, но за последние пару месяцев, пока я сидел без работы, я уже продумал её со всех сторон, и знаю, что делать дальше. В общем, красивая версия не готова, но у меня есть план.

У меня есть план
У меня есть план

И я решил, что будет неправильно (и долго) реализовывать свой план в одно лицо, лучше сразу попробовать привлечь к нему потенциальных контрибьюторов. В общем, если вас заинтересует то, что описано ниже - добро пожаловать на борт!

Извините за долгое вступление. Мы наконец-то отправляемся в путь.

Getting started

Конечно же, всё начинается с:

pip install mela=1.1.1

Версию я указал чтобы статья оставалась релевантной даже после обновлений.

Давайте напишем сервис, который будет получать сообщение из очереди, выводить его body распарсенный в json, и возвращать сообщение обратно.

# app.py

from mela import Mela

app = Mela(__name__)


@app.service("printer")
def printer(body, message):
    print(body)
    return body


if __name__ == '__main__':
    app.run()

Как вы можете догадаться, это ещё не всё, что требуется для запуска приложения: здесь нет никакой информации об очередях и подключении. Это потому что она хранится в файле application.yml.

# application.yml
connections:
  default:
    host: localhost
    port: 5672
    username: user
    password: bitnami

services:
  printer:
    consumer:
      exchange: general-sentiment-x
      routing_key: general-sentiment-q
      queue: general-sentiment-q
    publisher:
      exchange: general-sentiment-x
      routing_key: general-sentiment-q

Вот и всё.

Этот пример знакомит нас сразу с тремя самыми основными высокоуровневыми концепциями Мелы: Publisher, Consumer и Service.

Если с паблишером и консьюмером всё понятно, то про Service, пожалуй, поясню, что это элемент трубы, который состоит из комбинации консьюмера и паблишера: он консьюмит сообщения из определённой очереди, обрабатывает его, и паблишит в указанный эксчейндж. По сути, сервис состоит из паблишера и консьюмера, что в том числе отражено в Yaml-файле.

Функция, которая находится под декоратором @app.service(...), как не сложно догадаться, является коллбэком консьюмера, а то, что она возвращает - будет отправлено в паблишер, привязанный к сервису.

Pydantic

Какой современный фреймворк без Pydantic, правда? И их есть у меня.

# app.py
from pydantic import BaseModel
from datetime import datetime

from mela import Mela


app = Mela(__name__)


class Document(BaseModel):
    text: str
    url: str
    date: datetime


@app.service('validator')
def validator(body: Document) -> Document:
    if '#' in body.url:
        body.url = body.url.split('#')[0]
    return body


if __name__ == '__main__':
    app.run()

В данном случае мы используем new-style сигнатуру обработчика, и явно указывать второй аргумент, в который в предыдущем примере прилетел бы объект Message, нам не нужно.

body сначала переводится в json, потом этот json скармливается в класс Document.

При ошибках валидации ошибки будут выведены в консоль, а сообщение вернётся обратно в очередь. Если, конечно, в консьюмереrequeue_broken_messages=True (по умолчанию так) и если у консьюмера не задан dead letter exchange. То же самое будет происходить если в коллбэке зарейзится любая другая ошибка.

Управление ack/nack

Есть несколько способов управлять ответами. Давайте рассмотрим все сразу.

from datetime import datetime
from datetime import timedelta
from pydantic import BaseModel

from mela import IncomingMessage
from mela import Mela
from mela.components.exceptions import NackMessageError

app = Mela(__name__)


class Document(BaseModel):
    text: str
    url: str
    date: datetime


@app.service("filter")
async def filter_(body: Document, message: IncomingMessage):
    if body.date > datetime.utcnow():
        # First way: we can raise special exception with some `requeue` value
        raise NackMessageError("We are not working with time travellers", requeue=False)
    elif body.date < datetime.utcnow() - timedelta(days=365):
        # Second way: we can manually nack message via IncomingMessage object
        # As you can see, in this case we can't write any message about requeue reason.
        # But it is still useful if you need to silently send message to DLX
        await message.nack(requeue=False)  # Go to archive, dude

    if body.url == '':
        # Third way: we can raise almost any exception. The message should be or should not 
        # be requeued based on `requeue_broken_messages` value
        raise AssertionError("Message without url is not acceptable")

    return body


if __name__ == '__main__':
    app.run()

Чистый консьюмер

Консьюмер создаётся точно так же, как сервис, но с другим декоратором.

from pydantic import BaseModel
from pydantic import EmailStr
from mela import Mela

app = Mela(__name__)


class EmailNotification(BaseModel):
    template_name: str
    vars: dict
    receiver: EmailStr


@app.consumer("email-sender")
def printer(body: EmailNotification):
    # Some Jinja2 and SMTP integration
    pass


if __name__ == '__main__':
    app.run()

Если в ходе обработки сообщения не произошло никакой ошибки, то сообщение будет акнуто автоматически. Это поведение можно изменить. Однако, чаще всего оно всех устраивает. В любом случае, можно вручную акнуть сообщение, думаю, вы уже догадались, как это сделать.

Ещё важно показать application.yml для консьюмера:

connections:
  default:
    host: localhost
    port: 5672
    username: user
    password: bitnami

consumers:
  email-sender:
    exchange: notifications-x
    exchange_type: topic
    routing_key: "email.#"
    queue: email-sender-q

Собственно, да. Просто другое название блока. И невзначай показанный пример работы с эксчейнджами других типов :)

Чистый паблишер и интеграция с FastAPI

from datetime import datetime
from uuid import uuid4

from fastapi import FastAPI
from mela import Mela
from mela.settings import Settings
from pydantic import BaseModel

app = FastAPI()

mela_app = Mela(__name__)
mela_app.settings = Settings()


class ReportRequest(BaseModel):
    start_date: datetime
    end_date: datetime
    user_id: str
    report_id: str | None = None


@app.post("/report")
async def read_root(report_request: ReportRequest):
    if report_request.report_id is None:
        report_request.report_id = str(uuid4())
    # some DB writing
    publisher = await mela_app.publisher_instance('report-generator')
    await publisher.publish(report_request)
    return report_request

application.yml:

connections:
  default:
    host: localhost
    port: 5672
    username: user
    password: bitnami

publishers:
  report-generator:
    exchange: report-x
    routing_key: new-report

Тут даже не знаю что комментировать. Разве что добавлю, что в будущем хотелось бы сделать внешний декоратор, чтобы инжектить паблишеры и RPC клиенты в сторонние функции так же, как в следующем примере. Но это уже к планам на будущее. А пока перейдём к следующему примеру.

Инъекции дополнительных паблишеров

Иногда нам нужно реализовать сплиттер или просто по ходу обработки сообщения что-то куда-то запаблишить. Можно, конечно, сделать как в предыдущем разделе, но можно сделать круче:

from datetime import datetime

from pydantic import BaseModel

from mela import Mela
from mela.components import Publisher

app = Mela(__name__)


class Document(BaseModel):
    text: str
    url: str
    date: datetime
    has_images: bool = False


@app.service('archiver')
async def archiver(document: Document, images_downloader: Publisher = 'images-downloader') -> Document:
    # archiving document
    
    if document.has_images:
        await images_downloader.publish(document)
    
    return document


if __name__ == '__main__':
    app.run()

application.yml:

connections:
  default:
    host: localhost
    port: 5672
    username: admin
    password: admin

services:
  archiver:
    consumer:
      exchange: archiver-x
      routing_key: archiver-q
      queue: archiver-q
    publisher:
      exchange: notify-archived-x
      exchange_type: topic
      routing_key: document.archived

publishers:
  images-downloader:
    exchange: images-downloader-x
    routing_key: images-downloader-q

На данный момент инъекции реализованы неправильно с точки зрения типизации. Есть план переписать их на новый служебный тип Annotated. Но... Главное работает.

DLX

Для любого консьюмера поддерживается Dead Letter Exchange.

Сделать его очень просто:

connections:
  default:
    host: localhost
    port: 5672
    username: admin
    password: admin

services:
  service_with_dlx:
    consumer:
      exchange: dlx-test-x
      routing_key: dlx-test-k
      queue: dlx-test-q
      dead_letter_exchange: dlx-test-dead-letter-x
      dead_letter_routing_key: dlx-test-dead-letter-k
    publisher:
      exchange: test-x
      routing_key: test_queue

Но вам придётся самостоятельно следить за тем, чтобы была очередь, которая будет собирать сломанные сообщения из вашего эксчейнджа.

RPC

# server.py
import asyncio
import aio_pika

from mela import Mela

app = Mela(__name__)


async def fetch(url):
    # asynchronously fetching url here and return its body
    await asyncio.sleep(1)
    return url


@app.rpc_service("fetcher")
async def fetcher(url: str):
    return {"fetched": await fetch(url)}


bots = {}


def create_bot(bot_id, bot_username, bot_password):
    bots[bot_id] = {'username': bot_username, 'password': bot_password}


def get_bot(bot_id):
    return bots[bot_id]


@app.rpc_service("bot_manager")
async def fetcher(body, message: aio_pika.Message):
    if message.headers['method'] == 'create_bot':
        create_bot(**body)
        return {'result': None, 'status': "OK"}
    elif message.headers['method'] == 'get_bot':
        return {'result': get_bot(**body), 'status': "OK"}
    else:
        return {'result': None, 'status': "ERROR_UNKNOWN_METHOD"}

if __name__ == '__main__':
    app.run()
# client.py
import asyncio

from mela import Mela

app = Mela(__name__)


async def main():
    # RPC calls over RabbitMQ never were simpler!

    fetcher = await app.rpc_client_instance("fetcher")

    bot_manager = await app.rpc_client_instance("bot_manager")

    res = await fetcher.call({'url': "test"})
    print(res)

    # we can even gather call results!
    g = await asyncio.gather(fetcher.call({'url': url1}), fetcher.call({'url': url2}))
    print(g)

    create_bot_result = await bot_manager.call({
        'bot_id': 1,
        'bot_username': "LalkaPalka",
        'bot_password': "supersecret",
    },
        headers={'method': 'create_bot'},
    )
    print(f"create_bot result {create_bot_result}")

    get_bot_result = await bot_manager.call({'bot_id': 1}, headers={'method': 'get_bot'})
    print(f"get_bot_result {get_bot_result}")

    unknown_method_result = await bot_manager.call({'bot_id': 4}, headers={'method': 'getBot'})
    print(f"unknown method result: {unknown_method_result}")


if __name__ == '__main__':
    url1 = (
        'https://tengrinews.kz/kazakhstan_news/vorvalis-dom-izbili-'
        'almatinka-rasskazala-zaderjanii-supruga-459127/'
    )
    url2 = (
        'https://www.inform.kz/ru/skol-ko-lichnyh-podsobnyh-'
        'hozyaystv-naschityvaetsya-v-kazahstane_a3896073'
    )
    app.run(main())
connections:
  default:
    host: localhost
    port: 5672
    username: user
    password: bitnami

rpc-services:
  fetcher:
    exchange: fetcher-x
    routing_key: fetcher-k
    queue: fetcher-q
    response_exchange: fetching-result-x
  bot_manager:
    exchange: botmanager-x
    routing_key: botmanager-k
    queue: botmanager-q
    response_exchange: botmanager-result-x

Вот вам пример сразу с двумя не конфликтующими друг с другом RPC-сервисами в одном процессе.

Тут тоже не вижу смысла что-то объяснять, но если будут вопросы - с радостью отвечу.

Несколько подключений и переменные окружения

Очень распространённый кейс - когда нам нужно перекачать данные из одного кластера реббита в другой. Сейчас мы не будем обсуждать правильность этой практики, а просто покажем, как это можно сделать легко и непринуждённо.

# application.yml
connections:
  input_connection:
    host: $RABBIT_INPUT_HOST
    port: ${RABBIT_INPUT_PORT|5672}
    username: ${RABBIT_INPUT_USERNAME|rabbitmq-bridge}
    password: ${RABBIT_INPUT_PASSWORD|rabbitmq-bridge}
  output_connection:
    host: $RABBIT_OUTPUT_HOST
    port: ${RABBIT_OUTPUT_PORT|5672}
    username: ${RABBIT_OUTPUT_USERNAME|rabbitmq-bridge}
    password: ${RABBIT_OUTPUT_PASSWORD|rabbitmq-bridge}

services:
  bridge:
    consumer:
      connection: input_connection
      prefetch_count: ${RABBIT_INPUT_PREFETCH_COUNT|1}
      routing_key: ${RABBIT_INPUT_ROUTING_KEY}
      exchange: ${RABBIT_INPUT_EXCHANGE}
      queue: ${RABBIT_INPUT_QUEUE}
    publisher:
      connection: output_connection
      routing_key: ${RABBIT_OUTPUT_ROUTING_KEY}
      exchange: ${RABBIT_OUTPUT_EXCHANGE}
# app.py
from mela import Mela

app = Mela(__name__)


@app.service("bridge")
async def serve(body, message):
    return body


if __name__ == '__main__':
    app.run()

Как вы можете заметить, код сервиса очень простой. А вот в конфигурационном файле есть кое-что новое. Первое, что бросается в глаза - это переменные окружения. Да, их очень просто сюда вшить. Вот, кстати, пример дотэнва:

RABBIT_INPUT_HOST=localhost
RABBIT_INPUT_ROUTING_KEY=routing-key
RABBIT_INPUT_EXCHANGE=exchange
RABBIT_INPUT_QUEUE=queue
RABBIT_OUTPUT_HOST=localhost
RABBIT_OUTPUT_PORT=5673
RABBIT_OUTPUT_ROUTING_KEY=routing-key
RABBIT_OUTPUT_EXCHANGE=exchange

Всё очень просто и прямолинейно, правда?

А ещё это по сути уже можно упаковывать в докер, и запускать через оркестратор.

Второе, на что нужно обратить внимание в конфиг-файле этого раздела - это то, что здесь два коннекшена, и для консьюмера и паблишера используются разные коннекшены.

На этом у меня юзкейсы закончились. Перейдём к следующему разделу.

Производительность

Цель этой статьи - не похвастаться, да и к тому же производительность фреймворка - это целиком и полностью заслуга авторов aio-pika, а не моя. Детальные бенчмарки - это будет тема для отдельной статьи, но пока просто скажу, что на моём не самом мощном ноуте простой бридж между двумя реббитами обрабатывает порядка 500 сообщений в секунду. А лучшее, чего мне удавалось добиться от обычной pika - это 80-100 сообщений в секунду. В случае с фетчингом страниц по всем понятным причинам не асинхронная pika не могла показать вообще сколько-нибудь адекватный результат.

Заключение

Как мне кажется, мне удалось составить неплохую внешнюю апишку. За исключением инъекций, но про них я писал выше. Внутри - бардак. Всё работает, конечно, но мейнтейнить и дальше развивать фреймворк будет сложно. Я уже начал переписывать ядро в чистовой вариант, но мне не хватает мотивации.

Поэтому я и написал эту статью. Если статья получит отклик, если этот фреймворк кого-то заинтересует, если найдутся люди, которые будут им пользоваться, а может даже и контрибьютить, то я обязательно продолжу работу над ним. Можете даже просто звёздочек накидать, это тоже даст мне хоть какую-то обратную связь :)

В конце дублирую ссылку на GitHub

Пишите вопросы в комментах. Ответами на самые важные буду дополнять статью, на остальные отвечу просто in-place.

Комментарии (3)


  1. nailele
    24.05.2023 06:25
    -2

    ура, опенсорс


  1. SeekerOfTruth
    24.05.2023 06:25
    +1

    по сравнению с aioamqp библиотекой какие плюсы и минусы можете выделить?


    1. uthunderbird Автор
      24.05.2023 06:25

      Если коротко — более высокоуровневые апишка, со всеми вытекающими плюсами и минусами.

      Этот фреймворк использует под капотом aio-pika, который в свою очередь использует aiormq. Насколько я понял, aioamqp по уровню абстракции где-то между этими двумя (ближе ко второму). Так что сравнивать его будет наиболее корректно с одним из них.