Цикл опроса является важным компонентом API Kafka consumer. Он отвечает за получение сообщений от брокеров Kafka и их обработку, вызывая обработчик сообщений, определенный пользователем. В этой статье расскажем, какие параметры нужно настроить, чтобы достичь большей производительности.
Что из себя представляет цикл опроса
Цикл опроса выполняет следующие основные шаги:
Клиент-потребитель отправляет запрос на получение пакета сообщений брокеру Kafka, запрашивая набор сообщений из назначенных разделов.
Брокер отвечает пакетом сообщений или сообщением об истечении тайм-аута, если нет доступных сообщений для потребления.
Клиент-потребитель обрабатывает сообщения, вызывая обработчик сообщений, определенный пользователем, который может быть как простой функцией, так и более сложным потоком обработки.
Клиент-потребитель фиксирует смещение последнего обработанного сообщения. Этот шаг важен, чтобы гарантировать, что клиент-потребитель не будет читать снова те же самые сообщения в случае сбоя или перезапуска.
Клиент-потребитель возвращается к шагу 1 для получения следующего пакета сообщений.
Важно отметить, что цикл опроса является блокирующей операцией. Это означает, что он будет блокировать выполнение до получения пакета сообщений от брокера или истечения времени ожидания. Поэтому цикл опроса должен выполняться в отдельном потоке или цикле событий, чтобы не блокировать основной поток приложения.
Для оптимизации производительности цикла опроса, вы можете настроить различные параметры конфигурации: размер пакета, размер получения и временной интервал опроса. Правильная настройка этих параметров позволяет балансировать между задержкой и пропускной способностью.
На какие параметры надо обратить внимание
Чтобы оптимизировать производительность цикла опроса в Kafka consumer:
Увеличьте размер пакета
Чтобы увеличить размер пакета в Kafka consumer, вы можете использовать параметр конфигурации max.poll.records
. Этот параметр определяет максимальное количество записей, возвращаемых в одном вызове метода poll()
. По умолчанию он установлен на 500 записей. Если увеличить это значение, то должно уменьшиться количество сетевых обращений.
Вот пример того, как установить большее значение для max.poll.records
:
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test-group");
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "1000");
props.put("max.poll.records", "1000"); // Set batch size to 1000 records
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props, new StringDeserializer(), new StringDeserializer());
consumer.subscribe(Arrays.asList("test-topic"));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
System.out.println("Received message: " + record.value());
}
}
Размер пакета определяет максимальное количество сообщений, которое потребитель будет извлекать в одном запросе опроса. Но помните, что увеличение размера пакета может сильно увеличить задержку потребителя.
ПРИМЕЧАНИЕ: Увеличение размера пакета также может увеличить использование памяти потребителем.
Настройка размера получения
Для настройки размера получения в Kafka consumer вы можете использовать параметр конфигурации fetch.max.bytes
. Этот параметр определяет максимальное количество байт, которое потребитель получит в одном запросе. По умолчанию он 50 МБ.
Вы можете выбрать компромиссное значение, которое позволит достичь баланса между сетевой задержкой и использованием памяти. Больший размер получения сократит количество запросов, необходимых для получения набора записей и уменьшит сетевые накладные расходы. Но большой размер получения также означает, что для буферизации записей будет требоваться больше памяти.
Вот пример того, как установить значение fetch.max.bytes
на более большое значение:
props.put("fetch.max.bytes", "10485760"); // Set fetch size to 10 MB
Размер получения определяет максимальное количество байт, которое потребитель получит в одном запросе опроса. Но слишком большой размер получения загрузит память потребителя.
Уменьшите время ожидания опроса
Время ожидания опроса определяет, как долго потребитель будет ждать сообщений, прежде чем вернуть пустой ответ. Если вы уменьшаете время ожидания опроса, потребитель становится более отзывчивым к новым сообщениям, что может снизить общую задержку работы потребителя. Однако, если сильно сократить время ожидания опроса это увеличит количество пустых ответов и снизит общую пропускную способность потребителя.
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100)); // Set poll timeout to 100 milliseconds
for (ConsumerRecord<String, String> record : records) {
System.out.println("Received message: " + record.value());
}
}
Используйте асинхронную обработку
Если логика обработки сообщений нагружает центральный процессор или включает в себя операции ввода-вывода, попробуйте использовать асинхронные методы обработки: многопоточность или неблокирующий ввод-вывод. Асинхронная обработка сообщений позволяет сократить время блокировки цикла опроса. Это может улучшить отзывчивость и пропускную способность потребителя.
ExecutorService executorService = Executors.newFixedThreadPool(10);
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
executorService.submit(() -> {
processRecord(record); // Process the record asynchronously
});
}
}
public void processRecord(ConsumerRecord<String, String> record) {
// Process the record
System.out.println("Received message: " + record.value());
}
Используйте параллельную обработку
Если логика обработки сообщений может быть распараллелена, попробуйте использовать несколько потоков или процессов. Параллельная обработка сообщений задействует несколько ядер процессора и увеличивает общую пропускную способность потребителя.
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test-group");
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "1000");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props, new StringDeserializer(), new StringDeserializer());
consumer.subscribe(Arrays.asList("test-topic"));
int numThreads = 10;
int batchSize = 100;
List<ConsumerRecord<String, String>> records = new ArrayList<>();
while (true) {
ConsumerRecords<String, String> batchRecords = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : batchRecords) {
records.add(record);
}
if (records.size() >= batchSize) {
List<List<ConsumerRecord<String, String>>> partitions = Lists.partition(records, batchSize / numThreads);
List<Callable<Void>> tasks = new ArrayList<>();
for (List<ConsumerRecord<String, String>> partition : partitions) {
tasks.add(() -> {
for (ConsumerRecord<String, String> record : partition) {
processRecord(record); // Process the record
}
return null;
});
}
ExecutorService executorService = Executors.newFixedThreadPool(numThreads);
try {
executorService.invokeAll(tasks);
} catch (InterruptedException e) {
// Handle the exception
} finally {
executorService.shutdown();
}
consumer.commitSync();
records.clear();
}
}
public void processRecord(ConsumerRecord<String, String> record) {
// Process the record
System.out.println("Received message: " + record.value());
}
Отслеживайте производительность потребителя
Важно регулярно отслеживать производительность потребителя, чтобы обнаруживать узкие места или проблемы, которые могут повлиять на производительность. Используйте JConsole или утилиты Kafka consumer lag monitoring, чтобы отслеживать производительность потребителя и выявлять возможные проблемы производительности.
Применяя эти советы, вы можете оптимизировать производительность цикла опроса в потребителе Kafka и достичь нужного уровня пропускной способности и задержки для вашего приложения.
Знание Apache Kafka в 2023 — мастхэв для инженеров инфраструктуры и важный плюс для программистов.
Если вы хотите глубже разобраться в Apache Kafka, приходите на курс «Apache Kafka для разработчиков». В нём углублённая теория и практика на Java или Golang с платформой Spring+Docker+Postgres. Вы узнаете типовые шаблоны проектирования, сделаете своё приложение надёжнее, получите опыт разработки нескольких приложений, использующих Kafka.