В предыдущей статье мы рассмотрели как работает KafkaConsumer и как реализован механизм auto-commit. 

В этой статье я хочу остановиться на том как получает и обрабатываются сообщения spring-kafka. 

Стоит оговориться, что сейчас мы рассматриваем ситуацию с enable.auto.commit = true. Согласно документации начиная с версии 2.3 настройка auto.commit по-умолчанию выставлена в false, хотя раньше это значение было аналогично значению по-умолчанию в kafka-clients, т.е. true. Это связанно с тем что контейнер KafkaMessageListenerContainer имеет собственные механизмы управления коммитом. Насколько это удобнее и какие тут есть плюсы и минусы - пожалуй тема отдельной статьи. 

Because the listener container has it’s own mechanism for committing offsets, it prefers the Kafka ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG to be false. Starting with version 2.3, it unconditionally sets it to false unless specifically set in the consumer factory or the container’s consumer property overrides.

 Я постараюсь ответить на следующие вопросы:

  1. Как в spring-kafka построена работа с KafkaConsumer?

  2. Какие есть возможности для параллельной обработки сообщений?

  3. Что происходит при возникновении ошибок при обработки сообщений?

Рассмотрим типичный минимальный пример для получения сообщений из топика.  Прежде всего нам понадобится конфигурация для подключения к топику:

@Configuration
public class KafkaConfig {

    @Bean
    KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>>
                        myListenerContainerFactory() {
      ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
                                new ConcurrentKafkaListenerContainerFactory<>();
      factory.setConsumerFactory(consumerFactory());
	  	factory.setConcurrency(3);
	  	...
      return factory;
    }

    @Bean
    public ConsumerFactory<Integer, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }

    @Bean
    public Map<String, Object> consumerConfigs() {
       Map<String, Object> props = new HashMap<>();
	  	props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, “true”);
        ...
        return props;
    }
}

И код который будет совершать какую-то работу с полученными сообщениями:

@KafkaListener(topics = "myTopic", containerFactory=”myListenerContainerFactory”)
public void listen(String data) {
    ...
}

Сперва остановимся подробнее на конфигурации. Тут в игру вступают две сущности:

  1. DefaultKafkaConsumerFactory

  2. ConcurrentKafkaListenerContainerFactory

Первая фабрика по сути своей принимает от нас все необходимые свойства конфигурации и создает уже знакомый нам KafkaConsumer из библиотеки kafka-clients в методе createRowConsumer(...):

protected Consumer<K, V> createRawConsumer(Map<String, Object> configProps) {
return new KafkaConsumer<>(configProps, this.keyDeserializerSupplier.get(),
	this.valueDeserializerSupplier.get());
}

ConcurrentKafkaListenerContainerFactory также прост по своей сути.  Он создаёт объект ConcurrentKafkaListenerContainer, который в свою очередь создает KafkaMessageListenerContainer в количестве указанном в поле concurrent.  Если наш топик имеет партиций меньше чем указанно в поле concurrent, то значение поля изменяется на количество партиций. Это сделано ввиду бесполезности создания большего числа слушателей, чем у топика есть партиций, т.к. kafka на своей стороне позволяет подключиться к одной партиции только одному слушателю в пределах одной группы слушателей, все остальные слушатели из этой группы будут распределены по другим партициям либо останутся незадействоваными. Все это можно наглядно увидеть в методе doStart()

Итак, мы добрались наконец добрались до  KafkaMessageListenerContainer

В методе doStart() этого класса мы получаем ссылку на метод в классе на который мы повесили аннотацию @KafkaListener и указали в параметре containerFactory название нашего ConcurrentKafkaListenerContainerFactory из конфигурации. 

 Object messageListener = containerProperties.getMessageListener();   

Далее в коде мы видим получение объекта AsyncListenableTaskExecutor и если этого объекта не существует будет создан объект с типом SimpleAsyncTaskExecutor, который создаст отдельный поток для нашего слушателя.

AsyncListenableTaskExecutor consumerExecutor = containerProperties.getConsumerTaskExecutor();
if (consumerExecutor == null) {
	consumerExecutor = new SimpleAsyncTaskExecutor((getBeanName() == null ? "" : getBeanName()) + "-C-");
  containerProperties.setConsumerTaskExecutor(consumerExecutor);
}

В конце концов мы создаем обертку над полученным слушателем из класса ListenerConsumer() объявленного тут же в KafkaMessageListenerContainer, а вот он уже в свою очередь создаст в конструкторе экземпляр KafkaConsumer с помощью фабрики DefaultKafkaConsumerFactory объявленной нами в конфигурации

this.consumer = KafkaMessageListenerContainer.this.consumerFactory.createConsumer(
   this.consumerGroupId,
   this.containerProperties.getClientId(),
   KafkaMessageListenerContainer.this.clientIdSuffix,
   consumerProperties);

Мы установили связь между кодом, который мы пометили аннотацией @KafkaListener и непосредственно классом KafkaConsumer и разобрались для чего используются классы в конфигурации.

Теперь посмотрим как же происходит получение и обработка сообщения.

Итак у ListenerConsumer есть метод pollAndInvoke() в котором происходит вызов метода в котором в свою очередь непосредственно происходит вызов метода poll() у KafkaConsumer для получения новых сообщений (и коммита offset в случае enable.auto.commit = true)  Полученный сообщения передаются в метод invokeListener() для непосредственной обработки.

private void invokeListener(final ConsumerRecords<K, V> records) {
	if (this.isBatchListener) {
		invokeBatchListener(records);
	} else {
		invokeRecordListener(records);
	}
}

В методе помеченным @KafkaListener мы можем обрабатывать сообщения как по одному так и пачкой (все полученные из топика при вызове KafkaConsumer->poll()) Если мы остановились на первом варианте и обрабатываем сообщения в нашем @KafkaListener по одному, то ListenerConsumer будет просто с помощью итератора идти по всему полученному набору передавая сообщения на обработку (метод doInvokeWithRecords(...)), который в свою очередь через цепочку вызовов передает сообщение на обработку нашему методу помеченному аннотацией @KafkaListener.

А что же происходит если в нашем коде при обработке сообщения выбрасывается исключение? 

При получении и обработки сообщений ListenerConsumer отлавливает все возможные типы исключений в методе doRun() При обработке неспецифичных исключений используются два разных механизма в зависимости от версии spring-kafka. В версиях младше 2.5 мы можем наблюдать следующее поведение:  если мы самостоятельно не настроили обработчик ошибок, то будет создан LoggingErrorHandler, который просто напросто залогирует ошибку и обработка продолжится.

Начиная с версии 2.5 обработчиком по-умолчанию становится SeekToCurrentErrorHandler  в котором произойдет 10 попыток обработать сообщение без задержки и если все они закончатся неудачей ошибка также будет залогирована и мы перейдем к обработке следующего сообщения.

Это необходимо учитывать при оценки гарантий обработки сообщений, которые поддерживает наше приложение, т.к. со своей стороны kafka выполнила все обязательства по доставке сообщения.

У нас остался еще один момент, который стоит прояснить: как часто spring-kafka будет вызывать метод poll() у KafkaConsumer ? В нашей конфигурации при создании KafkaListenerContainerFactory мы можем указать следующий параметр:

factory.getContainerProperties().setPollTimeout(3000);  

что будет означать следующее: при вызове метода poll() у KafkaConsumer ему в качестве аргумента будет передаваться это значение (само значение задает время в миллисекундах, т.е. в нашем случае 3 секунды). Именно это происходит в методе doPoll()

 this.consumer.poll(this.pollTimeout);

KafkaConsumer же в свою очередь при вызове метода poll будет ждать переданное ему количество времени пока не наберется столько сообщений сколько мы указали в параметре max.poll.records (значение по-умолчанию 500 записей).  Если pollTimeout будет равен 0, то вызов метода poll будет происходить без задержек возвращая пустой результат.

Подведем итог.

  1. Под капотом spring-kafka использует все тот же KafkaConsumer из библиотеки kafka-clients и работа с ним осуществляется в отдельном потоке.

  2. Мы можем смело использовать механизм auto-commit, но этот параметр лучше всего явно прописывать в конфигурации.

  3. Следует внимательно отнестись к обработчикам ошибок по-умолчанию и учитывать их поведение при расчетах надежности нашей системы.

Комментарии (23)


  1. BugM
    01.08.2021 00:13

    Мы можем смело использовать механизм auto-commit, но этот параметр лучше всего явно прописывать в конфигурации.

    Может все таки не надо?

    at least once на уровне вашего приложения с ним не гарантируется. Прочитали - закомитили - упали. Данные потеряны. И никто не виноват.


    1. dm_aq Автор
      01.08.2021 07:33

      Почему не гарантируется? Все зависит от того как мы работаем с исключительными ситуациями , если используем обработчики ошибок по-умолчанию - то да, вы правы.

      Тут важно отметить один момент про который я писал в первой статье: при включённой настройке enable.auto.commit=true коммит не происходит сам по себе, а жестко связан с получением сообщения.

      Т.е. последовательность в цикле обработки сообщений у нас получается такая:

      1. Коммитим предыдущее смещение

      2. Получаем новые сообщения

      3. Обрабатываем


      1. BugM
        01.08.2021 13:29

        Обработка данных бывает мультипоточной. В своем пуле потоков. Когда поток данных достаточно большой это типовая схема.

        Коммит произойдет независимо от того закончилась обработка или нет. Это взрывоопасная схема by design.


        1. dm_aq Автор
          01.08.2021 15:49

          Тут опять же все зависит от того как вы спроектируете обработку. В данном случае consumer и listener выполняются в одном потоке. Если вы отделяете обработку в отдельный поток, то и менеджмент коммитов остаётся на вашей совести. Собственно так и поступает spring-kafka при enable.auto.commit=false.


          1. BugM
            01.08.2021 15:58

            У вас проблема именно в архитектуре. События "данные обработаны" и "commit" происходят независимо друг от друга.

            Есть вариант когда они будут связаны и зависимы, но это однозначно из кода не видно. И любой рефакторинг может эту связь поломать. Это даже тестами не поймать. Вероятность просыпания событий небольшая.

            Практика говорит что чем меньше таких косяков в архитектуре приложения, тем оно лучше и дольше живет в дикой среде.


            1. dm_aq Автор
              01.08.2021 16:19

              События "данные обработаны" и "commit" происходят независимо друг от друга.

              Почему вы так решили? В случае если мы не берём управление коммитом на себя и оставляем enable.auto.commit=true делегируя тем самым коммит KafkaConsumer последовательность вполне детерминирована :

              1. Закоммитили предыдущий оффсет

              2. Получили новые сообщения

              3. Обработали

              Если мы сознательно не выносим последний шаг в отдельный поток


              1. BugM
                01.08.2021 16:34

                Представьте себя на месте разработчика которому прилетел тикет на ускорение обработчика. Один поток это откровенно медленно. Да и он может висеть на io, и недоиспользовать даже это одно ядро. Вероятность такого тикета далеко ненулевая.

                Вы спокойно выносите обработку в свой пул, обвешиваете футурами или как у вас принято. И закрываете тикет. По коду не видно что надо сделать еще что-то. Тесты и тестирование проходят успешно.

                А через пару месяцев в проде все взрывается. От обычного рестарта вашего приложения. Данные потеряны навсегда. Что делать непонятно. Даже причину потери данных установить очень нетривиально.

                При наличии явно прописанной в коде связи событий "обработали -> закомитили" вероятность такой ошибки гораздо меньше. Тот же разработчик ее увидит и поправит соответвенно. Ошибки с большей вероятностью поймаются тестами.


                1. dm_aq Автор
                  01.08.2021 16:44

                  Да, тут соглашусь, такое может быть. Это пожалуй одна из причин для чего я начал этот цикл статей - немного развеять туман в связке spring + kafka (в том числе и для себя)


                1. DDtKey
                  02.08.2021 03:57

                  Но ведь единицей масштабирования в kafka принято считать партицию.

                  При необходимости масштабирования - всегда можно выделить больше партиций с соответствующим числом обработчиков (где свои смещения)

                  А расспараллеливать обработку из одной очереди/партиции выглядит некорректным (by design), т.к обеспечить атомарное выполнение операции проблематично.


                  1. BugM
                    02.08.2021 12:10

                    Когда повидаешь всякого начинаешь подстилать соломки везде. Скорость записи в одну партицию достаточно велика чтобы это было проблематично обработать одним потом в некоторых случаях.

                    Проблем в атомарности нет. Немного больше кода и аккуратная обработка коммитов. И все.


                    1. DDtKey
                      02.08.2021 13:59

                      Ну зависит все же от количества партиций, если запись слишком быстрая - их можно и разнести.

                      А как нет проблем с атомарностью? Если вы вынесете в асинхронную обработку, то у вас сообщения пришедшие позже могут быть обработаны раньше и успешно, а те что пришли раньше - упасть. Как контролируете коммиты в таких случаях?

                      Вот пришло 3 сообщения с следующими оффсетами:
                      35: успешно
                      36: ошибка
                      37: успешно (раньше 36)

                      Просто масштабировать "минимальную единицу масштабирования" - выглядит не очень корректно.

                      Мне просто действительно интересен ваш опыт :)

                      Корректный способ, разве что, отдельная очередь самой обработки (но синхронно обрабатываемая). Т.е параллелится сама вычитка и обработка, но это в целом подразумевает ручной коммит оффсета.


                      1. BugM
                        02.08.2021 15:15

                        Писать код так что сообщение не можешь быть необработано. Если что-то залипло дольше чем это разумно роняем все приложение и уходим в рестарт.

                        Есть опасность циклического рестарта и неразгребания очереди. Делаем на это мониторинг. Звоним по телефону, будим разработчиков. После пары звонков в три ночи все чиниться.


                      1. DDtKey
                        02.08.2021 15:58

                        Ну т.е. сама обработка, как я упомянул - синхронно идет (в параллель само считывание и обработка, но не обработка отдельно взятых сообщений)?
                        Нет гонки между обработкой сообщений одной партиции?
                        (в ином случае, ронять приложение по некому таймауту не избавит от "неудачи по середине" и при следующем поднятии либо утеряется сообщение, либо повторно считаются уже обработанные)

                        Но spring-kafka нам обеспечивает схожее поведение при использовании ConcurrentKafkaListenerContainerFactory, с встроенной очередью сообщений на обработку и кэлбеком об успехе/неудаче.

                        Ручной коммит вполне себе хорошо контролируемый и явный процесс.

                        Ну и в целом повторное чтение можно не боятся при обеспечении идемпотентности, но все же оптимальный ли это путь - остается вопросом профиля использования.


                      1. BugM
                        02.08.2021 17:23

                        Просто ждём пока все нынешнее и все предыдущее не обработается. Комитить с дыркой посередине нельзя бай дизайн. Обрабатывающий пул принимает вычитанное в порядке очереди вычитывания. Порядок нарушаться может и надо быть к этому готовым.

                        Обработка параллельная, сообщения нужны достаточно равномерные по времени обработки.

                        Падать если что-то не обработалось и непонятно почему это по сути единственный вариант. Работать дальше нельзя никак. Состояние приложения неопределенное получается.


                1. LaRN
                  02.08.2021 09:48

                  В этом случае без всяких доработок в коде попросят у devops поднять ещё один контейнер с этим приложением и добавить партиций на топик и обработка пойдёт пропорционально быстрее.


                  1. DDtKey
                    02.08.2021 10:23

                    Ну это можно организовать и в одном приложении, при желании (1 поток на 1 партицию)


                    1. LaRN
                      02.08.2021 12:18

                      Это сильно сложнее и может принести регресс. Кроме этого количество партиций может увеличиваться настройкой кафки. Если использовать devops, то просто подрастёт количество контейнеров с приложением.

                      Кроме этого может оказаться, что узкое место процессор например или память и тогда добавление потоков на одной машине задачу в какой-то момент перестанет решать. А контейнеры могут подниматься на разных машинах и не имеют такого ограничения.


                      1. DDtKey
                        02.08.2021 13:37

                        Управлять контейнерами действительно будет проще.

                        Но на деле, одной машиной читать несколько партиций не сильно сложнее.

                        Конечно, bottleneck может быть в другом месте, потому и решения нужно выбирать соответствующие профилю использования :)


                  1. BugM
                    02.08.2021 12:08

                    После какого-то числа контейнеров это начинает влиять на деньги. Контейнеры они денег стоят. А их сотни бывают. И приходит мысль что дешевле поправить код.


                    1. LaRN
                      02.08.2021 12:22

                      Тут вопрос умного devops. Не нужно всегда держать рабочими много контейнеров, при уменьшении нагрузки можно лишние потушить.

                      Например, если количество не обработанных сообщений в топике стало падать на какой-то процент в единицу времени, значит пик нагрузки прошёл и можно освобождать ресурсы.


                      1. BugM
                        02.08.2021 12:38

                        Можно. Только это медленно. Сколько там типовое время реакции? Минуты? Дневные волны так обрабатывать ок. Более мелкие уже не ок.

                        И как обычно не все облака умеют не тратить деньги за выключенные контейнеры. Бывает так что укажи свой максимум. И все. Не используешь - сам виноват. Никому они ночью не нужны.

                        Мы упираемся в один поток. Который имеет привычку висеть на io. Добавляем инмемори кеш с чем угодно и вот вам странный контейнер Одно ненагруженное ядро и 10гб рам. Если это надо разложить по железкам то девопс к вам первый с угрозами придёт. Это по железкам плохо раскладывается.

                        Я как-то предпочитаю строить софт без таких ограничений. Это на самом деле не очень сложно.


                      1. LaRN
                        02.08.2021 12:50

                        Ну тут и не нужна мгновенная реакция, минуты и десятки минут, потому что пики могут периодически повторяться, а поднимать и тушить контейнер это тоже время.

                        Ну и мы тут не любой гипотетисчкий кейс смотрим, а кейс с читаем из kafka. Совсем универсальное решения, оптимального по всем параметрам для всех вариантов использования понятное дело нет.


                      1. BugM
                        02.08.2021 15:17

                        Конечно. Без конкретики это все чистая теория.