Реализацией RPC запросов поверх брокеров сообщений никого не удивишь: очередь для запроса, очередь для ответа — ничего сложного.
Тот же RabbitMQ имеет пример в официальной документации. Других примеров там нет, поэтому создается впечатление, что отправка ответных сообщений в другую очередь — единственный возможный способ реализации RPC.
Этот сценарий отлично работает когда у нас есть непрерывный поток сообщений и непрерывный поток ответов на них. Однако, данный подход не применим в случаях, когда нам нужно отправить только одно сообщение и получить ответ именно на него. Мы сразу же попадаем в какой-то ад с фильтрацией ответов по correlation_id
.
На самом деле, в RabbitMQ есть механизм и для такого сценария. Но он спрятан в недрах документации и о нем почти нет информации в интернете (особенно рабочих примеров кода).
Вот это недоразумение мы сейчас и исправим.
P.S: Здесь я не буду объяснять, кто такой этот ваш RabbitMQ и зачем он нужен: эту информацию вы можете найти в другой моей статье.
Direct Reply-TO
Работает все достаточно просто, за исключением некоторых нюансов, которые всплывают на практике.
Концепция заключается в следующем:
- мы подписываемся на специальную псевдоочередь
amqp.rabbitmq.reply-to
- отправляем сообщение с указанием этой очереди в качестве
reply-to
заголовка - кролик генерирует для нас уникальный
routing_key
, по которому будет должно быть опубликовано ответное сообщение вdefault exchange
- сервер получает наше сообщение и отправляет ответ по этому
routing_key
.
Нет нужды создавать какие-либо дополнительные очереди, нет дополнительных расходов на управление ими со стороны RMQ. Это абсолютно win-to-win механизм.
Алгоритм действий:
Со стороны клиента:
- (СНАЧАЛА) подписываемся на очередь с волшебным названием
amqp.rabbitmq.reply-to
в no-ack режиме, объявлять ее не нужно - отправляем сообщение с указанием заголовка
reply-to = amqp.rabbitmq.reply-to
Со стороны сервера:
- получаем сообщение. В нем, в качестве
reply-to
заголовка будет нечто видаamqp.rabbitmq.reply-to.<uuid>
- отправляем ответ в
default exchange
сreply-to
значением в качестве ключа маршрутизации
На этом, в принципе, все. Однако каждое слово в этом алгоритме важно: сначала отправили, потом подписались — провал, попытались объявить очередь — провал, подписались в режиме ack — снова провал и т.д.
Поэтому мне пришлось потратить некоторое количество времени на написание рабочего кода. Давайте перейдем к нему, чтобы разобраться подробнее?
Python Example
Пример будет приведен с использование библиотеки aio-pika так как свою реализацию я писал имеенно на ней.
Пишем сервер
Сначала напишем некий бойлерплейт для подключения к очереди:
import asyncio
from functools import partial
import aio_pika
async def consumer(
msg: aio_pika.IncomingMessage,
channel: aio_pika.RobustChannelб
):
...
async def main():
connection = await aio_pika.connect_robust(
"amqp://guest:guest@127.0.0.1/"
)
queue_name = "test"
async with connection:
channel = await connection.channel()
queue = await channel.declare_queue(queue_name)
# через partial прокидываем в наш обработчик сам канал
await queue.consume(partial(consumer, channel=channel))
try:
await asyncio.Future()
except Exception:
pass
asyncio.run(main())
А теперь перейдем к нашей функции-обработчику:
async def consumer(
msg: aio_pika.IncomingMessage,
channel: aio_pika.RobustChannel,
):
# используем контекстный менеджер для ack'а сообщения
async with msg.process():
print(msg.body)
# проверяем, требует ли сообщение ответа
if msg.reply_to:
# отправляем ответ в default exchange
await channel.default_exchange.publish(
message=aio_pika.Message(
body=b"hi!",
correlation_id=msg.correlation_id,
),
routing_key=msg.reply_to, # самое важное
)
Как вы видите, действительно ничего сложного.
Пишем клиент
А вот тут будет немного веселья. Наша цель сделать такой же просто интерфейс как у requests:
data = requests.get("https://my-url.com").json()
Однако, это не так просто. Помните, что сначала нужно подписаться на ответную очередь? Так мы получаем следующий код:
import asyncio
import aio_pika
RABBIT_REPLY = "amq.rabbitmq.reply-to"
async def consume_response(msg: aio_pika.IncomingMessage):
print(msg.body)
async def main():
connection = await aio_pika.connect_robust(
"amqp://guest:guest@127.0.0.1/"
)
async with connection:
channel = await connection.channel()
callback_queue = await channel.get_queue(RABBIT_REPLY)
# сначала подписываемся
consumer_tag = await callback_queue.consume(
callback=consume_response,
no_ack=True, # еще один важный нюанс
)
# потом публикуем
await channel.default_exchange.publish(
message=aio_pika.Message(
body=b"hello",
reply_to=RABBIT_REPLY # указываем очередь для ответа
),
routing_key="test"
)
asyncio.run(main())
Так мы получаем ответное сообщение в нашу функцию-обработчик. Однако, теперь его нужно как-то достать оттуда. Для этого будем использовать asyncio.Queue
.
import asyncio
import aio_pika
RABBIT_REPLY = "amq.rabbitmq.reply-to"
async def main():
connection = await aio_pika.connect_robust(
"amqp://guest:guest@127.0.0.1/"
)
async with connection:
channel = await connection.channel()
callback_queue = await channel.get_queue(RABBIT_REPLY)
# создаем asyncio.Queue для ответа
rq = asyncio.Queue(maxsize=1)
# сначала подписываемся
consumer_tag = await callback_queue.consume(
callback=rq.put, # помещаем сообщение в asyncio.Queue
no_ack=True, # еще один важный нюанс
)
# потом публикуем
await channel.default_exchange.publish(
message=aio_pika.Message(
body=b"hello",
reply_to=RABBIT_REPLY # указываем очередь для ответа
),
routing_key="test"
)
# получаем ответ из asyncio.Queue
response = await rq.get()
print(response.body)
# освобождаем RABBIT_REPLY
await callback_queue.cancel(consumer_tag)
asyncio.run(main())
Теперь у нас уже есть что-то похожее на синхронный запрос-ответ. Можно немного поколдовать над интерфейсами и вы получите RPC over RMQ запрос, идентичный натуральному requests.
Вместо заключения
Ну а я уже поколдовал над этими интерфейсами. И вы можете увидеть результат этого колдовства в моем фреймворке Propan.
С его использование RPC запросы будут выглядеть для вас следующим образом:
from propan import PropanApp, RabbitBroker
broker = RabbitBroker("amqp://guest:guest@localhost:5672/")
app = PropanApp(rabbit_broker)
# server side
@broker.handle("ping")
async def heartbeat():
return "pong"
@app.after_startup
async def self_ping():
# client RPC request
response = await broker.publish(queue="ping", callback=True)
assert response == "pong"
И теперь вы точно знаете, что у них под капотом.
Нюансы
RabbitMQ Direct Reply-to действительно отличный механзим, однако и у него есть ограничения.
На псевдочередь amqp.rabbitmq.reply-to
можно подписываться из разных сервисов неограниченное число раз одновременно, однако, если вы хотите отправить несколько разных запросов в рамках одного сервиса (одного connection
, если быть точным) одновременно, у вас не получится это сделать: вы словите ошибку, что очередь уже имеет потребителя.
Поэтому в рамках одного сервиса необходимо использовать локи на отправку RPC запросов, что, к слову, также реализовано в Propan.
baldr
Кстати, спасибо. Много провел времени с RabbitMQ, но почему-то не встречал этой фичи.
Очень важное дополнение, которое есть в офсправке, но почему-то нет у вас: отправка сообщения должна быть через то же соединение и channel что и слушаем для ответа. Это может быть не всегда так в таких фреймворках как, например, kombu.
В пределах одного connection можно (и рекомендуется) открывать новые channel, так что локи не обязательны.
Propan671 Автор
Спасибо за информацию. Я как-то и не задумывался даже, что кто-то захочет делать отправку через другой connection и channel. Насчет поднятия дополнительных channel не уверен насколько это оправдано для этого кейса. Если вы хотите прям бродкастить RPC, наверное, нам стоит все-таки создать для ответов полноценную очередь. Если вы хотите отправить 2-3 сообщения подряд, то лок не сыграет большой роли.
Но я еще изучу этот вопрос и буду благодарен, если поможете с материалом на эту тему.
orlovdl
Если в кластере пара десятков нод, то в aio-pika есть пулы, чтобы цепляться к разным хостам (там тоже не так просто, но тем-не менее). В RabbitMQ, политика по умолчанию, распределяет очереди по тем нодам на которых они были объявлены, таким образом если в рамках одного соединения, задекларировать 1000 очередей, то нагрузка на кластер будет неравномерная.