Практика outbox, гарантии доставки, DLX и схемы, которые реально спасают прод

Ситуация. REST-вызовы между сервисами начали валить друг друга: цепочки запросов, таймауты, каскадные фейлы. Мы перевели коммуникацию на RabbitMQ – и добились не «безошибочности», а предсказуемой деградации: данные не теряются, критичное – едет первым, яд быстрее восстанавливается

Ниже – только прикладное: проверенные паттерны, готовые фрагменты кода (Python / aio-pika), и схемы, которые можно сразу встраивать

База: «деградировать с честью» – это как?

Принцип простой: если брокер, потребители или сеть «сыпятся», система не ломает бизнес-операции и не теряет событий, а:

  • Временно буферизует

  • Расставляет приоритеты

  • Отбивает яд от «ядовитых» сообщений

  • Восстанавливается до согласованного состояния

В RabbitMQ это собирается из четырёх кирпичей:

  1. Transactional Outbox (события в своей БД до брокера)

  2. Безопасная публикация (publisher confirms + mandatory / AE)

  3. Безопасное потребление (ручные ack / nack, DLX, идемпотентность)

  4. Контроль нагрузки (prefetch, TTL, max-length, приоритеты / разделение очередей)

Шаг 1. Transactional Outbox – никаких потерь при падении брокера

Суть: вместе с записью бизнес-события пишем строку в outbox в той же транзакции. Фоновый воркер публикует в RabbitMQ с подтверждениями; при сбоях – ретраит. Событие не «исчезнет» между БД и брокером

Архитектура с Outbox, AE и DLX
Архитектура с Outbox, AE и DLX

DDL (PostgreSQL):

create table outbox (
  id            uuid primary key,
  aggregate     text not null,
  event_type    text not null,
  payload       jsonb not null,
  created_at    timestamptz not null default now(),
  status        text not null default 'NEW',      -- NEW|SENT|FAILED
  attempts      int  not null default 0,
  next_retry_at timestamptz
);

create index on outbox (status, next_retry_at);

Воркер публикации (Python, aio-pika, подтверждения + mandatory):

import asyncio, json, uuid, asyncpg
from aio_pika import connect_robust, Message, DeliveryMode, RobustConnection, RobustChannel, ExchangeType
from datetime import datetime, timedelta

AMQP_URL = "amqp://user:pass@rabbit:5672/%2F"
EXCHANGE = "events.topic"

async def publish_outbox_batch(pool):
    conn: RobustConnection = await connect_robust(AMQP_URL)
    ch: RobustChannel = await conn.channel(publisher_confirms=True)
    exchange = await ch.declare_exchange(EXCHANGE, ExchangeType.TOPIC, durable=True,
                                         arguments={"alternate-exchange": "events.ae"})  # AE на случай unroutable
    while True:
        async with pool.acquire() as db:
            rows = await db.fetch("""
              update outbox
                 set status='IN_PROGRESS'
               where id in (
                 select id from outbox
                  where status='NEW'
                     or (status='FAILED' and (next_retry_at is null or next_retry_at <= now()))
                  order by created_at
                  limit 200
               )
             returning id, event_type, payload
            """)
        if not rows:
            await asyncio.sleep(0.3)
            continue

        to_ack, to_fail = [], []
        for r in rows:
            body = json.dumps(r["payload"]).encode()
            msg = Message(
                body,
                delivery_mode=DeliveryMode.PERSISTENT,
                message_id=str(uuid.uuid4()),
                content_type="application/json",
                headers={"event_type": r["event_type"]},
            )
            routing_key = f"{r['event_type']}"
            try:
                # mandatory=True - UnroutableError, если некуда доставить
                await exchange.publish(msg, routing_key=routing_key, mandatory=True)
                to_ack.append(r["id"])
            except Exception:
                to_fail.append(r["id"])

        async with pool.acquire() as db:
            if to_ack:
                await db.execute("update outbox set status='SENT' where id = any($1)", to_ack)
            if to_fail:
                await db.execute("""
                  update outbox
                     set status='FAILED',
                         attempts = attempts + 1,
                         next_retry_at = now() + ((least(attempts, 10)) * interval '10 seconds')
                   where id = any($1)
                """, to_fail)

async def main():
    pool = await asyncpg.create_pool(dsn="postgres://user:pass@db/app")
    await publish_outbox_batch(pool)

asyncio.run(main())

Что важно:

  • publisher_confirms=True – ждём подтверждение брокера

  • mandatory=True + alternate-exchange (AE) – гарантируем, что ни одно сообщение не уйдёт в «вакуум»

  • Экспоненциальный backoff по attemptsне душим брокер при восстановлении

Шаг 2. Публикация с гарантиями: confirms, mandatory, AE

  • Publisher Confirms: брокер подтверждает приём (пер-сообщение или батчем)

  • mandatory: если ни одна привязка не подходит, брокер вернёт сообщение (или отправит в AE)

  • Alternate Exchange (AE): запасной обменник для unroutable – складываем туда и алертим

Последовательность публикации
Последовательность публикации

Топология (объявление обменников и AE):

# основной обменник событий
await ch.declare_exchange("events.topic", ExchangeType.TOPIC, durable=True,
                          arguments={"alternate-exchange": "events.ae"})
# AE для не маршрутизованных
await ch.declare_exchange("events.ae", ExchangeType.FANOUT, durable=True)
# очередь-парковка для unroutable
q_ae = await ch.declare_queue("events.unroutable", durable=True)
await q_ae.bind("events.ae", routing_key="")

Шаг 3. Потребление без сюрпризов: ack / nack, DLX, идемпотентность

Цели:

  • Не теряем сообщения

  • Не зацикливаем «ядовитые»

  • Обрабатываем идемпотентно (дубликаты возможны)

Очередь с DLX / TTL / ограничением:

# Dead-letter exchange
await ch.declare_exchange("events.dlx", ExchangeType.TOPIC, durable=True)

args = {
    "x-dead-letter-exchange": "events.dlx",
    "x-dead-letter-routing-key": "dead.order",   # или по шаблону
    "x-message-ttl": 10_000,                     # TTL сообщений в очереди (пример)
    "x-max-length": 50_000,                      # защитимся от бесконечного роста
    "x-queue-mode": "lazy",                      # крупные очереди - на диск
    "x-max-priority": 10,                        # если используете приоритеты
}

q = await ch.declare_queue("order.events", durable=True, arguments=args)
await q.bind("events.topic", routing_key="order.*")

Идемпотентный consumer (с учётом дубликатов и «ядовитых»): как не потерять сообщение при сбое

import hashlib, aioredis

redis = await aioredis.from_url("redis://redis:6379/0")

def seen_key(message_id: str) -> str:
    return f"seen:{message_id}"

async def handle(message):
    # бизнес-логика
    ...

async def consume(message):
    async with message.process(ignore_processed=True):  # manual ack / nack внутри контекста
        mid = message.message_id or hashlib.sha1(message.body).hexdigest()

        # идемпотентность: уже обрабатывали?
        if await redis.setnx(seen_key(mid), "1"):
            await redis.expire(seen_key(mid), 24*3600)  # хранить сутки / неделю - по объёму
            try:
                await handle(message)
                # ack произойдёт при выходе из контекста
            except TransientError:
                # временная ошибка - вернём в очередь (с учётом TTL / DLX / повторных попыток)
                await message.nack(requeue=True)
            except Exception:
                # яд - в DLX, чтобы не крутить бесконечно
                await message.reject(requeue=False)
        else:
            # дубликат - подтверждаем тихо
            return

await q.consume(consume, no_ack=False)
await ch.set_qos(prefetch_count=64)  # контроль нагрузки

Зачем DLX: «ядовитые» сообщения автоматически попадают в «кладбище» (parking lot). Там их можно руками разбирать или переигрывать через отдельный «ре-процессор»

Потребление с идемпотентностью и DLX
Потребление с идемпотентностью и DLX

Шаг 4. Приоритеты и разделение путей: критичное едет первым

Частая ошибка – «одна очередь на всё». В результате уведомления забивают полосу, а деньги / заказы ждут

Правильно:

  • Разделяем критичное и фоновое по разным очередям / ключам

  • Для фонового – TTL + max-length (готовы терять «хвост» при аварии)

  • Для критичного – отдельные воркеры и меньший prefetch (быстрее реакция)

Пример:

# критичная очередь (оплата)
await ch.declare_queue("billing.high", durable=True,
                       arguments={"x-dead-letter-exchange": "events.dlx"})
await ch.queue_bind("billing.high", "events.topic", routing_key="billing.*")

# фоновая (email): TTL + max-length, пусть деградирует первой
await ch.declare_queue("notify.low", durable=True,
  arguments={
    "x-dead-letter-exchange": "events.dlx",
    "x-message-ttl": 60_000,
    "x-max-length": 20_000
})
await ch.queue_bind("notify.low", "events.topic", routing_key="notify.*")

Набор «граблей» и как мы их обходили

  • Большие очереди = антипаттерн. Делайте короткими: x-message-ttl, x-max-length, «ленивые» (x-queue-mode=lazy) только там, где осознанно готовы к диску

  • Дубликаты неизбежны. Проектируйте идемпотентно (натуральные операции / UPSERT, ключи, registry processed ids)

  • Один пользователь RabbitMQ для всех сервисов – плохо: лишает видимости и контроля. Делите доступы

  • guest / guest в проде – никогда

  • prefetch по умолчанию душит латентность: подберите set_qos экспериментально (обычно 16–256)

  • Нет AE / mandatory – «чёрная дыра»

  • Нет DLX – «ядовитые» крутятся бесконечно

Чек-лист внедрения

  1. Включить Transactional Outbox в сервисах-продюсерах

  2. Публиковать с publisher confirms + mandatory=true, настроить AE

  3. Для каждой критичной линии – своя очередь, для фоновой – TTL / ограничения

  4. Подписчики: ручные ack / nack, DLX, идемпотентность

  5. Настроить prefetch, health-пробы, авто-reconnect

  6. Метрики: ready / unacked, publish / ack rate, глубина DLQ, x-death

  7. Runbook: как чистить DLQ, как «ре-проигрывать» из парковки

Итог

RabbitMQ сам по себе не «лечит» систему. Но с правильной топологией (Outbox – Confirms + AE – Idempotent Consumers + DLX – Контроль нагрузки) вы получаете контролируемую деградацию: критичные события приходят, фоновые отваливаются первыми, «ядовитые» безопасно паркуются, а данные – не теряются. Это и есть «честная» отказоустойчивость в реальной жизни

Комментарии (3)


  1. Grand_piano
    02.09.2025 12:54

    Ничего не понятно, но очень интересно.