Привет, Хабр!
Сегодня разберём один из самых гибких инструментов в RabbitMQ — topic exchange
. Именно он позволяет не просто отправить сообщение «куда-то», а превратить очередь в маршрутизатор уровня BGP, но только внутри твоей системы.
Что такое topic exchange и в каких архитектурах он нужен
Прежде чем начнём, зафиксируем один фундаментальный принцип: topic exchange
— это шаблонный маршрутизатор. Он позволяет описать, кому доставить сообщение, используя гибкие шаблоны ключей, основанные на структуре из точек (.
).
Пример ключа:
invoice.created.eu.highvalue
Т.е есть по фатку вместо того чтобы писать условие if region == "eu" and amount > 10000
, ты просто биндишь очередь на шаблон:
invoice.created.eu.*
И все, логика переезжает из кода в инфраструктуру.
topic exchange
полезен, когда вы строите микросервисную архитектуру с логической маршрутизацией, работаете в мультитенантной среде с разграничением по tenant ID в ключе, настраиваете CI/CD-пайплайны с шаблонными задачами вроде ci.build.java
или ci.test.python
, реализуете расширяемую систему алертов (alert.#
) или ведёте распределённое логгирование (*.error.*
). Главный плюс — масштабируемость: добавляйте новые сервисы, меняйте схему маршрутизации, не трогая код отправителей — всё управляется через биндинги.
Синтаксис и семантика
topic exchange
работает с двумя спецсимволами:
*
— один сегмент (одно слово)#
— ноль или больше сегментов
Важно помнить:
routing key |
matched by |
---|---|
|
|
|
|
|
|
Некоторые нюансы
#
не может стоять внутри:a.#.b
— недопустимозаменяет только один сегмент, не два:
a.b
≠routing key должен быть не пустым и без пробелов
Паттерн проектирования ключей
Один из лучших способов описания структуры routing key — это hierarchical domain model, где каждый сегмент описывает контекст:
<domain>.<event>.<scope>.<detail>
Примеры:
auth.login.success.user123
payment.failed.eu.customer456
order.created.us.ny
Такую структуру можно потом биндингами резать как хочешь:
auth.*.success.*
— ловим успех в авторизации*.failed.#
— ловим любые фейлыorder.created.#
— всё про создание заказов
Пример применения
Допустим, есть платформа, разрезанная на микросервисы. Каждый сервис отвечает за свой бизнес-контекст:
auth
— аутентификация и авторизацияorders
— оформление и управление заказамиpayment
— платежи и возвратыnotifications
— обработка писем и push-уведомлений
Вся система должна быть:
отказоустойчивой
наблюдаемой
расширяемой без редеплоя продюсеров
Проектируем событие в виде routing key, по следующей схеме:
<domain>.<event>.<level>.<region>.<tenant>
Примеры ключей:
auth.login.failed.error.eu.tenant42
payment.refund.processed.info.us.tenant17
orders.create.success.info.ru.tenant01
notifications.email.bounced.warn.eu.tenant42
Требования к системе доставки:
Все события
.error..*.*
-> очередьcritical-ops
(обработка ошибок, инциденты)Все события из
payment
-> очередьfinance-pipeline
Все
bounced
уведомления ->очередьemail-troubles
Все события по
tenant42
-> очередьaudit-tenant42
Все успешные события
.success.info..*
-> очередьsuccess-feed
для ML-аналитики
Конфигурация exchange и биндингов
channel.exchange_declare(
exchange="events_topic",
exchange_type="topic",
durable=True
)
Биндинги:
Очередь |
Binding key |
---|---|
|
|
|
|
|
|
|
|
|
|
Каждый из этих биндингов позволяет получать события по семантике, не переписывая отправителей. Добавили новый домен — биндим новую очередь. Расширили теги — не трогаем старый код.
Отправка событий
Реализация отправителя producer.py
:
import pika
def send_event(routing_key: str, body: str):
connection = pika.BlockingConnection(pika.ConnectionParameters("localhost"))
channel = connection.channel()
channel.exchange_declare(exchange="events_topic", exchange_type="topic", durable=True)
channel.basic_publish(
exchange="events_topic",
routing_key=routing_key,
body=body.encode("utf-8"),
properties=pika.BasicProperties(
delivery_mode=2 # persistent
)
)
print(f"Sent [{routing_key}]: {body}")
connection.close()
# Пример отправки события
send_event("auth.login.failed.error.eu.tenant42", "Login failed from IP 1.2.3.4")
Обработка сообщений с ack, QoS и ручной контролем
Реализация обработчика ошибок (consumer_critical.py
):
import pika
def handle_critical(ch, method, properties, body):
print(f"[CRITICAL] {method.routing_key}: {body.decode()}")
# логика обработки, отправка в Slack, логгинг, мониторинг
ch.basic_ack(delivery_tag=method.delivery_tag)
connection = pika.BlockingConnection(pika.ConnectionParameters("localhost"))
channel = connection.channel()
channel.queue_declare(queue="critical-ops", durable=True)
channel.queue_bind(
exchange="events_topic",
queue="critical-ops",
routing_key="*.*.error.*.*"
)
channel.basic_qos(prefetch_count=5)
channel.basic_consume(queue="critical-ops", on_message_callback=handle_critical)
print("Waiting for critical events...")
channel.start_consuming()
prefetch_count=5
— позволяет контролировать, чтобы воркер не захлебнулся, если сообщение тяжёлое. basic_ack
— обязательно нужен, чтобы сообщение не считалось обработанным, пока оно реально не пройдено. delivery_mode=2
— сообщение переживёт рестарт брокера. Очередь durable=True
— не потеряет биндинги и сообщения при сбоях.
Поведение системы при множестве биндингов
Если событие с ключом payment.refund.failed.error.us.tenant42
отправлено:
попадёт в
critical-ops
(совпадает с..error.*.*
)попадёт в
finance-pipeline
(совпадает сpayment.#
)попадёт в
audit-tenant42
(совпадает с#.*.*.*.tenant42
)
Одно и то же сообщение будет скопировано RabbitMQ в несколько очередей по биндингу. Т.е продюсер делает один publish, а exchange делает мультикаст по правилам.
Как тестировать и отлаживать
Чтобы убедиться, что всё работает:
channel.queue_declare(queue="", exclusive=True)
channel.queue_bind(
exchange="events_topic",
queue=result.method.queue,
routing_key="#"
)
Временно создаём анонимную очередь с шаблоном #
, чтобы получить все события из exchange, и логируем каждый routing key вместе с телом сообщения и совпавшими биндингами.
По итогу построили систему, где:
routing key — это контракт маршрутизации
topic exchange — центр управления логикой
очереди — реализуют семантическую подписку без кода
продюсеры не зависят от инфраструктуры подписчиков
Подход сам по себе масштабируем и гибок.
Заключение
topic exchange
— проверенный инструмент для построения маршрутизации событий в распределённых системах.
Если вы уже применяете topic exchange
в своих проектах — расскажите об этом в комментариях: какие схемы вы проектировали, где ошибались, какие шаблоны оказались самыми удобными? И, конечно, в RabbitMQ есть и другие мощные механизмы: headers exchange
для матчей по метаданным, consistent-hash exchange
для балансировки, dead letter exchange
для надёжной обработки ошибок и alternate exchange
как fallback-механизм. Всё это может быть объединено в продуманную архитектуру, если понять, где что работает лучше.
Если вы работаете с микросервисами, интеграциями или хотите глубже понять, как управлять потоками сообщений — приглашаем вас на два открытых урока:
8 июля в 20:00 — «RabbitMQ: как заставить сообщения летать по сложным маршрутам». Разберёмся, как использовать topic exchange для гибкой маршрутизации событий в распределённых системах.
23 июля в 20:00 — «Оптимальные решения на RabbitMQ: или как Кролик превосходит Kafka». Обсудим реальные сценарии, в которых RabbitMQ оказывается предпочтительнее Kafka, и покажем подходы к архитектуре с учётом производительности и надёжности.
Также доступен короткий тест к курсу «RabbitMQ для разработчиков и администраторов», который поможет оценить, насколько уверенно вы ориентируетесь в возможностях брокера и где стоит углубить знания.