Kafka — это масштабируемая, отказоустойчивая платформа для обмена сообщениями в реальном времени. Она позволяет обрабатывать миллионы сообщений в секунду. Однако некоторые ситуации приводят к потере событий. Например, Kafka требует хорошего стабильного сетевого соединения между клиентами и брокерами; если сеть нестабильна, это может легко привести к потере сообщений. 

Команда разработчиков Trendyol Tech видоизменила архитектуру и решила эту проблему с помощью outbox-шаблона, но столкнулась с другой проблемой — дублированием событий. Приводим перевод статьи о том, как разработчики залезли под капот Kafka и нашли решение этих двух проблем.

Откуда возникает проблема потерянных событий

У нас используется специфическая логика. После обработки события, потребляемого из main topic, мы отправляем его в retry topic в kafka. С помощью проекта планировщика мы получаем события из retry и возвращаем их в main. После потребления и обработки события из main мы отправляем их в retry. Этот процесс продолжается до тех пор, пока не завершится связанный с событием процесс (рис. 1).

Рис. 1
Рис. 1

Шаблон Outbox для предотвращения потери событий

Из-за проблем с сетью у наших продюсеров Kafka при отправке событий в retry происходили исключения по таймауту и потери событий. Хотя дефолтные значения конфигурации Kafka как для продюсеров, так и для брокеров, достаточно консервативные. В обычных условиях мы не должны сталкиваться с таймаутами. Однако, несмотря на использование дефолтных конфигураций, мы все равно получали исключения по таймауту.

Default configs for producer
 - acks = all
 - enable.idempotence = true
 - retries = 2147483647
 - max.in.flight.per.connection ≤ 5

Для решения проблемы потери событий мы решили реализовать Outbox-шаблон (рис. 2).

Рис. 2 Outbox-шаблон помогает решить проблему потери событий
Рис. 2 Outbox-шаблон помогает решить проблему потери событий

При использовании Outbox-шаблона мы добавляем в couchbase Kafka Error Handler API документ. Он содержит детали события: ключ, значение, заголовки, тема. Мы отправляли события в указанную в документе тему с помощью couchbase kafka source connector.

Эта структура помогла нам решить проблему потери событий при возникновении любой ошибки. Однако проблема с сетью, которая заставила нас реализовать Outbox-шаблон, привела к другой проблеме — дублированию событий.

Дублирование событий, которое создает шаблон Outbox

Пока продолжались проблемы с сетью в Kafka, мы поняли, что получаем исключения по таймауту, хотя события успешно отправляются в kafka. Из-за исключения мы также отправляли события в kafka с шаблоном outbox, что приводило к дублированию событий.

В producer implementation при отправке события в kafka мы используем предоставленную функцию Callback, которая выдает результат выполнения, как только она завершится. Если событие не было доставлено по назначению, Kafka автоматически повторяет отправку события, которое не было полностью подтверждено брокерами.

private void sendToKafka(ProducerRecord<Object, Object> producerRecord, Object body) {
    try {
        ListenableFuture<SendResult<Object, Object>> future = kafkaTemplate.send(producerRecord);
        future.addCallback(new ListenableFutureCallback<>() {
            @Override
            public void onSuccess(SendResult<Object, Object> result) {
                return;
            }

            @Override
            public void onFailure(Throwable ex) {
                sendEventToErrorHandler(producerRecord, body);
            }
        });
    } catch (Exception e) {
        sendEventToErrorHandler(producerRecord, body);
        throw new Exception(e.getMessage());
    }
}

В случае ошибки kafka пытается отправить событие по умолчанию повторно. Во время этих повторных попыток продюсер может успешно отправить событие в kafka. Не получив ответа, что события успешно записаны во все синхронизированные реплики, мы получаем исключение по таймауту. Поскольку мы не уверены, отправлено ли событие в тему или нет, мы отправляем событие в kafka через couchbase с помощью шаблона outbox. (Рисунок 3)

Рис. 3. Дубликаты, которые создает Outbox-шаблон
Рис. 3. Дубликаты, которые создает Outbox-шаблон

Для одного события возникает 1 дубликат, поэтому в сумме мы имеем 2 идентичных события. В случае следующей повторной отправки, будет уже 4, и так далее по экспоненте.

Проблемы, возникающие при дублировании событий

При потреблении событий в Kafka мы используем пакетные консьюмеры. По умолчанию размер пакета у них составляет 500 — таким мы его оставим. События, принадлежащие одному ключу, объединяются в списки для создания и запуска потока для каждого уникального ключа (логика, специфичная для домена, рис. 4). В силу специфики доменной логики мы не можем напрямую получить и обработать первое событие с тем же ключом. Нам необходимо обработать события, удовлетворяющие определенному условию, из событий, принадлежащих одному ключу.

private <T> void processBatchEvent(List<ConsumerRecord<String, String>> consumerRecords, Consumer<T> consumerService, Class<T> eventClass) {
    Map<String, List<ConsumerRecord<String, String>>> eventMap = new HashMap<>();
    for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
        List<ConsumerRecord<String, String>> records = eventMap.computeIfAbsent(consumerRecord.key(), val -> new ArrayList<>());
        records.add(consumerRecord);
    }

    try {
        List<CompletableFuture<Boolean>> completableFutures = eventMap.values().stream()
                .map(eventWrappers -> {
                       return kafkaBatchConsumerService.processEvent(eventWrappers, consumerService, eventClass);
                }).collect(Collectors.toList());
        completableFutures.forEach(CompletableFuture::join);
    } catch (Exception e) {
        throw e;
    }
}
@Async("asyncExecutorForKafkaBatchConsumer")
public <T> CompletableFuture<Boolean> processEvent(List<ConsumerRecord<String, String>> records, Consumer<T> consumerService, Class<T> eventClass) {

    for (ConsumerRecord<String, String> consumerRecord : records) {
        try {
            T event = getEvent(consumerRecord, eventClass);
            
            if(domainSpecificFilter(event)) {
               consumerService.accept(event);
            } 

        } catch (Exception e) {
            produceErrorEvent(consumerRecord); // outbox pattern error handler
        }
    }
    return CompletableFuture.completedFuture(true);
}
Рис. 4
Рис. 4

Иногда в потоке должны обрабатываться 500 событий из-за дублирования событий и логики обработки событий с одинаковым ключом в одном потоке. Если считать, что время обработки события составляет 250 мс, то на обработку 500 событий в одном потоке уйдет не менее 125 секунд (около 2 минут).

Что происходит при этом:

  • Высокая нагрузка на процессор, память и сети в поде (рис. 5).

  • Это приводит к тому, что консьюмер клиент падает и не может отправлять данные о себе в течение времени session.timeout.ms. После чего консьюмер признается мёртвым, его разделы переназначаются и происходит ребалансировка.

  • После ребалансировки консьюмер не может ничего коммитить. Возникает исключение CommitFailedException, и события, которые действительно были обработаны, обрабатываются заново.

  • Таймауты в кластере kubernetes (рис. 6 и 7)

  • Возникновение задержек в работе консьюмеров (рис. 8 и 9)

  • Перезапуск контейнера/подсистемы

Рис. 5: CPU, Memory, Container CPU Cfs Throttled и Network Usage
Рис. 5: CPU, Memory, Container CPU Cfs Throttled и Network Usage

В период с 04/28 по 05/08 (рис. 5) в Kafka начались проблемы с сетью, резко возросли значения CPU, Container CPU CFS Throttled и Lag Size для приложения, работающего на 32 разделах, 32 подсистемах, которое должно было обрабатывать всего 500 000 событий в час.

Также увеличились значения задержки чтения/записи и сетевого трафика (рис. 5 и 6). И это приводит к большому количеству таймаутов между вызовами API.

Рис. 6 Kubernetes Cluster Node Read/Write Latency
Рис. 6 Kubernetes Cluster Node Read/Write Latency
Рис. 7 Kubernetes Cluster Node Network Traffic
Рис. 7 Kubernetes Cluster Node Network Traffic
Рис. 8 Kafka Topic Write Rate
Рис. 8 Kafka Topic Write Rate
Рис. 9 Kafka Topic Lag Size
Рис. 9 Kafka Topic Lag Size

Решаем проблему

Мы начали думать о решении проблемы дублирования событий, вызванных Outbox-шаблоном, не дожидаясь улучшений со стороны сети, поскольку увеличились таймауты, потребление ресурсов и задержки.

Мы не хотели отказываться от Outbox-шаблона, поскольку в случае отказа мы не сможем справиться с потерями событий. Поэтому мы решили избавиться от дублирования событий. Во всех проектах общие методы processBatchEvent и processEventmethods работают как пакетный косьюмер и обрабатывают события. В методе processEvent мы поняли, что может быть достаточно обрабатывать только первое событие, которое прошло элемент управления domainSpecificFilter(), а остальные события обрабатывать не нужно. Поэтому мы рефакторизовали метод.

@Async("asyncExecutorForKafkaBatchConsumer")
public <T> CompletableFuture<Boolean> processEvent(List<ConsumerRecord<String, String>> records, Consumer<T> consumerService, Class<T> eventClass) {

    for (ConsumerRecord<String, String> consumerRecord : records) {
        try {
            T event = getEvent(consumerRecord, eventClass);
            
            if(domainSpecificFilter(event)) {
               consumerService.accept(event);
               break; // the only change 
            } 

        } catch (Exception e) {
            produceErrorEvent(consumerRecord); // outbox pattern error handler
        }
    }
    return CompletableFuture.completedFuture(true);
}

После обработки первого события, прошедшего контроль, командой break мы запрещаем обработку остальных событий. Таким образом, можно решить проблему дублирующих событий, обработав только одно событие, прошедшее контроль, и запретив обработку остальных событий.

Результаты

  • Мы решили все проблемы, возникающие при дублировании событий, небольшим изменением команды break в используемом нами методе.

  • На рисунке 10 в красной рамке 05.09 видно, что мы развернули наш новый метод и обновили критические API. Как видно из приведенных ниже метрик, ситуация улучшилась.

Примечание: В период с 05/11-05/14 мы были вынуждены свернуть разработку из-за ошибки, связанной с нашим доменом. После исправления ошибки 16.05. мы снова развернули функцию. (рис. 10, зеленая рамка)

  • На рис.10 видно, как резко снизилось использование ресурсов.

Рис. 10 CPU, Memory, Container CPU Cfs Throttled и Network Usage
Рис. 10 CPU, Memory, Container CPU Cfs Throttled и Network Usage
  • В кластере уменьшились: сетевой трафик и задержки чтения/записи, а вместе с ними и случаи таймаута. (рис. 11 и 12)

Рис. 11 Kubernetes Cluster Node Network Traffic
Рис. 11 Kubernetes Cluster Node Network Traffic
Рис. 12 Kubernetes Cluster Node Read/Write Latency
Рис. 12 Kubernetes Cluster Node Read/Write Latency
  • Мы больше не получили ни одного исключения CommitFailedException

  • Мы можем успешно отправлять heartbeats, поэтому ребалансировки, а вместе с ними и перезапуски, закончились.

  • Количество дублирующихся событий в Kafka росло в геометрической прогрессии. После найденного решения рост числа событий в теме уменьшился, и проблема отставания была решена. (рис. 13 и 14)

Рис. 13 Kafka Topic Write Rate
Рис. 13 Kafka Topic Write Rate
Рис. 14 Kafka Topic Lag Size
Рис. 14 Kafka Topic Lag Size

Заключение

Несмотря на то что Kafka является мощной системой обмена сообщениями, мы можем столкнуться с нежелательными ситуациями из-за банальных проблем в сети. Но их можно решить, если залезть под капот Kafka и кастомизировать инструмент. Благодаря кастомной архитектуре мы можем спокойно обрабатывать события без потерь, больших затрат ресурсов и проблем с таймаутом.


Умение разбираться в архитектуре Apache Kafka может быть полезным навыком для инженеров инфраструктуры и для программистов.

Если вы хотите глубже изучить Apache Kafka, приходите на интенсив Apache Kafka для разработчиков. В нём углублённая теория и практика на Java или Golang с платформой Spring+Docker+Postgres. Вы узнаете типовые шаблоны проектирования, сделаете своё приложение надёжнее, получите опыт разработки нескольких приложений, использующих Kafka.

Какие проблемы мы учимся решать на интенсиве?

  1. Периодически отваливается логистический шлюз, и пользователь получает ошибку при создании заказа в онлайн-магазине одежды. Расскажем, как отвязать логистический шлюз от создания заказа, чтобы они не были связаны напрямую.

  2. Сервисы обмениваются сообщениями о событиях напрямую, и иногда по вине сети они не доставляются и пропадают. Объясним, что сделать, чтобы ничего не терять.

  3. Пользователь создал заказ, а магазин отправил его в доставку, упаковал и отгрузил водителю. И только после этого онлайн-магазин узнал, что оплата не прошла. Научим, как совершать все действия в правильном порядке, чтобы заказ не высылался до подтверждения оплаты.

???? Программа курса на нашем сайте

Курс поможет тебе уменьшить время на рабочие задачи с Кафкой, добавить красивую строчку в резюме и взобраться на следующую ступень карьерной лестницы.

До встречи на курсе!

Комментарии (2)


  1. vooft
    25.08.2023 07:23
    +2

    Ну, т.е. вместо решения проблемы с таймаутом, они просто начали вручную выкидывать дубликаты.

    Или можно было попробовать идемпотентные продюсеры https://www.linkedin.com/pulse/kafka-idempotent-producer-rob-golder/


  1. kohus
    25.08.2023 07:23

    Вообще-то, delivery.timeout.ms по умолчанию 2 минуты. Ничего себе, проблемы с сетью. Какой-то странный outbox, тогда уже проще писать сразу записи в базу и тащить их оттуда кафка-коннектом.