Привет, Хабр!
Сегодня рассмотрим, как построить Kafka‑консьюмер, который не падёт при первой же проблеме, а аккуратно сложит битые события в Dead Letter Queue (DLQ).
Когда и зачем нужен DLQ
В Kafka жизненно важно различать две плоскости:
Плоскость |
Что происходит без DLQ |
Что хотим видеть |
---|---|---|
Обработка |
Консьюмер читает сообщение, попытка |
Консьюмер коммитит оффсет, а проблемное событие перекидывает в DLQ‑топик. |
Хранение |
Сообщение остаётся в основном топике и будет «досаждать» ретраями до конца ретеншена. |
Чётко знаем: «в DLQ лежат только фэйлы, прод стрим идёт дальше». |
Типовые сценарии
Ситуация |
Повторяемая? |
Действие |
---|---|---|
Не‑валидный JSON / Avro‑схема |
Fatal |
Сразу в DLQ, смысла ретраить нет. |
Временная недоступность БД |
Retryable |
DLQ с отложенным re‑consume после таймаута. |
Новая версия схемы (schema registry lag) |
Retryable |
Либо автоматический retry, либо DLQ → пересчитай позже. |
Confluent и ряд энтузиастов приводят такую практику: разделяйте ошибки на fatal и retryable, метите их в header (error.class
, error.stacktrace
, retryable=true/false
).
Где теряются сообщения, если DLQ нет?
Консьюмер зависает на ядовитом сообщении, оффсеты не коммитятся, всё после него не читается.
DevOps убивает pod — Kubernetes рестартует, — ситуация повторяется.
Через N часов приходит ваш SRE и задаёт вопрос: «Почему в топике lag 50 млн, а бизнес‑процесс мёртв?»
Реализация DLQ на примере Kafka Consumer
Python (confluent-kafka)
from confluent_kafka import Consumer, Producer, KafkaException
import json, logging, time
# Базовый конфиг
common = {
"bootstrap.servers": "kafka-prod:9092",
"group.id": "enricher-v1",
"auto.offset.reset": "earliest",
"enable.auto.commit": False,
}
consumer = Consumer(common | {"key.deserializer": str})
producer = Producer({"bootstrap.servers": common["bootstrap.servers"]})
SOURCE_TOPIC = "clicks.raw"
DLQ_TOPIC = f"{SOURCE_TOPIC}.dlq"
def push_dlq(msg, exc, retryable: bool):
"""Проксируем оригинальное сообщение в DLQ c расширенными headers."""
headers = msg.headers() or []
headers.extend([
("error.class", str(type(exc)).encode()),
("error.message", str(exc).encode()),
("retryable", b"1" if retryable else b"0"),
("ts.failed", str(int(time.time() * 1000)).encode()),
])
producer.produce(
topic=DLQ_TOPIC,
key=msg.key(),
value=msg.value(),
headers=headers,
)
producer.flush(1_000)
def handle(msg):
payload = json.loads(msg.value()) # может кинуть JSONDecodeError
enriched = call_some_db(payload["user_id"]) # может кинуть DBError
publish_downstream(enriched)
while True:
batch = consumer.consume(num_messages=500, timeout=1.0)
for m in batch:
try:
handle(m)
consumer.commit(m) # ручной коммит ⇒ at-least-once
except json.JSONDecodeError as je:
logging.exception("Bad JSON")
push_dlq(m, je, retryable=False)
consumer.commit(m)
except TransientDBError as te:
logging.warning("DB temp issue → DLQ for later retry")
push_dlq(m, te, retryable=True)
consumer.commit(m)
except Exception as e:
# last-line defense
push_dlq(m, e, retryable=False)
consumer.commit(m)
Коммит оффсета после пуша в DLQ — иначе мы бы «застряли» на плохом событии. retryable header — пригодится автоматическму ретраю. producer.flush()
— в проде заменяем на асинхронный delivery callback + back‑pressure.
Java (Spring Kafka ≥ 2.8)
У Spring всё из коробки благодаря DefaultErrorHandler
и DeadLetterPublishingRecoverer
:
@Bean
public ConcurrentKafkaListenerContainerFactory<String, OrderEvt> kafkaFactory(
ConsumerFactory<String, OrderEvt> cf,
KafkaOperations<String, OrderEvt> dlqTemplate) {
var recoverer = new DeadLetterPublishingRecoverer(
dlqTemplate,
(rec, ex) -> new TopicPartition(rec.topic() + ".dlq", rec.partition())
);
var errorHandler = new DefaultErrorHandler(
recoverer,
new FixedBackOff(0L, 0) // сразу в DLQ, без ретраев
);
errorHandler.addNotRetryableExceptions(JsonParseException.class);
errorHandler.addRetryableExceptions(SQLException.class);
var factory = new ConcurrentKafkaListenerContainerFactory<String, OrderEvt>();
factory.setConsumerFactory(cf);
factory.setCommonErrorHandler(errorHandler);
return factory;
}
@KafkaListener(topics = "orders.raw", groupId = "enricher-v1")
public void onEvent(OrderEvt evt) {
// enrich & persist
}
DeadLetterPublishingRecoverer
автоматически копирует key/value и доклеивает метадату (kafka_dlt-original-topic
, kafka_dlt-exception-fqcn
, & др.). Для временных сбоев можно поставить FixedBackOff(1000L, 3)
— три локальных ретрая до DLQ.
Spring‑team подтянула эти фичи с Apache Kafka 2.8; до этого приходилось писать кастомные SeekToCurrentErrorHandler
.
Как мониторить и обрабатывать DLQ
Метрики и алёрты
Что смотрим |
Где берём |
Порог |
---|---|---|
|
|
>10% от основного топика (5-мин. окно) |
Lag основного consumer‑group |
|
рост экспоненциальный |
Кол‑во |
ksqlDB / Kafka Streams window count |
>1000 за 15 м |
Пример алерт‑правила в Prometheus:
- alert: HighDLQInRate
expr: rate(kafka_server_brokertopicmetrics_messages_in_total{topic=~".*\\.dlq"}[5m]) > 100
for: 2m
labels:
severity: critical
annotations:
summary: "DLQ inflow is {{ $value }} msg/s"
description: "Too many errors hitting DLQ topics"
UI для ручного ревью
kcat (kafkacat
) — быстрый просмотр одного сообщения:
kcat -C -t clicks.raw.dlq -o -5 -q
AKHQ / Redpanda Console / Confluent UI — визуальный поиск по key/headers. Когда сообщений десятки тысяч, делают «DLQ‑WorkBench»: удобное SPA, где можно фильтровать по error.class
, делать bulk‑reprocess.
Автоматический retry-pipeline
Архитектурный паттерн таков:
clicks.raw.dlq (-- only retryable -->) clicks.retry
| |
| DLQ-retry-consumer | primary-consumer
+----------> clicks.raw -----------+
Retry‑consumer читает только retryable=true
. Ставит задержку (например, sleep(300_000)
или Scheduled Executor). Пушит обратно в исходный топик с новым header x-retry-count
. При превышении MAX_RETRY
отправляет в clicks.raw.dlq.permanent
— это уже зона ручного разбора.
В 2024-м Confluent добавила готовый компонент — Parallel Consumer с built‑in retry — но он пока в tech‑preview.
Итог
DLQ — это не просто «корзинка для битых сообщений», а фундамент отказоустойчивой стриминговой архитектуры:
Коммит оффсета → живой консьюмер.
Разметка ошибок → осмысленные ретраи.
Метрики → SRE спит спокойно.
Собрали? Тестируем — швыряем кривой JSON и дропаем коннект к БД. Консьюмер улыбается, а в DLQ аккуратно появляется две записи: одна retryable=false
, вторая retryable=true
. Красота.
В заключение приглашаем на открытый урок 22 мая «Оптимизация Nginx и Angie под высокие нагрузки». Узнайте, как настроить ключевые параметры для стабильной работы серверов при большом трафике, оптимизировать TLS, кэширование и анализировать производительность. Меньше узких мест — больше скорости. Записывайтесь по ссылке.
Любое развитие начинается с честной оценки. Пройдите тест на знание инфраструктуры высоконагруженных систем — он подскажет, куда расти дальше.