Всем привет! Буквально недавно мне пришлось изучить всю официальную документацию RabbitMQ и множество статей разного качества на тему разных видов маршрутизации в этом брокере.

Оказалось, что материала на эту тему много, но он либо объясняет совсем базовые случаи, либо уходит в такие дебри, которые крайне далеки и тяжелы для человека, который хочет "просто разобраться".

Простых и понятных примеров на Python тоже мало, так как они заставляют вас вникать в код библиотеки, а не особенности RabbitMQ. Для человека, который видит что-то pika-подобное первый раз, это может быть испытанием.

Поэтому я и решил написать эту статью: она нацелена на "чайников" - вы легко можете показать ее вашим джунам, и этого уже должно быть достаточно для начала работы с RabbitMQ.

Примеры в статье будут приведены с использованием фреймворка Propan, чтобы не утруждать джуновский мозг лишними деталями установления соединения, канала и тд.

Зачем вам RabbitMQ?

Данный раздел содержит много "воды" о том, что такое Messaging, кто такие эти ваши брокеры сообщений и какое место среди них занимает RabbitMQ. Если вы знакомы со всеми этими понятиями - можете сразу переходить к следующему разделу, иначе - загляните под спойлер.

Вода о том, зачем нужен RabbitMQ

Так как эта статья для джунов, неплохо было бы разобраться, что такое RabbitMQ и зачем он нам нужен.

Если просто, RabbitMQ - это брокер сообщений. Один из многих. Другими популярными вариантами являются Kafka (не совсем брокер, но смысл тот же), Nats, SQS и даже Redis (он тоже так умеет, причем разными способами). Есть и более экзотические варианты типа Pulsar, ActiveMQ, Tarantool и т.д.

Что же такое брокер сообщений? - Это внешний сервис, который принимает, хранит и распределяет сообщения между потребителями. Хотя архитектура Messaging и может быть реализована без внешнего брокера путем отправки сообщений напрямую (MQTT, ZeroMQ), внедрение в систему отдельной сущности, которая управляет жизненным циклом и распределением сообщений, повышает отказоустойчивость системы и обеспечивает простоту ее дальнейшего масштабирования.

Данная архитектура направлена на построение система с асинхронными неблокирующими операциями. Если HTTP запросы подразумевают получение ответа, то отправка сообщения подразумевает только подтверждение того, что сообщение было помещено в брокер, а также, в некоторых случаях, подтверждение доставки сообщения до потребителя и даже подтверждение успешной его обработки.

Также, некоторые брокеры позволяют вам без лишних затрат использовать RPC запросы, когда отправка сообщения подразумевает получение ответа в другом сообщении. Этот ответ может быть отправлен как через временную одноразовую очередь, так и в некую постоянную очередь, что позволяет нам получать от одного до нескольких ответов на отправленное сообщение, а также разнести запрос и ответ по времени на неопределенный промежуток не блокируя приложение на время ожидания ответа (в то время как HTTP запрос заблокировал упал бы по таймауту).

Таким образом, использование паттерна Messaging позволяет построить значительно более гибкую систему сервисов, способную выполнять как синхронные, так и асинхронные задачи с различными гарантиями.

Однако, основной сценарий Messaging - выполнение асинхронных операций. На самом деле, большая часть действий, которые мы хотим совершать в нашем приложении по своей физической природе асинхронны: отправка уведомлений, почтовая рассылка, плановое выполнение задач и тд. В сценариях, когда вам нужно просто запустить дальнейшую обработку данных и не дожидаться результатов этой обработки, Messaging подходит идеально. С помощью этого паттерна вы можете легко построить распределенные системы потоковой обработки данных (когда результаты каждого слоя передаются на следующий через сообщения), системы планового выполнения задач, оповещений, массовых рассылок, реагирования на инциденты, сервисы IOT и многое другое.

Хотя RabbitMQ и не является самым производительным из брокеров, его отличительной особенностью является возможность построения системы маршрутизации сообщений любой сложности.

Основные понятия

В RabbitMQ существует три основных вида объектов маршрутизации:

  • Exchange (Обменник) - туда сообщения отправляются;

  • Queue (очередь) - оттуда мы забираем сообщения;

  • Binding (связь) - определяет правила доставки сообщений от Exchange к Queue. Собственно, это все, что нужно знать для начала работы с RabbitMQ: объявляем Exchange, куда будут приходить сообщения, привязываем к нему очередь, слушаем эту очередь - mission complete!

При этом правила маршрутизации сообщения определяются типом Exchange и параметрами привязки очереди. Сама же очередь инкапсулирует в себе логику доставки сообщений до потребителей, следит за жизненным циклом сообщений и тд, и тп, но в это мы не будет вникать.

Лучше разберемся со всеми типами маршрутизации по порядку.

Direct Exchange

Direct Exchange - базовый способ маршрутизации сообщений в RabbitMQ. Его суть очень проста: exchange отправляет сообщения в те очереди, routing_key который совпадает с routing_key отправляемого сообщения.

При привязке очереди к Exchange указывается routing_key (ключ маршрутизации). В большинстве случаев он совпадает с названием самой очереди.
Также routing_key указывается при отправке сообщения, этот ключ и определяет, куда дальше направится это сообщение.

На примере все становится очень ясно:

from propan import PropanApp, RabbitBroker
from propan.brokers.rabbit import RabbitExchange, RabbitQueue, ExchangeType

broker = RabbitBroker()
app = PropanApp(broker)

# Объявляем Direct Exchange
exchange = RabbitExchange("test-exchange", type=ExchangeType.DIRECT)

# Объявляем парочку разных очередей
queue_1 = RabbitQueue("test-q-1")
queue_2 = RabbitQueue("test-q-2")

# Слушаем queue_1
@broker.handle(queue_1, exchange)
async def handler1():
    print("handler1")

# Слушаем queue_2
@broker.handle(queue_2, exchange)
async def handler2():
    print("handler2")

@app.after_startup
async def send_messages():
    # Отправляем сообщение в наш exchange в queue_1
    # оно будет обработано handler1
	await broker.publish(exchange=exchange, routing_key="test-q-1")

	# Отправляем сообщение в наш exchange в queue_1
	# оно будет обработано handler2
	await broker.publish(exchange=exchange, routing_key="test-q-2")

Единственно, что я могу тут добавить: RabbitMQ поддерживает возможность привязывать одну очередь к нескольким разным Exchange и даже к одному Exchange с разными ключами маршрутизации. Таким образом, сообщения, отправленные в разные Exchange и с разными параметрами, могут стекаться в одну очередь и доставляться одним потребителям.

Исходя из моей практики, таких сценариев стоит избегать: лучше объявить новую очередь, чем пытаться затащить разные потоки сообщений в старую.

Fanout Exchange

Fanout Exchange - еще более простой способ маршрутизации в RabbitMQ. Данный тип exchange отправляет сообщения во все очереди, подписанные на него, игнорируя любые аргументы самого сообщения.

Я думаю, следующий пример также не требует дополнительных комментариев.

from propan import PropanApp, RabbitBroker
from propan.brokers.rabbit import RabbitExchange, RabbitQueue, ExchangeType

broker = RabbitBroker()
app = PropanApp(broker)

# Объявляем Fanout Exchange
exchange = RabbitExchange("test-exchange", type=ExchangeType.FANOUT)

# Объявляем парочку разных очередей
queue_1 = RabbitQueue("test-q-1")
queue_2 = RabbitQueue("test-q-2")

# Слушаем queue_1
@broker.handle(queue_1, exchange)
async def handler1():
	print("handler1")

# Слушаем queue_2
@broker.handle(queue_2, exchange)
async def handler2():
	print("handler2")

@app.after_startup
async def send_messages():
	# Отправляем сообщение в наш exchange
	# оно будет обработано обоими потребителями
	await broker.publish(exchange=exchange)

Topic Exchange

Topic Exchange - мощный механизм маршрутизации RabbitMQ. Данный тип exchange отправляет сообщения в очереди в соответствии с паттерном, указанном при их подключении к exchange и routing_key самого сообщения.

Данный тип exchange может использоваться и как Direct, и как Fanout, и как комбинированные варианты их обоих, однако работает несколько медленнее (но быстрее Headers, который мы рассмотрим следующим). В целом, это отличный компромисс между гибкостью и производительностью, поэтому он часто используется на практике.

Немного кода:

from propan import PropanApp, RabbitBroker
from propan.brokers.rabbit import RabbitExchange, RabbitQueue, ExchangeType

broker = RabbitBroker()
app = PropanApp(broker)

# Объявляем Topic Exchange
exchange = RabbitExchange("test-exchange", type=ExchangeType.TOPIC)

# Объявляем парочку разных очередей и их ключи маршрутизации
queue_1 = RabbitQueue("test-queue-1", routing_key="*.info")
queue_2 = RabbitQueue("test-queue-2", routing_key="*.debug")
queue_3 = RabbitQueue("test-queue-2", routing_key="logs.*")

# Слушаем queue_1
@broker.handle(queue_1, exchange)
async def handler1():
	print("handler1")

# Слушаем queue_2
@broker.handle(queue_2, exchange)
async def handler2():
	print("handler2")

# Слушаем queue_3
@broker.handle(queue_2, exchange)
async def handler2():
	print("handler3")

@app.after_startup
async def send_messages():
	# Отправляем сообщение в наш exchange
	# Ключ сообщения подпадает под *.info и logs.*
	# Оно будет обработано handler1 и handler3
	await broker.publish(routing_key="logs.info", exchange=exchange)

	# Отправляем сообщение в наш exchange
	# Ключ сообщения подпадает под *.debug и logs.*
	# Оно будет обработано handler2 и handler3
	await broker.publish(routing_key="logs.debug", exchange=exchange)

Headers Exchange

Headers Exchange - самый сложный и гибкий способ маршрутизации сообщений в RabbitMQ. Данный тип exchange отправляет сообщения в очереди в соответствии с совпадением аргументов привязки этих очередей к exchange с заголовками сообщений.

Это настолько гибкий вариант маршрутизации, что на практике почти не используется. Почти всегда ему предпочитают Topic Exchange, однако, знание того, что такой инструмент есть, может выручить, когда не хватает базовых приемов.

Примеров его использования в Python (да и вообще) вы тоже практически не найдете. Поэтому, в свое время, мне пришлось разбираться с ним по официальной документации RabbitMQ и прикидывать, как его использовать, по исходникам pika.

Вот вам выжимка:

from propan import PropanApp, RabbitBroker
from propan.brokers.rabbit import RabbitExchange, RabbitQueue, ExchangeType

broker = RabbitBroker()
app = PropanApp(broker)

# Объявляем Headers Exchange
exch = RabbitExchange("exchange", type=ExchangeType.HEADERS)

# Объявляем очередь, которая ожидает сообщения
# с заголовком {"key": 1}
queue_1 = RabbitQueue(
	"test-queue-1",
	bind_arguments={ "key": 1 }
)

# Объявляем очередь, которая ожидает сообщения
# с заголовками {"key": 2}, {"key2": 2} или {"key": 2, "key2": 2}
queue_2 = RabbitQueue(
	"test-queue-2",
	bind_arguments={
		"key": 2, "key2": 2,
		"x-match": "any"
	}
)

# Объявляем очередь, которая ожидает сообщения
# с заголовком {"key": 2, "key2": 2}
queue_3 = RabbitQueue(
	"test-queue-3",
	bind_arguments={
		"key": 2, "key2": 2,
		"x-match": "all"
	}
)

# Слушаем queue_1
@broker.handle(queue_1, exch)
async def handler1():
	print("handler1")

# Слушаем queue_2
@broker.handle(queue_2, exch)
async def handler2():
	print("handler2")

# Слушаем queue_3
@broker.handle(queue_3, exch)
async def handler3():
	print("handler3")

@app.after_startup
async def send_messages():
	# Отправляем сообщение с заголовком {"key": 1}
	# Оно будет обработано handler1
	await broker.publish(exchange=exch, headers={ "key": 1 })
	
	# Отправляем сообщение с заголовком {"key": 2}
	# Оно будет обработано handler2
	await broker.publish(exchange=exch, headers={ "key": 2 })
	
	# Отправляем сообщение с заголовком {"key2": 2}
	# Оно также будет обработано handler2
	await broker.publish(exchange=exch, headers={ "key2": 2 })
	
	# Отправляем сообщение с заголовком {"key": 2, "key2": 2}
	# Оно будет обработано handler2 и handler3
	await broker.publish(exchange=exch, headers={
		"key": 2, "key2": 2
	})

Как вы уже догадались, заголовок привязки "x-match" определяет, должны заголовки сообщения совпадать с объявленными частично или полностью.

Дополнительно

Важно знать еще несколько нюансов: при подписке нескольких потребителей на одну очередь, сообщения из этой очереди будут доставляться к ним ПО ОЧЕРЕДИ. Т.е. если у нас два потребителя (1 и 2) на одну очередь, то сообщения они будут получать по схеме
1 - 2 - 1 - 2 - ...
Причем RabbitMQ самостоятельно следит за распределением этих сообщений. Вы просто подключаете дополнительных потребителей, и на них сразу уходит часть нагрузки, что может быть очень полезно для горизонтального масштабирования.

Также RabbitMQ поддерживает еще более сложные варианты маршрутизации, в которых Exchange привязывается к другому Exchange также, как если бы это была очередь. Так вы можете совмещать, например, Topic и Headers Exchange. Но делать это вам я не рекомендую хотя бы по трем причинам:

  • Это не входит в протокол AMQP, а является "фишкой" RabbitMQ, после которой вы с него не уйдете;

  • Это значительно увеличивает нагрузку на сам RabbitMQ (и так не самый быстрый);

  • Вы запутаетесь. Очень тяжело следить за тем, что и как у вас связано, если одна очередь подписана на несколько Exchange, которые подписаны на другие Exchange, которые... Ну вы поняли.

Вместо вывода

В этой статье мы рассмотрели основные типы Exhcange RabbitMQ, а также работу с ними на языке Python с использованием фреймворка Propan. Такого набора минимальных знаний достаточно для написания и сопровождения базовых сервисов с использованием RabbitMQ.

Если эта тема для вас интересна, в следующий раз мы рассмотрим особенности реализации RPC запросов поверх RabbitMQ без создания временных очередей для получения сообщений.

Полезные ссылки

Комментарии (4)


  1. gto
    27.06.2023 21:33

    Вы меня извините, конечно, но заголовок статье вообще не соответствует. Возможности раббита не ограничиваются типами обменников.


    1. Propan671 Автор
      27.06.2023 21:33
      +1

      Возможно, вы правы: заголовок выбран не совсем удачно. Цель статьи - дать минимально необходимый набор информации, которой хватит для работы с RabbiMQ для людей, которые с ним вообще не знакомы. Как показывает практика, если грузить таких людей всей возможной информацией (либо даже упоминанием всего, что есть, но мы это не рассматриваем), то воспринимается это тяжело. Поэтому лучше сделать вид, что этой информации не существует (на данном этапе) а потом постепенно углублять полученные знания. В принципе, поэтому я и постарался оставить ссылками материалы, которые считаю полезными в качестве следующего шага для углубления знаний.

      Если говорить о небольшом цикле статей, как вы считаете, какие механизмы стоит осветить следующими? Я думал об ack/nack/reject, туда же message ttl и dead letter exchanges/queues, либо написать о direct-reply, т.к. информации об этом механизме, как и примеров кода в интернете почти нет.


  1. e_butcher
    27.06.2023 21:33

    Спасибо за статью!

    Поправьте, пожалуйста, в третьем примере (Topic Exchange) в broker.bublish exchange=exchange, а не exch.

    И в четвертом примере (Headers Exchange) немного резануло logger.info вместо print в третьем хендлере (логично везде делать либо логирование, либо принты).


    1. Propan671 Автор
      27.06.2023 21:33

      Спасибо за замечание, поправил)