Автор статьи: Сергей Прощаев @sproshchaev
Руководитель направления Java‑разработки в FinTech
Введение
Apache Kafka — это основа современных распределенных систем, обрабатывающий триллионы событий ежедневно. Но что происходит, если сообщение потерялось, пришло дважды или нарушилась логика бизнес‑процесса? Гарантии доставки в Kafka — это страховка от хаоса в условиях высокой нагрузки и сбоев.
В этой статье мы разберем три вида гарантий доставки сообщений на примерах.
Что такое семантика доставки и что такое гарантия доставки в Kafka?
Семантика доставки (Delivery Semantics) и гарантия доставки (Delivery Guarantee) в Apache Kafka связаны, но это не одно и то же.
Семантика доставки (Delivery Semantics) это правила, определяющие, сколько раз сообщение будет доставлено потребителю. Это техническая реализация поведения системы в случае сбоев, повторных отправок или параллельной обработки. И здесь определены три варианта: At‑Most‑Once, At‑Least‑Once, Exactly‑Once:

Гарантия доставки (Delivery Guarantee) — это обещание системы обеспечить определенный уровень надежности. Это результат, который достигается через применение конкретной семантики. Так например гарантия «не потерять данные» реализуется через семантику At‑Least‑Once, а гарантия «точное выполнение операции» требует семантики Exactly‑Once.
Таким образом семантика — это инструмент, который вы настраиваете (например, транзакции Kafka), а гарантия — это результат, который видит бизнес например, «платеж зачислится ровно один раз».
Давайте разберем три варианта семантики доставки более подробно.
1. At‑Most‑Once (не более одного раза): сообщение может быть потеряно, но никогда не дублируется. Продюсер отправляет сообщение без подтверждения. Консьюмер автоматически коммитит офсет до обработки сообщения. Если консьюмер упадёт до обработки, сообщение будет потеряно. Этот вид доставки используется в системах где потеря данных не является критичным — например, метрики, логи).
2. At‑Least‑Once (не менее одного раза): сообщение никогда не теряется, но может быть дублировано. Продюсер включает подтверждения и повторные попытки. Консьюмер вручную коммитит офсет после обработки. В этом случае может возникнуть дублирование сообщений при повторной отправке или сбое. Эту гарантию доставки лучше использовать для систем, где потеря данных недопустима (платежи, заказы).
3. Exactly‑Once (точно один раз): сообщение доставляется ровно один раз, даже при сбоях. Продюсер использует транзакции и идемпотентность. Консьюмер читает только зафиксированные транзакции. В этом случае мы нагружаем Kafka управлением транзакциями и из‑за этого у нас увеличивается время доставки сообщений. Подобную гарантию доставки рекомендуется использовать при построении обработки критичных операций где требуется обработка сообщения строго один раз.
Настройка проекта
Давайте рассмотрим примеры конфигурирования Продюсеров и Консьюмеров для всех трех вариантов гарантий доставки сообщений.
Для минимизации кода будем использовать Java и Spring Framework и подключим две зависимости:
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
</dependencies>
Не более одного раза (At‑Most‑Once)
Первым настроим гарантию доставки при которой сообщение может быть потеряно, но не дублируется. Для этого нам надо выключить у Продюсера ожидание подтверждения записи сообщения в Kafka и выключить повторы отправки сообщений:
@Configuration
public class ProducerAMO {
@Bean
public ProducerFactory<String, String> producerFactory() {
Map<String, Object> config = new HashMap<>();
config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
"localhost:9092");
config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
StringSerializer.class);
config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
StringSerializer.class);
config.put(ProducerConfig.ACKS_CONFIG, "0");
config.put(ProducerConfig.RETRIES_CONFIG, 0);
return new DefaultKafkaProducerFactory<>(config);
}
@Bean
public KafkaTemplate<String, String> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
}
@Service
public class ProducerServiceAMO {
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
public void send(String topic, String message) {
kafkaTemplate.send(topic, message);
}
}
Для Консьюмера нам нужно включить автокоммит офсетов:
@Configuration
public class ConsumerAMO {
@Bean
public ConsumerFactory<String, String> consumerFactory() {
Map<String, Object> config = new HashMap<>();
config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
"localhost:9092");
config.put(ConsumerConfig.GROUP_ID_CONFIG, "group-amo");
config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
StringDeserializer.class);
config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
StringDeserializer.class);
config.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
return new DefaultKafkaConsumerFactory<>(config);
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String>
kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String>
factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
return factory;
}
}
@Service
public class ConsumerService_AMO {
@KafkaListener(topics = "topic-am0", groupId = "group-amo")
public void consume(String message) {
System.out.println("Received (At-Most-Once): " + message);
}
}
Не менее одного раза (At‑Least‑Once)
Вторым примером гарантии доставки будет вариант с доставкой сообщения не менее одного раза. Нам нужно настроить получение подтверждения сообщения Продюсером от всех реплик в кластере и установить повторы отправки сообщений:
@Configuration
public class ProducerALO {
@Bean
public ProducerFactory<String, String> producerFactory() {
Map<String, Object> config = new HashMap<>();
config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
"localhost:9092");
config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
StringSerializer.class);
config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
StringSerializer.class);
config.put(ProducerConfig.ACKS_CONFIG, "all");
config.put(ProducerConfig.RETRIES_CONFIG, 3);
return new DefaultKafkaProducerFactory<>(config);
}
@Bean
public KafkaTemplate<String, String> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
}
@Service
public class ProducerServiceALO {
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
public void send(String topic, String message) {
kafkaTemplate.send(topic, message).get();
}
}
На стороне Консьюмера нам нужно будет настроить ручной коммит офсетов:
@Configuration
public class ConsumerALO {
@Bean
public ConsumerFactory<String, String> consumerFactory() {
Map<String, Object> config = new HashMap<>();
config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
"localhost:9092");
config.put(ConsumerConfig.GROUP_ID_CONFIG, "group-alo");
config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
StringDeserializer.class);
config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
StringDeserializer.class);
config.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
return new DefaultKafkaConsumerFactory<>(config);
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String>
kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String>
factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.getContainerProperties()
.setAckMode(ContainerProperties
.AckMode.MANUAL_IMMEDIATE);
return factory;
}
}
@Service
public class ConsumerServiceALO {
@KafkaListener(topics = "topic-alo", groupId = "group-alo")
public void consume(ConsumerRecord<String, String> record,
Acknowledgment ack) {
System.out.println("Received: " + record.value());
ack.acknowledge();
}
}
Точно один раз (Exactly‑Once)
Для настройки этого варианта гарантии доставки нам нужно включить режим идемпотентности и использование транзакций у Продюсера:
@Configuration
public class ProducerEO {
@Bean
public ProducerFactory<String, String> producerFactory() {
Map<String, Object> config = new HashMap<>();
config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
"localhost:9092");
config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
StringSerializer.class);
config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
StringSerializer.class);
config.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
config.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "tx-001");
config.put(ProducerConfig.ACKS_CONFIG, "all");
return new DefaultKafkaProducerFactory<>(config);
}
@Bean
public KafkaTemplate<String, String> kafkaTemplate() {
KafkaTemplate<String, String> template =
new KafkaTemplate<>(producerFactory());
template.setProducerListener(new ProducerListener<String, String>() {
@Override
public void onSuccess(ProducerRecord<String, String> record, RecordMetadata metadata) {
// Здесь мы вставим логику обработки успешной отправки
}
});
return template;
}
}
@Service
public class ProducerServiceEO {
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
public void send(String topic, String message) {
kafkaTemplate.executeInTransaction(operations -> {
operations.send(topic, message);
return null;
});
}
}
Для Консьюмера нам нужно установить уровень изоляции транзакций read_committed
:
@Configuration
public class ConsumerEO {
@Bean
public ConsumerFactory<String, String> consumerFactory() {
Map<String, Object> config = new HashMap<>();
config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
"localhost:9092");
config.put(ConsumerConfig.GROUP_ID_CONFIG, "group-eo");
config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
StringDeserializer.class);
config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
StringDeserializer.class);
config.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG,
"read_committed");
config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
return new DefaultKafkaConsumerFactory<>(config);
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String>
factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
return factory;
}
}
@Service
public class ConsumerServiceEO {
@KafkaListener(topics = "topic-eo", groupId = "group-eo")
public void consume(String message) {
System.out.println("Received (Exactly-Once): " + message);
}
}
Заключение
В этих учебных примерах мы рассмотрели фрагменты конфигурации Продюсера и Консьюмера для трех различных вариантов гарантий сообщений в распределенных системах. Как мы видим, что гарантия доставки At‑Most‑Once, при которой сообщение доставляется не более одного раза, имеет наивысшую скорость, но при этом не гарантируется получение сообщения Коньюмером. Использование At‑Least‑Once позволяет доставить сообщение не менее одного раза, но при этом допускает и наличие дубликатов. А самый точный вариант — Exactly‑Once позволяет Консьюмеру получить сообщение точно один раз, но при этом используется самый медленный и самый сложный вариант доставки.
На практике рекомендуется выбирать наиболее подходящий вариант гарантии доставки для вашей системы, чтобы найти компромисс между скоростью и надежностью.
Если вы работаете с распределёнными системами или планируете использовать Kafka в проектах, приглашаем вас на серию открытых уроков по курсу Apache Kafka:
30 июля в 19:00 — «Apache Kafka в микросервисной архитектуре: лучшие практики асинхронного обмена». Обсудим роль Kafka в микросервисах и подходы к построению устойчивого взаимодействия между сервисами.
13 августа в 18:00 — «Архитектурные паттерны работы с Kafka: от простого к масштабируемому». Разберём архитектурные решения, которые позволяют работать с Kafka надёжно и с учётом роста нагрузки.
20 августа в 20:00 — «Kafka и ClickHouse: как организовать взаимодействие». Рассмотрим интеграцию Kafka и ClickHouse для потоковой аналитики и хранения данных.
Также вы можете пройти вступительное тестирование, чтобы узнать, достаточно ли ваших текущих знаний для обучения на курсе Apache Kafka.
LeshaRB
Здесь кода непонятного, больше чем смысла
Буквально тоже интересовался
https://proselyte.net/kafka-guaranteed-data-delivery/
Больше толку