Привет, Хабр!
Сегодня мы рассмотрим Fast Lane / Slow Lane для Kafka: как одним росчерком кода защитить SLA‑критичный поток от толстых сообщений, не перекраивая пол‑стека и не устраивая зоопарк из очередей.
Kafka читает батчами и строго по порядку. Если впереди в логах стоит гигантский JSON, consumer обязан проглотить его прежде, чем добраться до маленького heartbeat. Лёгкие события застревают, медианное время обработки идёт в космос, SLA горит синим пламенем. Разнести трафик на fast lane и slow lane — самый прямой способ убрать взаимное влияние. Лёгкие события летят в приоритетный топик, тяжёлые отправляются в отдельный, медленный. Теоретически можно пытаться делать приоритизацию внутри одной очереди, но тогда упираемся в порядковую семантику Kafka и получаем latency‑капкан.
Топология на уровне брокера
Создаём два топика с разными настройками. Fast Lane держим с большим количеством партиций и жёстким лимитом на размер сообщения, Slow Lane — с меньшим количеством партиций, но с повышенным message.max.bytes
. Пример Terraform‑модуля:
resource "kafka_topic" "events_fast" {
name = "events.input.fast"
replication_factor = 3
partitions = 12
config = {
"max.message.bytes" = "1048576" # 1 MB
"retention.ms" = "604800000" # 7 дней
}
}
resource "kafka_topic" "events_slow" {
name = "events.input.slow"
replication_factor = 3
partitions = 6
config = {
"max.message.bytes" = "8388608" # 8 MB
"retention.ms" = "259200000" # 3 дня
}
}
Продюсер
Первый вариант это решать по размеру сообщение/события:
@Service
@RequiredArgsConstructor
public class EventRouter {
private final KafkaTemplate<String, byte[]> kafka;
public void send(byte[] payload) {
if (payload.length > 900_000) { // > ~900 КБ
kafka.send("events.input.slow", payload);
} else {
kafka.send("events.input.fast", payload);
}
}
}
Логика размазана по сервисам; если понадобится сложная классификация — придётся менять все продюсеры.
RecordInterceptor: централизуем роутинг
@Configuration
public class KafkaProducerConfig {
@Bean
public ProducerFactory<String, byte[]> producerFactory() {
Map<String, Object> props = Map.of(
ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrap,
ProducerConfig.ACKS_CONFIG, "all",
ProducerConfig.COMPRESSION_TYPE_CONFIG, "lz4", // дешёвое сжатие
ProducerConfig.LINGER_MS_CONFIG, 5,
ProducerConfig.BATCH_SIZE_CONFIG, 32_768
);
DefaultKafkaProducerFactory<String, byte[]> factory =
new DefaultKafkaProducerFactory<>(props);
factory.addPostProcessor((producer, tx) -> // подключаем интерцептор
producer.setInterceptor(new RoutingInterceptor()));
return factory;
}
}
public class RoutingInterceptor implements ProducerInterceptor<String, byte[]> {
@Override
public ProducerRecord<String, byte[]> onSend(ProducerRecord<String, byte[]> record) {
byte[] payload = record.value();
if (payload != null && payload.length > 900_000) {
return new ProducerRecord<>("events.input.slow", record.key(), payload);
}
return new ProducerRecord<>("events.input.fast", record.key(), payload);
}
}
Интерцептор прозрачен для бизнес‑кода: сервисы зовут kafkaTemplate.send("events.input", …)
и ничего не знают про дорожки.
Kafka Streams для динамического бранчинга
Когда нужны более хитрые правила, например обогащение события метаданными или ML‑моделью, удобнее взять Kafka Streams:
@Bean
public Topology topology() {
StreamsBuilder builder = new StreamsBuilder();
KStream<String, Event> source = builder.stream("events.input");
Predicate<String, Event> isSmall =
(k, v) -> v.size() <= 900_000;
Predicate<String, Event> isLarge =
(k, v) -> v.size() > 900_000;
KStream<String, Event>[] branches = source.branch(isSmall, isLarge); // fast / slow
branches[0].to("events.input.fast");
branches[1].to("events.input.slow");
return builder.build();
}
Streams‑процесс на отдельном сервисе — и у продюсеров чистая боль, а правила меняются в одном месте.
Конфигурация consumer’ов
Быстрый контейнер
@Bean
public ConcurrentKafkaListenerContainerFactory<String, byte[]> fastFactory(
ConsumerFactory<String, byte[]> base) {
var f = new ConcurrentKafkaListenerContainerFactory<String, byte[]>();
f.setConsumerFactory(base);
f.getContainerProperties().setAckMode(AckMode.MANUAL_IMMEDIATE);
f.setBatchListener(true);
f.setConcurrency(6);
// poll меньше 100 мс — реагируем быстро
f.getContainerProperties().setIdleBetweenPolls(50);
// после 1000 рекордов делаем commit
f.setAckOnError(false);
return f;
}
Медленный контейнер
@Bean
public ConcurrentKafkaListenerContainerFactory<String, byte[]> slowFactory(
ConsumerFactory<String, byte[]> base) {
var f = new ConcurrentKafkaListenerContainerFactory<String, byte[]>();
f.setConsumerFactory(base);
f.setBatchListener(true);
f.setConcurrency(2);
// читаем больше данных за один fetch
Map<String, Object> props = f.getConsumerFactory().getConfigurationProperties();
props.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 2_097_152); // 2 MB
props.put(ConsumerConfig.FETCH_MAX_BYTES_CONFIG, 8_388_608); // 8 MB
return f;
}
Параметры fetch.max.bytes
и max.partition.fetch.bytes
: для slow lane разрешаем больше, иначе толстый пакет не пролезет и consumer поймает RecordTooLargeException
.
Слушатели с ручным ack
@Slf4j
@Service
public class FastListener {
@KafkaListener(
id = "fast-listener",
topics = "events.input.fast",
containerFactory = "fastFactory"
)
public void onFast(List<byte[]> messages, Acknowledgment ack) {
messages.forEach(this::processFast);
ack.acknowledge(); // commit offset сразу
}
}
@Slf4j
@Service
public class SlowListener {
@KafkaListener(
id = "slow-listener",
topics = "events.input.slow",
containerFactory = "slowFactory"
)
public void onSlow(List<byte[]> messages, Acknowledgment ack) {
for (byte[] m : messages) {
try {
processSlow(m);
} catch (Exception ex) {
sendToDlq(m, ex);
}
}
ack.acknowledge();
}
}
В slow lane часто нужна Dead Letter Queue. Сразу выделяем events.input.slow.dlq
и не ломаем голову.
Динамический паузинг slow‑консьюмера
Когда fast lane начинает отставать, можно временно остановить slow listener, сохранив heartbeat, чтобы не получить ребаланс. Spring‑Kafka умеет это из коробки.
@Component
@RequiredArgsConstructor
public class SlowLaneThrottler {
private final KafkaListenerEndpointRegistry registry;
// запускаем из Spring‑Scheduler
@Scheduled(fixedDelay = 30_000)
public void controlSlowLane() {
MessageListenerContainer fast = registry.getListenerContainer("fast-listener");
MessageListenerContainer slow = registry.getListenerContainer("slow-listener");
long lagFast = lag("events.input.fast");
boolean overloaded = lagFast > 10_000;
if (overloaded && !slow.isPauseRequested()) {
slow.pause();
log.warn("Slow lane paused, fast lag={}", lagFast);
} else if (!overloaded && slow.isPauseRequested()) {
slow.resume();
log.info("Slow lane resumed");
}
}
}
Метод lag
берёт данные из JMX/Prometheus; реализацию опустим ради краткости.
Dynamic Throttle API
Начиная с Kafka 3.3 можно отдавать broker‑side throttle через Admin API: ограничиваем скорость отдачи для конкретных consumer‑групп. В Spring‑Kafka это делается так:
@Bean
public KafkaAdmin.NewPartitions throttleGroup() {
return (admin) -> admin.alterConsumerGroupOffsets(
"slow-consumer-group",
Map.of(new TopicPartition("events.input.slow", 0),
new OffsetAndMetadata(0L)),
new AlterConsumerGroupOffsetsOptions().timeoutMs(5_000)
.throttle(512 * 1024)); // 512 KiB/s
}
Такая мера включается по алерту и почти не требует остановки приложения.
Метрики и алерты
Подключаем Micrometer: management.metrics.enable.kafka: true
— и получаем пачку готовых метрик. Главное: лейблы client.id
, topic
. Вывешиваем в Grafana два графика:
kafka_consumer_records_lag_max{topic="events.input.fast"}
kafka_consumer_records_lag_max{topic="events.input.slow"}
Держим fast‑lag < 1000, slow‑lag < 100 000. Алерт: если fast‑lag > 10 000 за 5 минут — пауза slow‑консьюмера и Slack‑уведомление.
Некоторые ошибки
Часто промахиваются на этапе тюнинга fast‑lane
: оставляют одинаковое значение max.poll.records
для обоих слушателей, и быстрый consumer захлёбывается, потому что на него ложится объём батча, рассчитанный на slow‑lane
; отсюда лавинообразный рост лагов. Вторая типовая оплошность это отправлять события без сжатия: когда payload приближается к порогу 1 MB, брокер отклоняет запись, а продюсер отвечает ошибкой RecordTooLarge
, хотя проблему решали бы две строчки compression.type=lz4
.
Другая пара ошибок связана с ресурсными лимитами. Если забыть про quota, «медленный» consumer при высоком fetch.max.bytes
идёт быстрее «быстрого» и съедает пропускную способность брокера, сводя на нет всю идею приоритизации. И, наконец, retention для slow‑topic:
день хранения выглядит разумно, пока ночью не прилетит пик крупных событий; если retention меньше фактической волны, самые важные сообщения исчезнут до обработки, и восстановить их будет некуда.
Итог
Если вы уже внедряли подобную схему разделения трафика или пошли другим путём — делитесь опытом в комментариях. Интересно посмотреть и обсудить, как вы решали проблему приоритизации событий в Kafka.
Если вы работаете с высоконагруженными системами и интересуетесь архитектурными подходами, приглашаем вас на два открытых урока курса Highload Architect:
12 августа в 20:00 — «Мониторинг в высоконагруженных проектах»
Разберём, как выстроить наблюдаемость системы под постоянной нагрузкой: какие метрики важны, как быстро выявлять узкие места и реагировать на инциденты до того, как они станут проблемой.20 августа в 20:00 — «Wasm на сервере в высоконагруженных системах»
Поговорим о применении WebAssembly на серверной стороне. Вы узнаете, зачем его интегрируют в высоконагруженные сервисы, какие преимущества даёт песочница и как подход влияет на производительность.Кроме того, вы можете пройти тестирование, чтобы проверить свои знания и навыки в области высоконагруженных систем.