Apache Kafka — это решение для распределенной потоковой передачи сообщений. Проект, построенный на основе Scala и Java, направлен на создание единой платформы с высокой пропускной способностью и низкой задержкой для управления потоками данных в реальном времени. Основными возможностями платформы потоковой передачи данных является возможность публиковать потоки записей и подписываться на них, подобно очереди сообщений или системе бизнес‑обмена сообщениями. Также большое значение имеет возможность обрабатывать потоки записей по мере их поступления и хранить их отказоустойчивым и долговечным способом.
Основные сущности Kafka
Рассмотрим базовые компоненты Kafka. Здесь в качестве названий мы будем использовать англицизмы (продьюсер, консьюмер) вместо их переводов (производитель, потребитель), так как такая терминология используется в сообществе Kafka.
Данные в Kafka организованы в топики, которые представляют собой логические каналы, по которым продьюсеры отправляют данные и из которых консьюмеры считывают эти данные. Каждый топик разделен на партиции, которые в свою очередь являются основной единицей параллелизма в Kafka. Партиции позволяют Kafka масштабироваться по горизонтали, распределяя данные между несколькими брокерами.
Далее рассмотрим консьюмеров. По сути, консьюмеры — это клиентские приложения, которые подписываются на топики Kafka и обрабатывают данные. Они считывают записи из партиций и могут входить в группу консьюмеров, что обеспечивает балансировку нагрузки и отказоустойчивость. Каждый консьюмер в группе считывает данные из уникального набора партиций.
Продьюсеры — это клиентские приложения, которые публикуют данные в партициях Kafka. Они отправляют записи в соответствующие топики и партиции на основе используемой стратегии разделения, которая может быть основана на ключах или циклическом распределении.
Брокеры — это серверы, которые образуют кластер Kafka. Каждый брокер отвечает за получение, хранение и обслуживание данных. Они обрабатывают операции чтения и записи от продьюсеров и консьюмеров. Брокеры также управляют репликацией данных для обеспечения отказоустойчивости.
Для централизованного хранения информации о конфигурации, присвоения имен, обеспечения распределенной синхронизации и предоставления групповых услуг в Kafka используется служба ZooKeeper. Это отдельный компонент, взаимодействующий с кластером Kafka.
И, наконец, смещения (offsets) — это уникальные идентификаторы, присвоенные каждому сообщению в разделе. Пользователи будут использовать эти смещения для отслеживания прогресса в использовании сообщений из раздела.
В целом, с архитектурой мы разобрались, и теперь можно перейти к рассмотрению типовых проблем, возникающих в Kafka и способов их решения.
Проблемы продьюсера
Одной из наиболее распространенной проблемой при работе с Kafka являются ошибки, связанные с отправкой сообщений. Например, продьюсер не может доставлять сообщения в Kafka из‑за проблем с сетью, сбоев в работе брокера или других неисправностей.
Здесь лучшим решением является реализация правильную обработки ошибок в приложении продьюсера.
Ниже приводится фрагмент кода на Java, содержащий такой обработчик:
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.KafkaException;
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
try {
consumer.subscribe(Collections.singletonList("my_topic"));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
// process records
process_records(records);
}
} catch (KafkaException e) {
// handle Kafka exception
System.out.println("Error occurred while consuming messages from Kafka: " + e.getMessage());
} finally {
// close the consumer
consumer.close();
}
В случае неудачной попытки мы можем с помощью обработчика вернуться к выполнению попытки подключения через некоторое время.
Также можно настроить необходимые параметры конфигурации, такие как retry и retry.backoff.ms, в которых указывается количество повторных попыток и продолжительность перерыва между ними.
Для повторных попыток рассмотрите возможность использования метода экспоненциального замедления, при котором интервал между каждой последующей попыткой подключения экспоненциально увеличивается, в результате чего мы не «долбимся» к брокеру повторяющимися попытками, а разгружаем его, выполняя каждую последующую попытку с увеличивающимся интервалом.
Еще одной серьезной проблемой являются ошибки в данных, когда сообщения продьюсера неправильно записываются в Kafka, что приводит к потере данных.
Для решения этой проблемы нам необходимо корректно настроить значение acks для продьюсера. Этот параметр определяет количество подтверждений, которые продьюсер должен получить от сервера, прежде чем считать запрос выполненным. Это определяет время жизни отправляемых записей.
В случае, если мы укажем acks=0, продьюсер вообще не будет ждать подтверждения от сервера. Запись будет немедленно добавлена в буфер сокета и будет считаться отправленной. В этом случае невозможно гарантировать, что сервер получил запись, и настройка retry, о которой мы говорили чуть выше, не вступит в силу (поскольку клиент, как правило, не узнает о каких‑либо сбоях). При этом значение смещения, возвращаемое для каждой записи, всегда будет равно -1.
Если мы укажем acks=1, то это будет означать, что лидирующий брокер внесет запись в свой локальный журнал, но ответит, не дожидаясь полного подтверждения от всех консьюмеров. В этом случае, если у лидирующего брокера произойдет сбой сразу после подтверждения записи, но до того, как консьюмеры ее скопируют, запись будет потеряна.
И наконец acks=all означает, что брокер будет ждать подтверждения записи полным набором синхронизированных реплик. Это гарантирует, что запись не будет потеряна, пока хотя бы одна синхронизированная реплика остается активной. Это самая надежная из доступных гарантий.
Здесь возможна еще одна проблема, связанная с дублированием сообщений при повторных попытках. Для предотвращения подобных проблем включите следующий параметр на продьюсере:
enable.idempotence = true
В результате, как видно из приложенного рисунка, при потере подтверждения повторная запись не приведет к появлению дубликата.
Конфигурация и ее проблемы
Помимо проблем с данными, при работе продьюсера в Kafka можно также столкнуться с некорректными настройками конфигурации, приводящими к снижению производительности или сбоям.
При настройке параметров, естественно, необходимо уделить внимание тем рекомендациям, которые дает вендор. Исходя из ваших потребностей и задач, обязательно укажите подходящие значения для таких атрибутов, как bootstrap.servers, и уже знакомые нам acks, retry и так далее.
Атрибут bootstrap.servers определяет, к каким узлам брокерам им подключаться. В вашей сети могут использоваться десятки и даже сотни брокеров, и прописывать всех их в bootstrap.servers будет не очень хорошей идеей. Лучше указать два‑три брокера в качестве серверов начальной загрузки, с помощью которых уже будет осуществляться дальнейшее взаимодействие продьюсера с серверами Kafka.
На этом предлагаю закончить с проблемами продьюсера и перейти к проблемам консьюмера.
Проблемы консьюмеров
Собственно, консьюмеры в Kafka также могут столкнуться с рядом проблем. Одна из наиболее распространенных — это дисбаланс в группе консьюмеров. Внутри группы может возникнуть неравномерное распределение консьюмеров по партициям, что приводит к неравномерной обработке данных и возможным ограничениям производительности.
Для решения этой проблемы необходимо следить за распределением партиций для группы консьюмеров и учитывать, что размер должен соответствовать предполагаемому параллелизму. Когда клиент выходит из группы или входит в нее, Kafka выполняет автоматическую перебалансировку. Параметры конфигурации group.max.session.timeout.ms и group.min.session.timeout.ms используются для управления процессом перебалансировки.
Также консьюмеры могут столкнуться с проблемами управления смещениями, такими как слишком частое выполнение смещений, отсутствие выполнения смещений вообще или ошибочное выполнение смещений. Для контроля, фиксации смещений, используйте методы commitSync()
или commitAsync()
. Первый — это синхронное подтверждение, которое блокирует поток до тех пор, пока операция подтверждения не будет завершена успешно, что обеспечивает надежность, но может снизить производительность. Второй метод — асинхронное подтверждение, которое отправляет запрос на подтверждение и немедленно возвращает управление, что увеличивает пропускную способность, но требует дополнительной логики обработки ошибок, поскольку не предусматривает повторных попыток в случае сбоев.
И наконец, изменение баланса групп консьюмеров может на непродолжительное время нарушить обработку данных, что приведет к задержкам или паузам. Для решения необходимо понять, как происходит изменение баланса и как это влияет на ваше пользовательское приложение. Чтобы отрегулировать изменение баланса, выберите подходящие тайм‑ауты пользовательского сеанса в параметре session.timeout.ms (максимальное время, в течение которого консюмер может не подавать признаки жизни) и интервалы между нажатиями heartbeat.interval.ms (интервал подтверждения жизнеспособности потребителя). Чтобы справиться с проблемами, связанными с перебалансировкой, и гарантировать бесперебойное восстановление, включите в пользовательское приложение функции обработки ошибок и повторных попыток.
Заключение
В этой статье мы поговорили о том, какие основные проблемы могут возникнуть в работе базовых компонентов Apache Kafka и что можно сделать для предотвращения этих проблем.
Рекомендую обратить внимание на открытые уроки по Apache Kafka, которые скоро пройдут в Otus:
4 декабря: Kafka Connect: Легкая интеграция с внешними системами. Узнать подробнее
19 декабря: Schema Registry в Apache Kafka. Узнать подробнее
Комментарии (4)
vvberunenko
04.12.2024 15:55Ошибка в примере кода: вроде говорим за ошибку продьюсера, а код приведен для консьюмера, да и тот кривоват.
P40b0s
04.12.2024 15:55Да и про zookeeper следовало бы упомянуть...
Kafka 4.0 will be fully saying goodbye to ZooKeeper. There will be no support for running in ZK mode, or migrating from ZK mode.
TldrWiki
Прежде чем писать статью, постарайтесь разобраться в теме. При acks=1 подтверждение ожидается не от консьюмера, а от лидера. При acks=all от всех in-sync реплик, а не как написано синхронизированная реплика остаётся активной, что это вообще значит. Остальные ошибки исправляйте сами.
gexeg
Не от всех, а от мин.ин.синк