Практика outbox, гарантии доставки, DLX и схемы, которые реально спасают прод
Ситуация. REST-вызовы между сервисами начали валить друг друга: цепочки запросов, таймауты, каскадные фейлы. Мы перевели коммуникацию на RabbitMQ – и добились не «безошибочности», а предсказуемой деградации: данные не теряются, критичное – едет первым, яд быстрее восстанавливается
Ниже – только прикладное: проверенные паттерны, готовые фрагменты кода (Python / aio-pika), и схемы, которые можно сразу встраивать
База: «деградировать с честью» – это как?
Принцип простой: если брокер, потребители или сеть «сыпятся», система не ломает бизнес-операции и не теряет событий, а:
Временно буферизует
Расставляет приоритеты
Отбивает яд от «ядовитых» сообщений
Восстанавливается до согласованного состояния
В RabbitMQ это собирается из четырёх кирпичей:
Transactional Outbox (события в своей БД до брокера)
Безопасная публикация (publisher confirms + mandatory / AE)
Безопасное потребление (ручные ack / nack, DLX, идемпотентность)
Контроль нагрузки (prefetch, TTL, max-length, приоритеты / разделение очередей)
Шаг 1. Transactional Outbox – никаких потерь при падении брокера
Суть: вместе с записью бизнес-события пишем строку в outbox
в той же транзакции. Фоновый воркер публикует в RabbitMQ с подтверждениями; при сбоях – ретраит. Событие не «исчезнет» между БД и брокером

DDL (PostgreSQL):
create table outbox (
id uuid primary key,
aggregate text not null,
event_type text not null,
payload jsonb not null,
created_at timestamptz not null default now(),
status text not null default 'NEW', -- NEW|SENT|FAILED
attempts int not null default 0,
next_retry_at timestamptz
);
create index on outbox (status, next_retry_at);
Воркер публикации (Python, aio-pika, подтверждения + mandatory):
import asyncio, json, uuid, asyncpg
from aio_pika import connect_robust, Message, DeliveryMode, RobustConnection, RobustChannel, ExchangeType
from datetime import datetime, timedelta
AMQP_URL = "amqp://user:pass@rabbit:5672/%2F"
EXCHANGE = "events.topic"
async def publish_outbox_batch(pool):
conn: RobustConnection = await connect_robust(AMQP_URL)
ch: RobustChannel = await conn.channel(publisher_confirms=True)
exchange = await ch.declare_exchange(EXCHANGE, ExchangeType.TOPIC, durable=True,
arguments={"alternate-exchange": "events.ae"}) # AE на случай unroutable
while True:
async with pool.acquire() as db:
rows = await db.fetch("""
update outbox
set status='IN_PROGRESS'
where id in (
select id from outbox
where status='NEW'
or (status='FAILED' and (next_retry_at is null or next_retry_at <= now()))
order by created_at
limit 200
)
returning id, event_type, payload
""")
if not rows:
await asyncio.sleep(0.3)
continue
to_ack, to_fail = [], []
for r in rows:
body = json.dumps(r["payload"]).encode()
msg = Message(
body,
delivery_mode=DeliveryMode.PERSISTENT,
message_id=str(uuid.uuid4()),
content_type="application/json",
headers={"event_type": r["event_type"]},
)
routing_key = f"{r['event_type']}"
try:
# mandatory=True - UnroutableError, если некуда доставить
await exchange.publish(msg, routing_key=routing_key, mandatory=True)
to_ack.append(r["id"])
except Exception:
to_fail.append(r["id"])
async with pool.acquire() as db:
if to_ack:
await db.execute("update outbox set status='SENT' where id = any($1)", to_ack)
if to_fail:
await db.execute("""
update outbox
set status='FAILED',
attempts = attempts + 1,
next_retry_at = now() + ((least(attempts, 10)) * interval '10 seconds')
where id = any($1)
""", to_fail)
async def main():
pool = await asyncpg.create_pool(dsn="postgres://user:pass@db/app")
await publish_outbox_batch(pool)
asyncio.run(main())
Что важно:
publisher_confirms=True
– ждём подтверждение брокераmandatory=True
+ alternate-exchange (AE) – гарантируем, что ни одно сообщение не уйдёт в «вакуум»Экспоненциальный backoff по
attempts
– не душим брокер при восстановлении
Шаг 2. Публикация с гарантиями: confirms, mandatory, AE
Publisher Confirms: брокер подтверждает приём (пер-сообщение или батчем)
mandatory: если ни одна привязка не подходит, брокер вернёт сообщение (или отправит в AE)
Alternate Exchange (AE): запасной обменник для unroutable – складываем туда и алертим

Топология (объявление обменников и AE):
# основной обменник событий
await ch.declare_exchange("events.topic", ExchangeType.TOPIC, durable=True,
arguments={"alternate-exchange": "events.ae"})
# AE для не маршрутизованных
await ch.declare_exchange("events.ae", ExchangeType.FANOUT, durable=True)
# очередь-парковка для unroutable
q_ae = await ch.declare_queue("events.unroutable", durable=True)
await q_ae.bind("events.ae", routing_key="")
Шаг 3. Потребление без сюрпризов: ack / nack, DLX, идемпотентность
Цели:
Не теряем сообщения
Не зацикливаем «ядовитые»
Обрабатываем идемпотентно (дубликаты возможны)
Очередь с DLX / TTL / ограничением:
# Dead-letter exchange
await ch.declare_exchange("events.dlx", ExchangeType.TOPIC, durable=True)
args = {
"x-dead-letter-exchange": "events.dlx",
"x-dead-letter-routing-key": "dead.order", # или по шаблону
"x-message-ttl": 10_000, # TTL сообщений в очереди (пример)
"x-max-length": 50_000, # защитимся от бесконечного роста
"x-queue-mode": "lazy", # крупные очереди - на диск
"x-max-priority": 10, # если используете приоритеты
}
q = await ch.declare_queue("order.events", durable=True, arguments=args)
await q.bind("events.topic", routing_key="order.*")
Идемпотентный consumer (с учётом дубликатов и «ядовитых»): как не потерять сообщение при сбое
import hashlib, aioredis
redis = await aioredis.from_url("redis://redis:6379/0")
def seen_key(message_id: str) -> str:
return f"seen:{message_id}"
async def handle(message):
# бизнес-логика
...
async def consume(message):
async with message.process(ignore_processed=True): # manual ack / nack внутри контекста
mid = message.message_id or hashlib.sha1(message.body).hexdigest()
# идемпотентность: уже обрабатывали?
if await redis.setnx(seen_key(mid), "1"):
await redis.expire(seen_key(mid), 24*3600) # хранить сутки / неделю - по объёму
try:
await handle(message)
# ack произойдёт при выходе из контекста
except TransientError:
# временная ошибка - вернём в очередь (с учётом TTL / DLX / повторных попыток)
await message.nack(requeue=True)
except Exception:
# яд - в DLX, чтобы не крутить бесконечно
await message.reject(requeue=False)
else:
# дубликат - подтверждаем тихо
return
await q.consume(consume, no_ack=False)
await ch.set_qos(prefetch_count=64) # контроль нагрузки
Зачем DLX: «ядовитые» сообщения автоматически попадают в «кладбище» (parking lot). Там их можно руками разбирать или переигрывать через отдельный «ре-процессор»

Шаг 4. Приоритеты и разделение путей: критичное едет первым
Частая ошибка – «одна очередь на всё». В результате уведомления забивают полосу, а деньги / заказы ждут
Правильно:
Разделяем критичное и фоновое по разным очередям / ключам
Для фонового – TTL + max-length (готовы терять «хвост» при аварии)
Для критичного – отдельные воркеры и меньший prefetch (быстрее реакция)
Пример:
# критичная очередь (оплата)
await ch.declare_queue("billing.high", durable=True,
arguments={"x-dead-letter-exchange": "events.dlx"})
await ch.queue_bind("billing.high", "events.topic", routing_key="billing.*")
# фоновая (email): TTL + max-length, пусть деградирует первой
await ch.declare_queue("notify.low", durable=True,
arguments={
"x-dead-letter-exchange": "events.dlx",
"x-message-ttl": 60_000,
"x-max-length": 20_000
})
await ch.queue_bind("notify.low", "events.topic", routing_key="notify.*")
Набор «граблей» и как мы их обходили
Большие очереди = антипаттерн. Делайте короткими:
x-message-ttl
,x-max-length
, «ленивые» (x-queue-mode=lazy
) только там, где осознанно готовы к дискуДубликаты неизбежны. Проектируйте идемпотентно (натуральные операции / UPSERT, ключи, registry processed ids)
Один пользователь RabbitMQ для всех сервисов – плохо: лишает видимости и контроля. Делите доступы
guest / guest в проде – никогда
prefetch по умолчанию душит латентность: подберите
set_qos
экспериментально (обычно 16–256)Нет AE / mandatory – «чёрная дыра»
Нет DLX – «ядовитые» крутятся бесконечно
Чек-лист внедрения
Включить Transactional Outbox в сервисах-продюсерах
Публиковать с publisher confirms +
mandatory=true
, настроить AEДля каждой критичной линии – своя очередь, для фоновой – TTL / ограничения
Подписчики: ручные ack / nack, DLX, идемпотентность
Настроить
prefetch
, health-пробы, авто-reconnectМетрики: ready / unacked, publish / ack rate, глубина DLQ, x-death
Runbook: как чистить DLQ, как «ре-проигрывать» из парковки
Итог
RabbitMQ сам по себе не «лечит» систему. Но с правильной топологией (Outbox – Confirms + AE – Idempotent Consumers + DLX – Контроль нагрузки) вы получаете контролируемую деградацию: критичные события приходят, фоновые отваливаются первыми, «ядовитые» безопасно паркуются, а данные – не теряются. Это и есть «честная» отказоустойчивость в реальной жизни
Grand_piano
Ничего не понятно, но очень интересно.