Привет, Хабр!
Сегодня рассмотрим, почему отставание у Kafka‑консьюмеров — это не просто строчка в kafka-consumer-groups
, а метрика, от которой зависит SLA вашего сервиса. Рассмотрим, как её считать без самообмана, как соорудить собственный мониторинг на Python и Go, а главное — чем именно тушить всплески lag»а: throttle, autoscale и backpressure.
Как считать lag правильно и почему offset ≠ задержка
Слово «lag» используют лениво в двух разных смыслах.
По количеству сообщений. Классическая формула: latest_offset – committed_offset
. latest_offset
— крайний смещённый офсет каждого partition»a на брокере; committed_offset
— то, что консьюмер группа уже зафиксировала в __consumer_offsets
. Но тот же CLI kafka-consumer-groups.sh --describe
выводит ещё current_offset
— номер последнего прочитанного (но не обязательно закоммиченного) сообщения. Многие путают их и получают «плавающий» lag.
По времени. Когда бизнесу важна реальная задержка доставки, считают: now() – timestamp(последнего прочитанного сообщения)
. Это показательно на топиках с batch‑продюсерами, где сообщения пачками пуляются раз в N секунд. Time‑lag хорош тем, что уровень нагрузки выражается в секундах и понятен продактам, но требует тянуть таймстемпы событий.
Разница между committed, latest и current offset
latest хранит брокер, он увеличивается всегда.
current живёт в памяти конкретного консьюмера и обновляется сразу после
poll()
.committed попадает в
__consumer_offsets
, когда вы вызвалиcommitAsync()
/commitSync()
или это сделала framework‑обвязка.
Если группа упала до коммита — current убежит вперёд, committed останется старым, а CLI покажет аномальный всплеск lag»а. Именно поэтому производственные метрики считают по committed, а в коде полезно держать gauge и для current, чтобы ловить «разрывы».
kafka-consumer-groups vs метрики в коде
CLI‑скрипт прекрасен для ад‑хока, но запускается долго, опрашивает брокеры последовательно и нагружает зоопарк кучи RPC. По факту удобнее:
JMX‑метрики
records-lag
/records-lag-max
прямо из клиента;Prometheus‑экспортеры (
kafka-lag-exporter
,kafka_exporter
,Burrow
). Они собирают offset»ы батчами и кэшируют.
Простая формула realtime-lag в коде
lag = latest_offset - committed_offset # сообщений
time_lag_ms = int(time.time()*1000) - last_ts # миллисекунд
Считать надо для каждой пары <topic, partition>
, потом суммировать поверх партиций группы.
Реализация кастомного lag-мониторинга на Python и Go
from confluent_kafka import KafkaException
from confluent_kafka.admin import AdminClient, ListOffsetsRequest, ListOffsetsResult
from prometheus_client import Gauge, start_http_server
BROKERS = "kafka-broker-1:9092,kafka-broker-2:9092"
TOPIC = "payments"
GROUP = "billing-service"
lag_gauge = Gauge('kafka_consumer_lag',
'Lag per {topic,partition}',
['topic', 'partition'])
admin = AdminClient({'bootstrap.servers': BROKERS})
coordinator = admin.list_consumer_groups().result()[GROUP]
def calc_partition_lag(tp):
committed = admin.list_consumer_group_offsets(GROUP,
partitions=[tp]).result()[tp].offset
latest = admin.list_offsets({tp: ListOffsetsRequest.LATEST}).result()[tp].offset
return latest - committed
for p in admin.list_topics(TOPIC).topics[TOPIC].partitions:
tp = (TOPIC, p)
lag = calc_partition_lag(tp)
lag_gauge.labels(TOPIC, p).set(lag)
Скрипт запускается как side‑car, открывает /metrics
, и Prometheus подтягивает гейдж раз в 10 секунд.
requirements
# requirements.txt
confluent-kafka~=2.5.1 # ≥ 2.5, фикс CVE-2024-02xx, поддержка ListOffsetsRequest.LATEST
prometheus-client~=0.20.0 # последняя стабильная на апрель 2025
Go
package main
import (
"context"
"github.com/segmentio/kafka-go"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"
"log"
"net/http"
)
var (
lag = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Name: "kafka_consumer_lag",
Help: "Lag per topic/partition",
},
[]string{"topic", "partition"},
)
)
func main() {
prometheus.MustRegister(lag)
conn, _ := kafka.Dial("tcp", "kafka-broker-1:9092")
defer conn.Close()
partitions, _ := conn.ReadPartitions("payments")
for _, p := range partitions {
latest, _ := conn.ReadLastOffset(p.Topic, p.ID)
committed, _ := conn.ReadCommittedOffset(
kafka.GroupOffset{
Group: "billing-service",
Topic: p.Topic,
Partition: p.ID,
})
lag.WithLabelValues(p.Topic, strconv.Itoa(p.ID)).
Set(float64(latest - committed))
}
http.Handle("/metrics", promhttp.Handler())
log.Fatal(http.ListenAndServe(":2112", nil))
}
gomod
// go.mod
module github.com/you/kafka-lag-exporter
go 1.22
require (
github.com/segmentio/kafka-go v0.5.5 // ≥ 0.5 — ReadCommittedOffset переименован
github.com/prometheus/client_golang v1.18.0
)
Плюс — чистый standard lib + promclient, минус — нет встроенного кеша offset»ов, поэтому таймауты и batch‑poll целиком на вас.
Построение панели
Prometheus → Grafana — самый короткий путь: sum(kafka_consumer_lag)
на графике, alert на > 1000
со срабатыванием ≤ 1 минуты.
Simple UI — FastAPI + HTMX отрисовывает таблицу лагов, обновляя дифф через SSE, неплохо заходит в разработке, когда Grafana ещё недоступна.
Поддержка партиций реализуется банально: цикл по list_topics()
и асинхронные list_offsets
/OffsetFetch
. Главное — не склеивать всё в один RPC, иначе брокер отдаст 50×1 000 партиций и упрётся в сетевой MTU.
Как реагировать на рост lag: throttle, scale, backpressure
Автоматизация реакций
Пороговый alert в Grafana стучит в PagerDuty, а параллельно метрика попадает в Kubernetes‑кластер, где KEDA дергает HorizontalPodAutoscaler. Скейл‑фактор пропорционален lag»у: каждые N сообщений добавляют под.
apiVersion: keda.sh/v1alpha1
kind: ScaledObject
metadata:
name: billing-consumer
spec:
scaleTargetRef:
name: billing-consumer
minReplicaCount: 1
maxReplicaCount: 10
triggers:
- type: kafka
metadata:
bootstrapServers: kafka-broker-1:9092
consumerGroup: billing-service
topic: payments
lagThreshold: "500"
Черновой, но рабочий пример: как только суммарный lag переваливает 500, KEDA масштабирует deployment.
Throttle
Часто вы ограничены числом разделов или лицензиями Confluent Cloud, и скейлить некуда. Тогда:
while True:
batch = consumer.poll(timeout_ms=100, max_records=100)
process(batch)
if lag_gauge.get() > 5000:
time.sleep(0.2) # мягкий back-off
Наглядно, но важно: sleep держите маленьким (мс 200–500), иначе консьюмер выпадет из rebalance‑протокола и сломает группу.
Backpressure через pause/resume
Для Java‑клиента у вас есть consumer.pause(partitions)
и resume(partitions)
. Они позволяют остановить приём новых сообщений, продолжая poll()
и не давая группе ребаланситься. Реализуйте счётчик in‑flight задач, достигли high‑water‑mark — вызывайте pause
. Закончили — resume
.
В реактивных обвязках (Project Reactor‑Kafka, Spring Kafka) pause/resume уже завёрнуты, но не забывайте, что прямой вызов KafkaConsumer.pause()
без ведома контейнера ломает контракт и после ребаланса partition возобновится сам.
Внешние очереди
Если бизнес‑сервису тяжело обрабатывать пики, проще буферизовать в отдельной системной очереди — Redis Streams, RabbitMQ или тот же PostgreSQL. Kafka‑консьюмер превращается в своебразный перекачивающий насос, а пользовательский воркер читает из очереди с контролируемой скоростью. Конфигурация чуть сложнее, зато lag на Kafka держится плоским, а «просадка» уходит в дешёвое дисковое хранилище.
Выводы
Отставание консьюмеров — это не просто цифра в CLI. Правильное измерение требует понимания трёх офсетов и временных таймстемпов, а стабильность достигается комбинацией:
Тонких метрик и быстрых алертов.
Гибкого автоскейлинга с порогами на lag.
Локального throttling»а и pause/resume при всплесках.
Архитектурного буфера, когда нагрузка принципиально «взрывная».
Если вы сталкиваетесь с проблемами интеграции и управления данными в микросервисах или API, то знакомы с тем, как ошибки и сложности могут возникать из-за гибкости JSON. Schema Registry решает эти проблемы, обеспечивая структуру и стандартизацию данных, что критически важно для масштабируемости и надежности.
Как выбрать между JSON и Schema Registry, когда каждый подход уместен, и как внедрить Schema Registry в своих проектах для улучшения поддержки и совместимости данных? Поговорим об этом на открытом уроке 19 мая.
Максимум практики по работе с Kafka для инженеров данных и разработчиков можно получить на онлайн-курсе "Apache Kafka".
M0rdecay
Про самое интересное и не рассказали - как измерить АКТУАЛЬНОЕ отставание по времени. Дельта между now() и таймстэмпом сообщения не показательна, потому что когда по этой дельте мы переваливаем за SLO, то уже поздно пить боржоми.
Для себя вывел подход вычислять реальное отставание через лаг в штуках, деленный на скорость обработки за интервал. Не идеально, но подсвечивает проблему заранее.
Ну и уже традиционное замечание по kafka-go - незачем городить свой велосипед для вычисления лага - у ридера есть стата с этой метрикой. А для консьюмер групп всё равно нужно запускать по ридеру на партицию, тогда метрика всегда будет корректная.