Приветствую вас, читатели Хабра!

Сегодня в статье разберем, как Kafka обрабатывает оффсеты сообщений и какие существуют стратегии их сохранения и обновления.

Оффсет в Kafka — это числовой идентификатор, который указывает позицию каждого сообщения внутри партиции топика. Оффсеты представляют собой порядковые номера, начинаемые с нуля, и уникальны в рамках каждой партиции, но не между разными партициями. Т.е сообщение с оффсетом 5 в партиции 1 и сообщение с оффсетом 5 в партиции 2 — это разные сообщения.

Как работают оффсеты:

Каждое новое сообщение в партиции получает следующий по порядку оффсет. Например, если последнее сообщение в партиции имело оффсет 10, следующее сообщение получит оффсет 11. Так можно отслеживать, какие сообщения были прочитаны.

Однажды записанный в определённую партицию оффсет не изменяется и не перезаписывается. Т.е данные в Kafka являются неизменяемыми.

Потребители могут начать чтение с любого оффсета в партиции и продолжить от этой точки.

Оффсеты позволяют реализовывать различные гарантии доставки сообщений в Kafka.

Три основных типа гарантий доставки сообщений

at-most-once

at-most-once гарантирует, что сообщение будет доставлено один раз или вообще не будет доставлено. Это самый базовый подход и нацелен на минимизацию задержек, однако он также влечет за собой риск потери данных. В рамках этой семантики доставки Kafka не требует от продюсера ожидания подтверждения от брокера о получении сообщения. Это в какой то мере повышает производительность, но также плюсует вероятность потери данных при сбоях в сети или выходе из строя брокера.

Конфигурация продюсера в Kafka для достижения семантики at-most-once предусматривает установку параметра acks в значение 0. Т.е продюсер посылает сообщение и сразу переходит к следующему, не дожидаясь подтверждения от брокера. Можно сказать: «отправил и забыл».

Семантика at-most-once рекомендуется для систем, где потеря некоторых сообщений не критична.

at-least-once

А вот at-least-once уже гарантирует, что каждое сообщение будет доставлено по крайней мере один раз. Эта семантика подходит для приложений, где важно, чтобы ни одно сообщение не было потеряно.

Продюсеры Kafka отправляют сообщения, ожидая подтверждения от брокеров о том, что сообщение было сохранено. Если подтверждение не получено, продюсер попытается отправить сообщение снова. Это повторное отправление может привести к дублированию сообщений, если предыдущая отправка на самом деле была успешной, но подтверждение было потеряно из-за сетевой задержки или сбоев.

С версии Kafka 0.11, для уменьшения дублирования введена возможность конфигурации идемпотентности.

Потребители должны регулярно фиксировать оффсеты обработанных сообщений. Если потребитель не успеет зафиксировать оффсет перед сбоем, после перезапуска он начнет обработку с последнего зафиксированного оффсета, что может привести к повторной обработке некоторых сообщений.

Главная проблема семантики at-least-once — потенциальное дублирование сообщений при каком-либо сбое.

exactly-once

exactly-once гарантирует, что каждое сообщение будет обработано ровно один раз, что исключает возможность потери или дублирования данных. Основой для достижения такой семантики является транзакционная API, предоставляемая как продюсерам, так и потребителям.

Для точной обработки данных в случае сбоев или отката транзакций используют идемпотентные продюсеры. Они ююзают уникальные идентификаторы и номера последовательности для сообщений, чтобы предотвратить дублирование записей в журнале, даже если сообщение отправляется повторно из-за сбоев или ошибок сети. Это достигается за счет установки параметра enable.idempotence в true.

Транзакционная API позволяет продюсерам начинать, коммитить и откатывать транзакции, что дает некую атомарность записей через несколько партиций. При этом, важно управлять транзакциями таким образом, чтобы либо все сообщения в батче были видимы потребителям, либо ни одно. Для использования транзакционной API необходимо настроить продюсер с уникальным transactional.id..

На стороне потребителей можно использовать параметр isolation.level, который определяет, какие сообщения будут читаться: только те, что уже зафиксированы read_committed, или все подряд read_uncommitted.

Пример настройки продюсера:

bootstrap.servers=localhost:9092
key.serializer=org.apache.kafka.common.serialization.StringSerializer
value.serializer=org.apache.kafka.common.serialization.StringSerializer
acks=all
enable.idempotence=true
transactional.id=unique_transaction_id

Варианты управления оффсетами

Автоматическое

Автоматическое управление оффсетами позволяет потребителям данных автоматически подтверждать оффсеты сообщений через заданные интервалы времени.

Основные настройки для автоматического управления оффсетами включают enable.auto.commit и auto.commit.interval.ms. По умолчанию, enable.auto.commit установлен в true, что активирует автоматическое подтверждение оффсетов. Параметр auto.commit.interval.ms контролирует, как часто оффсеты будут автоматически подтверждаться, и по умолчанию составляет 5000 миллисекунд.

Пример конфигурации потребителя Kafka в Java:

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test-group");
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "5000");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("my-topic"));
try {
    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
        for (ConsumerRecord<String, String> record : records) {
            System.out.println("Received message: (key: " + record.key() + ", value: " + record.value() + ") at offset " + record.offset());
        }
    }
} finally {
    consumer.close();
}

Потребитель подписывается на топик и циклически опрашивает новые сообщения, автоматически подтверждая оффсеты каждые 5 секунд.

Хотя автоматическое управление оффсетами удобно, оно может привести к повторной обработке сообщений в случае сбоев. Если потребитель завершит работу или перезапустится между опросами, он может повторно обработать уже полученные сообщения, т.к последние оффсеты могли не быть подтверждены. Для фикса этого можно более часто подтверждать оффсеты или использовать ручное подтверждение.

Ручное управление

В Kafka есть два основных метода ручного подтверждения оффсетов: синхронное commitSync() и асинхронное commitAsync(). Синхронное подтверждение блокирует поток до тех пор, пока операция подтверждения не будет завершена успешно, что обеспечивает надежность, но может снизить производительность. Асинхронное подтверждение отправляет запрос на подтверждение и немедленно возвращает управление, что увеличивает пропускную способность, но требует доп. логики обработки ошибок, поскольку не предусматривает повторных попыток в случае сбоев.

Пример реализации асинхронного подтверждения оффсетов с обратным вызовом для обработки ошибок:

import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class ExampleConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "test-group");
        props.put("enable.auto.commit", "false");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            TopicPartition partition = new TopicPartition("your-topic", 0);
            consumer.assign(Collections.singletonList(partition));
            consumer.seek(partition, 0); // начать с определенного оффсета

            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("Consumed record with key %s and value %s at offset %d%n", record.key(), record.value(), record.offset());
                    consumer.commitAsync(Collections.singletonMap(partition, new OffsetAndMetadata(record.offset() + 1)), (offsets, exception) -> {
                        if (exception != null) {
                            System.err.println("Commit failed for offsets " + offsets);
                        }
                    });
                }
            }
        }
    }
}

Код настраивает потребителя Kafka для чтения с определенного оффсета и асинхронного подтверждения каждого обработанного сообщения. Обратный вызов проверяет успешность каждой операции подтверждения.


Приходите на открытые уроки в мае, которые проведут преподаватели OTUS в рамках курса "Apache Kafka":

Комментарии (0)