Привет, Хабр!

Сегодня рассмотрим, как построить Kafka‑консьюмер, который не падёт при первой же проблеме, а аккуратно сложит битые события в Dead Letter Queue (DLQ).


Когда и зачем нужен DLQ

В Kafka жизненно важно различать две плоскости:

Плоскость

Что происходит без DLQ

Что хотим видеть

Обработка

Консьюмер читает сообщение, попытка deserialise → enrich → persist; при эксепшене offset не коммитится, поток стопорится.

Консьюмер коммитит оффсет, а проблемное событие перекидывает в DLQ‑топик.

Хранение

Сообщение остаётся в основном топике и будет «досаждать» ретраями до конца ретеншена.

Чётко знаем: «в DLQ лежат только фэйлы, прод стрим идёт дальше».

Типовые сценарии

Ситуация

Повторяемая?

Действие

Не‑валидный JSON / Avro‑схема

Fatal

Сразу в DLQ, смысла ретраить нет.

Временная недоступность БД

Retryable

DLQ с отложенным re‑consume после таймаута.

Новая версия схемы (schema registry lag)

Retryable

Либо автоматический retry, либо DLQ → пересчитай позже.

Confluent и ряд энтузиастов приводят такую практику: разделяйте ошибки на fatal и retryable, метите их в header (error.class, error.stacktrace, retryable=true/false).

Где теряются сообщения, если DLQ нет?

  1. Консьюмер зависает на ядовитом сообщении, оффсеты не коммитятся, всё после него не читается.

  2. DevOps убивает pod — Kubernetes рестартует, — ситуация повторяется.

  3. Через N часов приходит ваш SRE и задаёт вопрос: «Почему в топике lag 50 млн, а бизнес‑процесс мёртв?»

Реализация DLQ на примере Kafka Consumer

Python (confluent-kafka)

from confluent_kafka import Consumer, Producer, KafkaException
import json, logging, time

# Базовый конфиг
common = {
    "bootstrap.servers": "kafka-prod:9092",
    "group.id":          "enricher-v1",
    "auto.offset.reset": "earliest",
    "enable.auto.commit": False,
}

consumer = Consumer(common | {"key.deserializer": str})
producer = Producer({"bootstrap.servers": common["bootstrap.servers"]})

SOURCE_TOPIC = "clicks.raw"
DLQ_TOPIC    = f"{SOURCE_TOPIC}.dlq"

def push_dlq(msg, exc, retryable: bool):
    """Проксируем оригинальное сообщение в DLQ c расширенными headers."""
    headers = msg.headers() or []
    headers.extend([
        ("error.class", str(type(exc)).encode()),
        ("error.message", str(exc).encode()),
        ("retryable", b"1" if retryable else b"0"),
        ("ts.failed", str(int(time.time() * 1000)).encode()),
    ])
    producer.produce(
        topic=DLQ_TOPIC,
        key=msg.key(),
        value=msg.value(),
        headers=headers,
    )
    producer.flush(1_000)

def handle(msg):
    payload = json.loads(msg.value())           # может кинуть JSONDecodeError
    enriched = call_some_db(payload["user_id"]) # может кинуть DBError
    publish_downstream(enriched)

while True:
    batch = consumer.consume(num_messages=500, timeout=1.0)
    for m in batch:
        try:
            handle(m)
            consumer.commit(m)  # ручной коммит ⇒ at-least-once
        except json.JSONDecodeError as je:
            logging.exception("Bad JSON")
            push_dlq(m, je, retryable=False)
            consumer.commit(m)
        except TransientDBError as te:
            logging.warning("DB temp issue → DLQ for later retry")
            push_dlq(m, te, retryable=True)
            consumer.commit(m)
        except Exception as e:
            # last-line defense
            push_dlq(m, e, retryable=False)
            consumer.commit(m)

Коммит оффсета после пуша в DLQ — иначе мы бы «застряли» на плохом событии. retryable header — пригодится автоматическму ретраю. producer.flush() — в проде заменяем на асинхронный delivery callback + back‑pressure.

Java (Spring Kafka ≥ 2.8)

У Spring всё из коробки благодаря DefaultErrorHandler и DeadLetterPublishingRecoverer:

@Bean
public ConcurrentKafkaListenerContainerFactory<String, OrderEvt> kafkaFactory(
        ConsumerFactory<String, OrderEvt> cf,
        KafkaOperations<String, OrderEvt> dlqTemplate) {

    var recoverer = new DeadLetterPublishingRecoverer(
        dlqTemplate,
        (rec, ex) -> new TopicPartition(rec.topic() + ".dlq", rec.partition())
    );

    var errorHandler = new DefaultErrorHandler(
        recoverer,
        new FixedBackOff(0L, 0) // сразу в DLQ, без ретраев
    );

    errorHandler.addNotRetryableExceptions(JsonParseException.class);
    errorHandler.addRetryableExceptions(SQLException.class);

    var factory = new ConcurrentKafkaListenerContainerFactory<String, OrderEvt>();
    factory.setConsumerFactory(cf);
    factory.setCommonErrorHandler(errorHandler);
    return factory;
}

@KafkaListener(topics = "orders.raw", groupId = "enricher-v1")
public void onEvent(OrderEvt evt) {
    // enrich & persist
}

DeadLetterPublishingRecoverer автоматически копирует key/value и доклеивает метадату (kafka_dlt-original-topic, kafka_dlt-exception-fqcn, & др.). Для временных сбоев можно поставить FixedBackOff(1000L, 3) — три локальных ретрая до DLQ.

Spring‑team подтянула эти фичи с Apache Kafka 2.8; до этого приходилось писать кастомные SeekToCurrentErrorHandler.

Как мониторить и обрабатывать DLQ

Метрики и алёрты

Что смотрим

Где берём

Порог

records.in для *.dlq

kafka.server.BrokerTopicMetrics.MessagesInPerSec

>10% от основного топика (5-мин. окно)

Lag основного consumer‑group

kafka.consumer:type=consumer-fetch-manager-metricsrecords-lag-max

рост экспоненциальный

Кол‑во retryable=true в DLQ

ksqlDB / Kafka Streams window count

>1000 за 15 м

Пример алерт‑правила в Prometheus:

- alert: HighDLQInRate
  expr: rate(kafka_server_brokertopicmetrics_messages_in_total{topic=~".*\\.dlq"}[5m]) > 100
  for: 2m
  labels:
    severity: critical
  annotations:
    summary: "DLQ inflow is {{ $value }} msg/s"
    description: "Too many errors hitting DLQ topics"

UI для ручного ревью

kcat (kafkacat) — быстрый просмотр одного сообщения:

kcat -C -t clicks.raw.dlq -o -5 -q

AKHQ / Redpanda Console / Confluent UI — визуальный поиск по key/headers. Когда сообщений десятки тысяч, делают «DLQ‑WorkBench»: удобное SPA, где можно фильтровать по error.class, делать bulk‑reprocess.

Автоматический retry-pipeline

Архитектурный паттерн таков:

clicks.raw.dlq (-- only retryable -->) clicks.retry
         |                                   |
         | DLQ-retry-consumer                | primary-consumer
         +---------->  clicks.raw -----------+

Retry‑consumer читает только retryable=true. Ставит задержку (например, sleep(300_000) или Scheduled Executor). Пушит обратно в исходный топик с новым header x-retry-count. При превышении MAX_RETRY отправляет в clicks.raw.dlq.permanent — это уже зона ручного разбора.

В 2024-м Confluent добавила готовый компонент — Parallel Consumer с built‑in retry — но он пока в tech‑preview.

Итог

DLQ — это не просто «корзинка для битых сообщений», а фундамент отказоустойчивой стриминговой архитектуры:

  • Коммит оффсета → живой консьюмер.

  • Разметка ошибок → осмысленные ретраи.

  • Метрики → SRE спит спокойно.

Собрали? Тестируем — швыряем кривой JSON и дропаем коннект к БД. Консьюмер улыбается, а в DLQ аккуратно появляется две записи: одна retryable=false, вторая retryable=true. Красота.


В заключение приглашаем на открытый урок 22 мая «Оптимизация Nginx и Angie под высокие нагрузки». Узнайте, как настроить ключевые параметры для стабильной работы серверов при большом трафике, оптимизировать TLS, кэширование и анализировать производительность. Меньше узких мест — больше скорости. Записывайтесь по ссылке.

Любое развитие начинается с честной оценки. Пройдите тест на знание инфраструктуры высоконагруженных систем — он подскажет, куда расти дальше.

Комментарии (0)