Kafka — это масштабируемая, отказоустойчивая платформа для обмена сообщениями в реальном времени. Она позволяет обрабатывать миллионы сообщений в секунду. Однако некоторые ситуации приводят к потере событий. Например, Kafka требует хорошего стабильного сетевого соединения между клиентами и брокерами; если сеть нестабильна, это может легко привести к потере сообщений.
Команда разработчиков Trendyol Tech видоизменила архитектуру и решила эту проблему с помощью outbox-шаблона, но столкнулась с другой проблемой — дублированием событий. Приводим перевод статьи о том, как разработчики залезли под капот Kafka и нашли решение этих двух проблем.
Откуда возникает проблема потерянных событий
У нас используется специфическая логика. После обработки события, потребляемого из main topic, мы отправляем его в retry topic в kafka. С помощью проекта планировщика мы получаем события из retry и возвращаем их в main. После потребления и обработки события из main мы отправляем их в retry. Этот процесс продолжается до тех пор, пока не завершится связанный с событием процесс (рис. 1).
Шаблон Outbox для предотвращения потери событий
Из-за проблем с сетью у наших продюсеров Kafka при отправке событий в retry происходили исключения по таймауту и потери событий. Хотя дефолтные значения конфигурации Kafka как для продюсеров, так и для брокеров, достаточно консервативные. В обычных условиях мы не должны сталкиваться с таймаутами. Однако, несмотря на использование дефолтных конфигураций, мы все равно получали исключения по таймауту.
Default configs for producer
- acks = all
- enable.idempotence = true
- retries = 2147483647
- max.in.flight.per.connection ≤ 5
Для решения проблемы потери событий мы решили реализовать Outbox-шаблон (рис. 2).
При использовании Outbox-шаблона мы добавляем в couchbase Kafka Error Handler API
документ. Он содержит детали события: ключ, значение, заголовки, тема. Мы отправляли события в указанную в документе тему с помощью couchbase kafka source connector.
Эта структура помогла нам решить проблему потери событий при возникновении любой ошибки. Однако проблема с сетью, которая заставила нас реализовать Outbox-шаблон, привела к другой проблеме — дублированию событий.
Дублирование событий, которое создает шаблон Outbox
Пока продолжались проблемы с сетью в Kafka, мы поняли, что получаем исключения по таймауту, хотя события успешно отправляются в kafka. Из-за исключения мы также отправляли события в kafka с шаблоном outbox, что приводило к дублированию событий.
В producer implementation при отправке события в kafka мы используем предоставленную функцию Callback, которая выдает результат выполнения, как только она завершится. Если событие не было доставлено по назначению, Kafka автоматически повторяет отправку события, которое не было полностью подтверждено брокерами.
private void sendToKafka(ProducerRecord<Object, Object> producerRecord, Object body) {
try {
ListenableFuture<SendResult<Object, Object>> future = kafkaTemplate.send(producerRecord);
future.addCallback(new ListenableFutureCallback<>() {
@Override
public void onSuccess(SendResult<Object, Object> result) {
return;
}
@Override
public void onFailure(Throwable ex) {
sendEventToErrorHandler(producerRecord, body);
}
});
} catch (Exception e) {
sendEventToErrorHandler(producerRecord, body);
throw new Exception(e.getMessage());
}
}
В случае ошибки kafka пытается отправить событие по умолчанию повторно. Во время этих повторных попыток продюсер может успешно отправить событие в kafka. Не получив ответа, что события успешно записаны во все синхронизированные реплики, мы получаем исключение по таймауту. Поскольку мы не уверены, отправлено ли событие в тему или нет, мы отправляем событие в kafka через couchbase с помощью шаблона outbox. (Рисунок 3)
Для одного события возникает 1 дубликат, поэтому в сумме мы имеем 2 идентичных события. В случае следующей повторной отправки, будет уже 4, и так далее по экспоненте.
Проблемы, возникающие при дублировании событий
При потреблении событий в Kafka мы используем пакетные консьюмеры. По умолчанию размер пакета у них составляет 500 — таким мы его оставим. События, принадлежащие одному ключу, объединяются в списки для создания и запуска потока для каждого уникального ключа (логика, специфичная для домена, рис. 4). В силу специфики доменной логики мы не можем напрямую получить и обработать первое событие с тем же ключом. Нам необходимо обработать события, удовлетворяющие определенному условию, из событий, принадлежащих одному ключу.
private <T> void processBatchEvent(List<ConsumerRecord<String, String>> consumerRecords, Consumer<T> consumerService, Class<T> eventClass) {
Map<String, List<ConsumerRecord<String, String>>> eventMap = new HashMap<>();
for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
List<ConsumerRecord<String, String>> records = eventMap.computeIfAbsent(consumerRecord.key(), val -> new ArrayList<>());
records.add(consumerRecord);
}
try {
List<CompletableFuture<Boolean>> completableFutures = eventMap.values().stream()
.map(eventWrappers -> {
return kafkaBatchConsumerService.processEvent(eventWrappers, consumerService, eventClass);
}).collect(Collectors.toList());
completableFutures.forEach(CompletableFuture::join);
} catch (Exception e) {
throw e;
}
}
@Async("asyncExecutorForKafkaBatchConsumer")
public <T> CompletableFuture<Boolean> processEvent(List<ConsumerRecord<String, String>> records, Consumer<T> consumerService, Class<T> eventClass) {
for (ConsumerRecord<String, String> consumerRecord : records) {
try {
T event = getEvent(consumerRecord, eventClass);
if(domainSpecificFilter(event)) {
consumerService.accept(event);
}
} catch (Exception e) {
produceErrorEvent(consumerRecord); // outbox pattern error handler
}
}
return CompletableFuture.completedFuture(true);
}
Иногда в потоке должны обрабатываться 500 событий из-за дублирования событий и логики обработки событий с одинаковым ключом в одном потоке. Если считать, что время обработки события составляет 250 мс, то на обработку 500 событий в одном потоке уйдет не менее 125 секунд (около 2 минут).
Что происходит при этом:
Высокая нагрузка на процессор, память и сети в поде (рис. 5).
Это приводит к тому, что консьюмер клиент падает и не может отправлять данные о себе в течение времени
session.timeout.ms
. После чего консьюмер признается мёртвым, его разделы переназначаются и происходит ребалансировка.После ребалансировки консьюмер не может ничего коммитить. Возникает исключение CommitFailedException, и события, которые действительно были обработаны, обрабатываются заново.
Таймауты в кластере kubernetes (рис. 6 и 7)
Возникновение задержек в работе консьюмеров (рис. 8 и 9)
Перезапуск контейнера/подсистемы
В период с 04/28 по 05/08 (рис. 5) в Kafka начались проблемы с сетью, резко возросли значения CPU, Container CPU CFS Throttled и Lag Size для приложения, работающего на 32 разделах, 32 подсистемах, которое должно было обрабатывать всего 500 000 событий в час.
Также увеличились значения задержки чтения/записи и сетевого трафика (рис. 5 и 6). И это приводит к большому количеству таймаутов между вызовами API.
Решаем проблему
Мы начали думать о решении проблемы дублирования событий, вызванных Outbox-шаблоном, не дожидаясь улучшений со стороны сети, поскольку увеличились таймауты, потребление ресурсов и задержки.
Мы не хотели отказываться от Outbox-шаблона, поскольку в случае отказа мы не сможем справиться с потерями событий. Поэтому мы решили избавиться от дублирования событий. Во всех проектах общие методы processBatchEvent
и processEventmethods
работают как пакетный косьюмер и обрабатывают события. В методе processEvent
мы поняли, что может быть достаточно обрабатывать только первое событие, которое прошло элемент управления domainSpecificFilter()
, а остальные события обрабатывать не нужно. Поэтому мы рефакторизовали метод.
@Async("asyncExecutorForKafkaBatchConsumer")
public <T> CompletableFuture<Boolean> processEvent(List<ConsumerRecord<String, String>> records, Consumer<T> consumerService, Class<T> eventClass) {
for (ConsumerRecord<String, String> consumerRecord : records) {
try {
T event = getEvent(consumerRecord, eventClass);
if(domainSpecificFilter(event)) {
consumerService.accept(event);
break; // the only change
}
} catch (Exception e) {
produceErrorEvent(consumerRecord); // outbox pattern error handler
}
}
return CompletableFuture.completedFuture(true);
}
После обработки первого события, прошедшего контроль, командой break
мы запрещаем обработку остальных событий. Таким образом, можно решить проблему дублирующих событий, обработав только одно событие, прошедшее контроль, и запретив обработку остальных событий.
Результаты
Мы решили все проблемы, возникающие при дублировании событий, небольшим изменением команды
break
в используемом нами методе.
На рисунке 10 в красной рамке 05.09 видно, что мы развернули наш новый метод и обновили критические API. Как видно из приведенных ниже метрик, ситуация улучшилась.
Примечание: В период с 05/11-05/14 мы были вынуждены свернуть разработку из-за ошибки, связанной с нашим доменом. После исправления ошибки 16.05. мы снова развернули функцию. (рис. 10, зеленая рамка)
На рис.10 видно, как резко снизилось использование ресурсов.
В кластере уменьшились: сетевой трафик и задержки чтения/записи, а вместе с ними и случаи таймаута. (рис. 11 и 12)
Мы больше не получили ни одного исключения
CommitFailedException
.Мы можем успешно отправлять heartbeats, поэтому ребалансировки, а вместе с ними и перезапуски, закончились.
Количество дублирующихся событий в Kafka росло в геометрической прогрессии. После найденного решения рост числа событий в теме уменьшился, и проблема отставания была решена. (рис. 13 и 14)
Заключение
Несмотря на то что Kafka является мощной системой обмена сообщениями, мы можем столкнуться с нежелательными ситуациями из-за банальных проблем в сети. Но их можно решить, если залезть под капот Kafka и кастомизировать инструмент. Благодаря кастомной архитектуре мы можем спокойно обрабатывать события без потерь, больших затрат ресурсов и проблем с таймаутом.
Умение разбираться в архитектуре Apache Kafka может быть полезным навыком для инженеров инфраструктуры и для программистов.
Если вы хотите глубже изучить Apache Kafka, приходите на интенсив Apache Kafka для разработчиков. В нём углублённая теория и практика на Java или Golang с платформой Spring+Docker+Postgres. Вы узнаете типовые шаблоны проектирования, сделаете своё приложение надёжнее, получите опыт разработки нескольких приложений, использующих Kafka.
Какие проблемы мы учимся решать на интенсиве?
Периодически отваливается логистический шлюз, и пользователь получает ошибку при создании заказа в онлайн-магазине одежды. Расскажем, как отвязать логистический шлюз от создания заказа, чтобы они не были связаны напрямую.
Сервисы обмениваются сообщениями о событиях напрямую, и иногда по вине сети они не доставляются и пропадают. Объясним, что сделать, чтобы ничего не терять.
Пользователь создал заказ, а магазин отправил его в доставку, упаковал и отгрузил водителю. И только после этого онлайн-магазин узнал, что оплата не прошла. Научим, как совершать все действия в правильном порядке, чтобы заказ не высылался до подтверждения оплаты.
???? Программа курса на нашем сайте
Курс поможет тебе уменьшить время на рабочие задачи с Кафкой, добавить красивую строчку в резюме и взобраться на следующую ступень карьерной лестницы.
До встречи на курсе!
Комментарии (2)
kohus
25.08.2023 07:23Вообще-то, delivery.timeout.ms по умолчанию 2 минуты. Ничего себе, проблемы с сетью. Какой-то странный outbox, тогда уже проще писать сразу записи в базу и тащить их оттуда кафка-коннектом.
vooft
Ну, т.е. вместо решения проблемы с таймаутом, они просто начали вручную выкидывать дубликаты.
Или можно было попробовать идемпотентные продюсеры https://www.linkedin.com/pulse/kafka-idempotent-producer-rob-golder/