Реализацией RPC запросов поверх брокеров сообщений никого не удивишь: очередь для запроса, очередь для ответа — ничего сложного.


Тот же RabbitMQ имеет пример в официальной документации. Других примеров там нет, поэтому создается впечатление, что отправка ответных сообщений в другую очередь — единственный возможный способ реализации RPC.


Этот сценарий отлично работает когда у нас есть непрерывный поток сообщений и непрерывный поток ответов на них. Однако, данный подход не применим в случаях, когда нам нужно отправить только одно сообщение и получить ответ именно на него. Мы сразу же попадаем в какой-то ад с фильтрацией ответов по correlation_id.


На самом деле, в RabbitMQ есть механизм и для такого сценария. Но он спрятан в недрах документации и о нем почти нет информации в интернете (особенно рабочих примеров кода).


Вот это недоразумение мы сейчас и исправим.


rpc


P.S: Здесь я не буду объяснять, кто такой этот ваш RabbitMQ и зачем он нужен: эту информацию вы можете найти в другой моей статье.


Direct Reply-TO


Работает все достаточно просто, за исключением некоторых нюансов, которые всплывают на практике.


Концепция заключается в следующем:


  • мы подписываемся на специальную псевдоочередь amqp.rabbitmq.reply-to
  • отправляем сообщение с указанием этой очереди в качестве reply-to заголовка
  • кролик генерирует для нас уникальный routing_key, по которому будет должно быть опубликовано ответное сообщение в default exchange
  • сервер получает наше сообщение и отправляет ответ по этому routing_key.

Нет нужды создавать какие-либо дополнительные очереди, нет дополнительных расходов на управление ими со стороны RMQ. Это абсолютно win-to-win механизм.


Алгоритм действий:


Со стороны клиента:


  • (СНАЧАЛА) подписываемся на очередь с волшебным названием amqp.rabbitmq.reply-to в no-ack режиме, объявлять ее не нужно
  • отправляем сообщение с указанием заголовка reply-to = amqp.rabbitmq.reply-to

Со стороны сервера:


  • получаем сообщение. В нем, в качестве reply-to заголовка будет нечто вида amqp.rabbitmq.reply-to.<uuid>
  • отправляем ответ в default exchange с reply-to значением в качестве ключа маршрутизации

На этом, в принципе, все. Однако каждое слово в этом алгоритме важно: сначала отправили, потом подписались — провал, попытались объявить очередь — провал, подписались в режиме ack — снова провал и т.д.


Поэтому мне пришлось потратить некоторое количество времени на написание рабочего кода. Давайте перейдем к нему, чтобы разобраться подробнее?


Python Example


Пример будет приведен с использование библиотеки aio-pika так как свою реализацию я писал имеенно на ней.


Пишем сервер


Сначала напишем некий бойлерплейт для подключения к очереди:


import asyncio
from functools import partial
import aio_pika

async def consumer(
    msg: aio_pika.IncomingMessage,
    channel: aio_pika.RobustChannelб
):
    ...

async def main():
    connection = await aio_pika.connect_robust(
        "amqp://guest:guest@127.0.0.1/"
    )

    queue_name = "test"

    async with connection:
        channel = await connection.channel()
        queue = await channel.declare_queue(queue_name)
        # через partial прокидываем в наш обработчик сам канал
        await queue.consume(partial(consumer, channel=channel))

        try:
            await asyncio.Future()
        except Exception:
            pass

asyncio.run(main())

А теперь перейдем к нашей функции-обработчику:


async def consumer(
    msg: aio_pika.IncomingMessage,
    channel: aio_pika.RobustChannel,
):
    # используем контекстный менеджер для ack'а сообщения
    async with msg.process():
        print(msg.body)

        # проверяем, требует ли сообщение ответа
        if msg.reply_to:
            # отправляем ответ в default exchange
            await channel.default_exchange.publish(
                message=aio_pika.Message(
                    body=b"hi!",
                    correlation_id=msg.correlation_id,
                ),
                routing_key=msg.reply_to,  # самое важное
            )

Как вы видите, действительно ничего сложного.


Пишем клиент


А вот тут будет немного веселья. Наша цель сделать такой же просто интерфейс как у requests:


data = requests.get("https://my-url.com").json()

Однако, это не так просто. Помните, что сначала нужно подписаться на ответную очередь? Так мы получаем следующий код:


import asyncio
import aio_pika

RABBIT_REPLY = "amq.rabbitmq.reply-to"

async def consume_response(msg: aio_pika.IncomingMessage):
    print(msg.body)

async def main():
    connection = await aio_pika.connect_robust(
        "amqp://guest:guest@127.0.0.1/"
    )

    async with connection:
        channel = await connection.channel()

        callback_queue = await channel.get_queue(RABBIT_REPLY)

        # сначала подписываемся
        consumer_tag = await callback_queue.consume(
            callback=consume_response,
            no_ack=True,  # еще один важный нюанс
        )

        # потом публикуем
        await channel.default_exchange.publish(
            message=aio_pika.Message(
                body=b"hello",
                reply_to=RABBIT_REPLY  # указываем очередь для ответа
            ),
            routing_key="test"
        )

asyncio.run(main())

Так мы получаем ответное сообщение в нашу функцию-обработчик. Однако, теперь его нужно как-то достать оттуда. Для этого будем использовать asyncio.Queue.


import asyncio
import aio_pika

RABBIT_REPLY = "amq.rabbitmq.reply-to"

async def main():
    connection = await aio_pika.connect_robust(
        "amqp://guest:guest@127.0.0.1/"
    )

    async with connection:
        channel = await connection.channel()

        callback_queue = await channel.get_queue(RABBIT_REPLY)

        # создаем asyncio.Queue для ответа
        rq = asyncio.Queue(maxsize=1)

        # сначала подписываемся
        consumer_tag = await callback_queue.consume(
            callback=rq.put,  # помещаем сообщение в asyncio.Queue
            no_ack=True,  # еще один важный нюанс
        )

        # потом публикуем
        await channel.default_exchange.publish(
            message=aio_pika.Message(
                body=b"hello",
                reply_to=RABBIT_REPLY  # указываем очередь для ответа
            ),
            routing_key="test"
        )

        # получаем ответ из asyncio.Queue
        response = await rq.get()
        print(response.body)

        # освобождаем RABBIT_REPLY
        await callback_queue.cancel(consumer_tag)

asyncio.run(main())

Теперь у нас уже есть что-то похожее на синхронный запрос-ответ. Можно немного поколдовать над интерфейсами и вы получите RPC over RMQ запрос, идентичный натуральному requests.


Вместо заключения


Ну а я уже поколдовал над этими интерфейсами. И вы можете увидеть результат этого колдовства в моем фреймворке Propan.


С его использование RPC запросы будут выглядеть для вас следующим образом:


from propan import PropanApp, RabbitBroker

broker = RabbitBroker("amqp://guest:guest@localhost:5672/")
app = PropanApp(rabbit_broker)

# server side
@broker.handle("ping")
async def heartbeat():
    return "pong"

@app.after_startup
async def self_ping():
    # client RPC request
    response = await broker.publish(queue="ping", callback=True)
    assert response == "pong"

И теперь вы точно знаете, что у них под капотом.


Нюансы


RabbitMQ Direct Reply-to действительно отличный механзим, однако и у него есть ограничения.


На псевдочередь amqp.rabbitmq.reply-to можно подписываться из разных сервисов неограниченное число раз одновременно, однако, если вы хотите отправить несколько разных запросов в рамках одного сервиса (одного connection, если быть точным) одновременно, у вас не получится это сделать: вы словите ошибку, что очередь уже имеет потребителя.


Поэтому в рамках одного сервиса необходимо использовать локи на отправку RPC запросов, что, к слову, также реализовано в Propan.

Комментарии (3)


  1. baldr
    12.07.2023 18:34
    +1

    Кстати, спасибо. Много провел времени с RabbitMQ, но почему-то не встречал этой фичи.

    Очень важное дополнение, которое есть в офсправке, но почему-то нет у вас: отправка сообщения должна быть через то же соединение и channel что и слушаем для ответа. Это может быть не всегда так в таких фреймворках как, например, kombu.

    В пределах одного connection можно (и рекомендуется) открывать новые channel, так что локи не обязательны.


    1. Propan671 Автор
      12.07.2023 18:34

      Спасибо за информацию. Я как-то и не задумывался даже, что кто-то захочет делать отправку через другой connection и channel. Насчет поднятия дополнительных channel не уверен насколько это оправдано для этого кейса. Если вы хотите прям бродкастить RPC, наверное, нам стоит все-таки создать для ответов полноценную очередь. Если вы хотите отправить 2-3 сообщения подряд, то лок не сыграет большой роли.
      Но я еще изучу этот вопрос и буду благодарен, если поможете с материалом на эту тему.


      1. orlovdl
        12.07.2023 18:34

        Если в кластере пара десятков нод, то в aio-pika есть пулы, чтобы цепляться к разным хостам (там тоже не так просто, но тем-не менее). В RabbitMQ, политика по умолчанию, распределяет очереди по тем нодам на которых они были объявлены, таким образом если в рамках одного соединения, задекларировать 1000 очередей, то нагрузка на кластер будет неравномерная.