Привет, Хабр!

Сегодня разберём один из самых гибких инструментов в RabbitMQ — topic exchange. Именно он позволяет не просто отправить сообщение «куда-то», а превратить очередь в маршрутизатор уровня BGP, но только внутри твоей системы.

Что такое topic exchange и в каких архитектурах он нужен

Прежде чем начнём, зафиксируем один фундаментальный принцип: topic exchange — это шаблонный маршрутизатор. Он позволяет описать, кому доставить сообщение, используя гибкие шаблоны ключей, основанные на структуре из точек (.).

Пример ключа:

invoice.created.eu.highvalue

Т.е есть по фатку вместо того чтобы писать условие if region == "eu" and amount > 10000, ты просто биндишь очередь на шаблон:

invoice.created.eu.*

И все, логика переезжает из кода в инфраструктуру.

topic exchange полезен, когда вы строите микросервисную архитектуру с логической маршрутизацией, работаете в мультитенантной среде с разграничением по tenant ID в ключе, настраиваете CI/CD-пайплайны с шаблонными задачами вроде ci.build.java или ci.test.python, реализуете расширяемую систему алертов (alert.#) или ведёте распределённое логгирование (*.error.*). Главный плюс — масштабируемость: добавляйте новые сервисы, меняйте схему маршрутизации, не трогая код отправителей — всё управляется через биндинги.

Синтаксис и семантика

topic exchange работает с двумя спецсимволами:

  • * — один сегмент (одно слово)

  • # — ноль или больше сегментов

Важно помнить:

routing key

matched by

a.b.c

..*, a.*.c, a.#

order.created.us.ny

order.*.us.*, #.ny

ci.test.go

ci.*.go, ci.#

Некоторые нюансы

  1. # не может стоять внутри: a.#.bнедопустимо

  2. заменяет только один сегмент, не два: a.b

  3. routing key должен быть не пустым и без пробелов

Паттерн проектирования ключей

Один из лучших способов описания структуры routing key — это hierarchical domain model, где каждый сегмент описывает контекст:

<domain>.<event>.<scope>.<detail>

Примеры:

  • auth.login.success.user123

  • payment.failed.eu.customer456

  • order.created.us.ny

Такую структуру можно потом биндингами резать как хочешь:

  • auth.*.success.* — ловим успех в авторизации

  • *.failed.# — ловим любые фейлы

  • order.created.# — всё про создание заказов

Пример применения

Допустим, есть платформа, разрезанная на микросервисы. Каждый сервис отвечает за свой бизнес-контекст:

  • auth — аутентификация и авторизация

  • orders — оформление и управление заказами

  • payment — платежи и возвраты

  • notifications — обработка писем и push-уведомлений

Вся система должна быть:

  • отказоустойчивой

  • наблюдаемой

  • расширяемой без редеплоя продюсеров

Проектируем событие в виде routing key, по следующей схеме:

<domain>.<event>.<level>.<region>.<tenant>

Примеры ключей:

auth.login.failed.error.eu.tenant42
payment.refund.processed.info.us.tenant17
orders.create.success.info.ru.tenant01
notifications.email.bounced.warn.eu.tenant42

Требования к системе доставки:

  1. Все события .error..*.* -> очередь critical-ops (обработка ошибок, инциденты)

  2. Все события из payment -> очередь finance-pipeline

  3. Все bounced уведомления ->очередь email-troubles

  4. Все события по tenant42 -> очередь audit-tenant42

  5. Все успешные события .success.info..* -> очередь success-feed для ML-аналитики

Конфигурация exchange и биндингов

channel.exchange_declare(
    exchange="events_topic",
    exchange_type="topic",
    durable=True
)

Биндинги:

Очередь

Binding key

critical-ops

..error.*.*

finance-pipeline

payment.#

email-troubles

notifications.email.bounced.#

audit-tenant42

#.*.*.*.tenant42

success-feed

..success.info.*.*

Каждый из этих биндингов позволяет получать события по семантике, не переписывая отправителей. Добавили новый домен — биндим новую очередь. Расширили теги — не трогаем старый код.

Отправка событий

Реализация отправителя producer.py:

import pika

def send_event(routing_key: str, body: str):
    connection = pika.BlockingConnection(pika.ConnectionParameters("localhost"))
    channel = connection.channel()

    channel.exchange_declare(exchange="events_topic", exchange_type="topic", durable=True)

    channel.basic_publish(
        exchange="events_topic",
        routing_key=routing_key,
        body=body.encode("utf-8"),
        properties=pika.BasicProperties(
            delivery_mode=2  # persistent
        )
    )

    print(f"Sent [{routing_key}]: {body}")
    connection.close()

# Пример отправки события
send_event("auth.login.failed.error.eu.tenant42", "Login failed from IP 1.2.3.4")

Обработка сообщений с ack, QoS и ручной контролем

Реализация обработчика ошибок (consumer_critical.py):

import pika

def handle_critical(ch, method, properties, body):
    print(f"[CRITICAL] {method.routing_key}: {body.decode()}")
    # логика обработки, отправка в Slack, логгинг, мониторинг
    ch.basic_ack(delivery_tag=method.delivery_tag)

connection = pika.BlockingConnection(pika.ConnectionParameters("localhost"))
channel = connection.channel()

channel.queue_declare(queue="critical-ops", durable=True)

channel.queue_bind(
    exchange="events_topic",
    queue="critical-ops",
    routing_key="*.*.error.*.*"
)

channel.basic_qos(prefetch_count=5)
channel.basic_consume(queue="critical-ops", on_message_callback=handle_critical)

print("Waiting for critical events...")
channel.start_consuming()

prefetch_count=5 — позволяет контролировать, чтобы воркер не захлебнулся, если сообщение тяжёлое. basic_ack — обязательно нужен, чтобы сообщение не считалось обработанным, пока оно реально не пройдено. delivery_mode=2 — сообщение переживёт рестарт брокера. Очередь durable=True — не потеряет биндинги и сообщения при сбоях.

Поведение системы при множестве биндингов

Если событие с ключом payment.refund.failed.error.us.tenant42 отправлено:

  • попадёт в critical-ops (совпадает с ..error.*.*)

  • попадёт в finance-pipeline (совпадает с payment.#)

  • попадёт в audit-tenant42 (совпадает с #.*.*.*.tenant42)

Одно и то же сообщение будет скопировано RabbitMQ в несколько очередей по биндингу. Т.е продюсер делает один publish, а exchange делает мультикаст по правилам.

Как тестировать и отлаживать

Чтобы убедиться, что всё работает:

channel.queue_declare(queue="", exclusive=True)
channel.queue_bind(
    exchange="events_topic",
    queue=result.method.queue,
    routing_key="#"
)

Временно создаём анонимную очередь с шаблоном #, чтобы получить все события из exchange, и логируем каждый routing key вместе с телом сообщения и совпавшими биндингами.

По итогу построили систему, где:

  • routing key — это контракт маршрутизации

  • topic exchange — центр управления логикой

  • очереди — реализуют семантическую подписку без кода

  • продюсеры не зависят от инфраструктуры подписчиков

Подход сам по себе масштабируем и гибок.


Заключение

topic exchange — проверенный инструмент для построения маршрутизации событий в распределённых системах.

Если вы уже применяете topic exchange в своих проектах — расскажите об этом в комментариях: какие схемы вы проектировали, где ошибались, какие шаблоны оказались самыми удобными? И, конечно, в RabbitMQ есть и другие мощные механизмы: headers exchange для матчей по метаданным, consistent-hash exchange для балансировки, dead letter exchange для надёжной обработки ошибок и alternate exchange как fallback-механизм. Всё это может быть объединено в продуманную архитектуру, если понять, где что работает лучше.


Если вы работаете с микросервисами, интеграциями или хотите глубже понять, как управлять потоками сообщений — приглашаем вас на два открытых урока:

8 июля в 20:00 — «RabbitMQ: как заставить сообщения летать по сложным маршрутам». Разберёмся, как использовать topic exchange для гибкой маршрутизации событий в распределённых системах.

23 июля в 20:00 — «Оптимальные решения на RabbitMQ: или как Кролик превосходит Kafka». Обсудим реальные сценарии, в которых RabbitMQ оказывается предпочтительнее Kafka, и покажем подходы к архитектуре с учётом производительности и надёжности.

Также доступен короткий тест к курсу «RabbitMQ для разработчиков и администраторов», который поможет оценить, насколько уверенно вы ориентируетесь в возможностях брокера и где стоит углубить знания.

Комментарии (0)