Привет, Хабр!

Сегодня рассмотрим, почему отставание у Kafka‑консьюмеров — это не просто строчка в kafka-consumer-groups, а метрика, от которой зависит SLA вашего сервиса. Рассмотрим, как её считать без самообмана, как соорудить собственный мониторинг на Python и Go, а главное — чем именно тушить всплески lag»а: throttle, autoscale и backpressure.


Как считать lag правильно и почему offset ≠ задержка

Слово «lag» используют лениво в двух разных смыслах.

По количеству сообщений. Классическая формула: latest_offset – committed_offset. latest_offset — крайний смещённый офсет каждого partition»a на брокере; committed_offset — то, что консьюмер группа уже зафиксировала в __consumer_offsets. Но тот же CLI kafka-consumer-groups.sh --describe выводит ещё current_offset — номер последнего прочитанного (но не обязательно закоммиченного) сообщения. Многие путают их и получают «плавающий» lag.

По времени. Когда бизнесу важна реальная задержка доставки, считают: now() – timestamp(последнего прочитанного сообщения). Это показательно на топиках с batch‑продюсерами, где сообщения пачками пуляются раз в N секунд. Time‑lag хорош тем, что уровень нагрузки выражается в секундах и понятен продактам, но требует тянуть таймстемпы событий.

Разница между committed, latest и current offset

  • latest хранит брокер, он увеличивается всегда.

  • current живёт в памяти конкретного консьюмера и обновляется сразу после poll().

  • committed попадает в __consumer_offsets, когда вы вызвали commitAsync()/commitSync() или это сделала framework‑обвязка.

Если группа упала до коммита — current убежит вперёд, committed останется старым, а CLI покажет аномальный всплеск lag»а. Именно поэтому производственные метрики считают по committed, а в коде полезно держать gauge и для current, чтобы ловить «разрывы».

kafka-consumer-groups vs метрики в коде

CLI‑скрипт прекрасен для ад‑хока, но запускается долго, опрашивает брокеры последовательно и нагружает зоопарк кучи RPC. По факту удобнее:

  • JMX‑метрики records-lag/records-lag-max прямо из клиента;

  • Prometheus‑экспортеры (kafka-lag-exporter, kafka_exporter, Burrow). Они собирают offset»ы батчами и кэшируют.

Простая формула realtime-lag в коде

lag = latest_offset - committed_offset          # сообщений
time_lag_ms = int(time.time()*1000) - last_ts   # миллисекунд

Считать надо для каждой пары <topic, partition>, потом суммировать поверх партиций группы.

Реализация кастомного lag-мониторинга на Python и Go

from confluent_kafka import KafkaException
from confluent_kafka.admin import AdminClient, ListOffsetsRequest, ListOffsetsResult
from prometheus_client import Gauge, start_http_server

BROKERS = "kafka-broker-1:9092,kafka-broker-2:9092"
TOPIC   = "payments"
GROUP   = "billing-service"

lag_gauge = Gauge('kafka_consumer_lag',
                  'Lag per {topic,partition}',
                  ['topic', 'partition'])

admin = AdminClient({'bootstrap.servers': BROKERS})
coordinator = admin.list_consumer_groups().result()[GROUP]

def calc_partition_lag(tp):
    committed = admin.list_consumer_group_offsets(GROUP,
        partitions=[tp]).result()[tp].offset
    latest = admin.list_offsets({tp: ListOffsetsRequest.LATEST}).result()[tp].offset
    return latest - committed

for p in admin.list_topics(TOPIC).topics[TOPIC].partitions:
    tp = (TOPIC, p)
    lag = calc_partition_lag(tp)
    lag_gauge.labels(TOPIC, p).set(lag)

Скрипт запускается как side‑car, открывает /metrics, и Prometheus подтягивает гейдж раз в 10 секунд.

requirements
# requirements.txt
confluent-kafka~=2.5.1      # ≥ 2.5, фикс CVE-2024-02xx, поддержка ListOffsetsRequest.LATEST
prometheus-client~=0.20.0   # последняя стабильная на апрель 2025

Go

package main

import (
	"context"
	"github.com/segmentio/kafka-go"
	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promhttp"
	"log"
	"net/http"
)

var (
	lag = prometheus.NewGaugeVec(
		prometheus.GaugeOpts{
			Name: "kafka_consumer_lag",
			Help: "Lag per topic/partition",
		},
		[]string{"topic", "partition"},
	)
)

func main() {
	prometheus.MustRegister(lag)

	conn, _ := kafka.Dial("tcp", "kafka-broker-1:9092")
	defer conn.Close()

	partitions, _ := conn.ReadPartitions("payments")
	for _, p := range partitions {
		latest, _ := conn.ReadLastOffset(p.Topic, p.ID)
		committed, _ := conn.ReadCommittedOffset(
			kafka.GroupOffset{
				Group:     "billing-service",
				Topic:     p.Topic,
				Partition: p.ID,
			})
		lag.WithLabelValues(p.Topic, strconv.Itoa(p.ID)).
			Set(float64(latest - committed))
	}

	http.Handle("/metrics", promhttp.Handler())
	log.Fatal(http.ListenAndServe(":2112", nil))
}
gomod
// go.mod
module github.com/you/kafka-lag-exporter

go 1.22

require (
    github.com/segmentio/kafka-go v0.5.5   // ≥ 0.5 — ReadCommittedOffset переименован
    github.com/prometheus/client_golang v1.18.0
)

Плюс — чистый standard lib + promclient, минус — нет встроенного кеша offset»ов, поэтому таймауты и batch‑poll целиком на вас.

Построение панели

Prometheus → Grafana — самый короткий путь: sum(kafka_consumer_lag) на графике, alert на > 1000 со срабатыванием ≤ 1 минуты.

Simple UI — FastAPI + HTMX отрисовывает таблицу лагов, обновляя дифф через SSE, неплохо заходит в разработке, когда Grafana ещё недоступна.

Поддержка партиций реализуется банально: цикл по list_topics() и асинхронные list_offsets/OffsetFetch. Главное — не склеивать всё в один RPC, иначе брокер отдаст 50×1 000 партиций и упрётся в сетевой MTU.

Как реагировать на рост lag: throttle, scale, backpressure

Автоматизация реакций

Пороговый alert в Grafana стучит в PagerDuty, а параллельно метрика попадает в Kubernetes‑кластер, где KEDA дергает HorizontalPodAutoscaler. Скейл‑фактор пропорционален lag»у: каждые N сообщений добавляют под.

apiVersion: keda.sh/v1alpha1
kind: ScaledObject
metadata:
  name: billing-consumer
spec:
  scaleTargetRef:
    name: billing-consumer
  minReplicaCount: 1
  maxReplicaCount: 10
  triggers:
  - type: kafka
    metadata:
      bootstrapServers: kafka-broker-1:9092
      consumerGroup: billing-service
      topic: payments
      lagThreshold: "500"

Черновой, но рабочий пример: как только суммарный lag переваливает 500, KEDA масштабирует deployment.

Throttle

Часто вы ограничены числом разделов или лицензиями Confluent Cloud, и скейлить некуда. Тогда:

while True:
    batch = consumer.poll(timeout_ms=100, max_records=100)
    process(batch)
    if lag_gauge.get() > 5000:
        time.sleep(0.2)   # мягкий back-off

Наглядно, но важно: sleep держите маленьким (мс 200–500), иначе консьюмер выпадет из rebalance‑протокола и сломает группу.

Backpressure через pause/resume

Для Java‑клиента у вас есть consumer.pause(partitions) и resume(partitions). Они позволяют остановить приём новых сообщений, продолжая poll() и не давая группе ребаланситься. Реализуйте счётчик in‑flight задач, достигли high‑water‑mark — вызывайте pause. Закончили — resume.

В реактивных обвязках (Project Reactor‑Kafka, Spring Kafka) pause/resume уже завёрнуты, но не забывайте, что прямой вызов KafkaConsumer.pause() без ведома контейнера ломает контракт и после ребаланса partition возобновится сам.

Внешние очереди

Если бизнес‑сервису тяжело обрабатывать пики, проще буферизовать в отдельной системной очереди — Redis Streams, RabbitMQ или тот же PostgreSQL. Kafka‑консьюмер превращается в своебразный перекачивающий насос, а пользовательский воркер читает из очереди с контролируемой скоростью. Конфигурация чуть сложнее, зато lag на Kafka держится плоским, а «просадка» уходит в дешёвое дисковое хранилище.


Выводы

Отставание консьюмеров — это не просто цифра в CLI. Правильное измерение требует понимания трёх офсетов и временных таймстемпов, а стабильность достигается комбинацией:

  1. Тонких метрик и быстрых алертов.

  2. Гибкого автоскейлинга с порогами на lag.

  3. Локального throttling»а и pause/resume при всплесках.

  4. Архитектурного буфера, когда нагрузка принципиально «взрывная».


Если вы сталкиваетесь с проблемами интеграции и управления данными в микросервисах или API, то знакомы с тем, как ошибки и сложности могут возникать из-за гибкости JSON. Schema Registry решает эти проблемы, обеспечивая структуру и стандартизацию данных, что критически важно для масштабируемости и надежности.

Как выбрать между JSON и Schema Registry, когда каждый подход уместен, и как внедрить Schema Registry в своих проектах для улучшения поддержки и совместимости данных? Поговорим об этом на открытом уроке 19 мая.

Максимум практики по работе с Kafka для инженеров данных и разработчиков можно получить на онлайн-курсе "Apache Kafka".

Комментарии (1)


  1. M0rdecay
    15.05.2025 11:15

    Про самое интересное и не рассказали - как измерить АКТУАЛЬНОЕ отставание по времени. Дельта между now() и таймстэмпом сообщения не показательна, потому что когда по этой дельте мы переваливаем за SLO, то уже поздно пить боржоми.

    Для себя вывел подход вычислять реальное отставание через лаг в штуках, деленный на скорость обработки за интервал. Не идеально, но подсвечивает проблему заранее.

    Ну и уже традиционное замечание по kafka-go - незачем городить свой велосипед для вычисления лага - у ридера есть стата с этой метрикой. А для консьюмер групп всё равно нужно запускать по ридеру на партицию, тогда метрика всегда будет корректная.