Меня зовут Андрей Бугаков, разработчик в компании Datanomica. Я решил написать эту статью после ошибки в production, думаю, она может оказаться полезной другим разработчикам. Ниже рассказал, как в рамках разработки нового сервиса мы выбирали оптимальный вариант стратегии ребалансировки и изучали влияние различных стратегий ребалансировки на потерю сообщений. 

Kafka — достаточно популярная потоковая платформа, которая позволяет хранить, обрабатывать и доставлять данные в высоконагруженных проектах. Kafka разработана так, что ее можно использовать и как транспорт для передачи информации, и как хранилище. Так же архитектура предоставляет возможности по довольно удобному горизонтальному масштабированию. 

 В Kafka используется publish-subscribe модель распространения сообщений. Основные сущности: topic, partition, producer, consumer, consumer group. Ниже объясню термины подробнее, если вы уже знаете их — пролистните  таблицу.

Topic

Ключевое понятие в архитектуре Kafka. Это канал, который используется для организации или хранения потоков данных.

Producer

Приложение или компонент, отправляющие (публикующие) сообщения в topic Kafka.

Consumer

Приложение или компонент, читающее сообщения из topic Kafka.

Partition

Физическое и упорядоченное хранилище сообщений внутри topic. Каждый topic может быть разделен на одну или несколько partition. Внутри partition сообщения являются логической последовательностью, упорядоченной по времени или ключам сообщений.

Consumer Group

Логическая группа, состоящая из одного или нескольких consumers, которые работают совместно для чтения сообщений из topics Kafka. Каждая consumer group обрабатывает набор partitions в topic, причем каждая partition обрабатывается только одним consumer из группы (это довольно важно в контексте статьи).

Как все начиналось 

Какое-то время мы разрабатывали решение на микросервисах, которым должны были заменить старое монолитное решение в нашей компании. 

Прежде, чем перейти дальше, уточню — для развертывания в production у нас используется подход canary release (канареечный релиз). Таким образом, сначала происходит установка одного экземпляра сервиса, проверяются ошибки интеграции, потребителей и далее приложение раскатывается на весь входящий трафик. 

Нам нужно было постепенно включать потребителей в рамках миграции со старого решения на новое. Большинство из них были готовы перейти на взаимодействие с использованием Rest, но были и те, кто не был готов, поэтому в качестве транспорта была выбрана Kafka.

Особенность этих интеграций заключается в том, что подразумевается ограниченное время на запросы потребителя с таймаутом от 3 до 20 секунд, в зависимости от конкретного потребителя. Нагрузка по этому каналу интеграции предполагалась около 500-800 RPS, общая нагрузка же около 1,5-2 тысяч запросов в секунду. 

На тот момент опыта работы с Kafka с нагрузкой выше 50 запросов в секунду не было, и раньше мы с командой не сталкивались с требованием связанным с ограничением по времени ответа. 

Для разработки была использована библиотека spring-kafka и наработки по предыдущим сервисам работавшим с Kafka. Далее, реализовали сервис, успешно прошли нагрузочное тестирование и установили сервис в production среду. Сервис успешно работал и в production проблем не вызывал. Также, первое время данный сервис обновлялся во время технического окна, в которое нагрузка на topics снижалась до нуля и переводилась на старое решение, поэтому все обновления происходили без каких либо проблем. Первый звонок прозвенел после того, как мы вывели старую систему из эксплуатации и сервис, читающий из Kafka, потребовал установки доработок. 

И тут произошло самое интересное :)

На мониторинге коллеги от команды сопровождения увидели примерно такую картину: 

График количества ответов в единицу времени(Requests/T)
График количества ответов в единицу времени(Requests/T)

График сделан от руки, чтобы показать влияние обновления системы на получение ответов внешней системой. 

Рассмотрим график подробнее и попробуем сопоставить интервалы на графиках с имеющейся информацией от команды сопровождения:

Т1-Т3: нормальная обработка сообщений, новый экземпляр приложения еще не запущен, 

Т3-Т4: отсутствует обработка сообщений, 

Т8-Т13: повышение количества обработанных сообщений, 

Т13-Т15: стабилизация количества ответов, 

Т15-Т20: отсутствует обработка сообщений, 

Т20-Т26: всплеск количества обработанных сообщений, 

Т26-Т28 и далее: нормальная работа приложения.

Обновление приложения происходило в два этапа в связи с установкой при режиме canary release, временной интервал Т3 соответствует включению одного приложения новой версии и добавлению нескольких новых consumers в consumer group. Далее, в момент времени Т15-Т16 добавляется еще несколько consumers и в этот момент происходит переключение кластера на новую версию приложения, старые версии приложения выключаются. 

Таким образом, как видно по графику, происходит полная остановка обработки сообщений, потом происходит всплеск, и нагрузка возвращается к стандартным значениям.

Что произошло 

Добавление новых consumers запускало механизм ребалансировки. Ребалансировка consumers Kafka — процесс, который происходит, когда новый consumer добавляется в consumer group Kafka или удаляется из нее, а так же, когда существующий consumer в consumer group дает сбой или перезапускается.

В это время Kafka перераспределяет partitions между активными consumers в consumer group, чтобы гарантировать, что каждая partition обрабатывается только одним consumer. После завершения перебалансировки consumers возобновляют обработку сообщений. Этот процесс помогает обеспечить равномерное распределение работы по потреблению сообщений между членами consumer group.

Далее, думаю, стоит разобраться какие стратегии ребалансировки существуют и как они работают.

Стратегии ребалансировки

Выбор стратегии ребалансировки задается параметром:

partition.assignment.strategy

Kafka clients предоставляет четыре стратегии ребалансировки: RangeAssignor, RoundRobinAssignor, StickyAssignor и CooperativeStickyAssignor.

В случае необходимости смены стратегии требуется задать перечень стратегий для перехода на необходимую стратегию, в порядке приоритета использования, например, если изначально использовался режим балансировки по умолчанию - RangeAssignor, и требуется перейти на стратегию CooperativeStickyAssignor то требуется указать полное имя класса, имплементирующего класс PartitionAssignor:

partition.assignment.strategy: 
org.apache.kafka.clients.consumer.CooperativeStickyAssignor,org.apache.
  kafka.clients.consumer.RangeAssignor

Все consumers внутри consumer group должны использовать одну и ту же стратегию, в случае, если в группе будут указаны разные стратегии, то будут возникать ошибки вида:

org.apache.kafka.common.errors.InconsistentGroupProtocolException: 
The group member's supported protocols are incompatible with those of existing
members or first group member tried to join with empty protocol type
or empty protocol list

Взглянем на стратегии перебалансировки и их влияние на распределение partitions:

  • Range Assignor. 

Как работает: назначает partitions consumers на основе диапазона идентификаторов. Сначала сортирует partitions и consumers по их идентификаторам, а затем назначает разделы каждому consumer на основе диапазона идентификаторов partitions. Данная стратегия используется по-умолчанию.

Что гарантирует: каждому consumer будет назначен ряд смежных partitions.

  • Round Robin Assignor. 

Как работает: назначает partitions consumers в стиле round-robin. Сортирует partitions и consumers по их идентификаторам, а затем назначает каждый раздел следующему доступному consumer циклически. Звучит очень похоже на стратегию выше, данный способ позволяет задействовать большее количество consumers, но не минимизирует перемещения partitions в consumer group.

Что гарантирует: каждый consumer имеет равное количество partitions и также в случаях использования одной consumer group для чтения из нескольких topics, также может позволить более равномерно распределить нагрузку между consumers в consumer group, задействовав большее количество consumers.

  • Sticky Assignor. 

Как работает: стратегия предназначена, чтобы минимизировать количество partitions, которые перемещаются между consumers во время перебалансировки. Он назначает partitions consumers на основе хэша идентификатора partition и идентификатора consumer.

Что гарантирует: каждая partition всегда назначается одному и тому же consumer, если этот consumer все еще является частью consumer group. Если consumer покидает consumer group, его partitions перераспределяются между оставшимися членами consumer group.

  • Cooperative Sticky Assignor. 

Появился в kafka-clients версии 2.7.0.

Как работает: стратегия перебалансировки точно такая же как Sticky Assignor, но она предназначена для работы с протоколом кооперативной балансировки consumers Kafka. Кооперативный протокол позволяет consumers обмениваться информацией об их текущем состоянии, такой как количество partitions, которые они обрабатывают, и объем данных, которые они получают. Cooperative Sticky Assignor использует эту информацию для оптимизации назначения partitions на основе балансировки нагрузки.

Тестирование стратегий ребалансировки

Чтобы проиллюстрировать влияние различных стратегий ребалансировки на распределение partitions и на обработку сообщений, мы можем использовать два смоделированных сценария.

Первый сценарий

Данный сценарий позволит оценить изменения при перераспределении partitions.

Окружение:

  • topic Kafka с 10 partitions;

  • consumer group с 4 consumers (C1, C2, C3 и C4), подписавшихся на этот topic.

Описание сценария:

  • оценить первоначальное распределение partitions между consumers;

  • добавить новый consumer (C5) в группу и оценить перебалансировку partition между всеми 5 consumers.

Ниже описано влияние на распределение partitions в зависимости от выбранной стратегии:

  • Range Assignor

Ниже можно увидеть как при воспроизведении первого тестового сценария распределяются partitions: 

 

При поднятии четырех consumers partitions распределятся следующим образом
При поднятии четырех consumers partitions распределятся следующим образом
При добавлении нового consumer partitions могут распределятся подобным образом
При добавлении нового consumer partitions могут распределятся подобным образом

На втором графике стрелками красного цвета отображены изменения назначений partitions для consumers, зелеными стрелками - назначение partitions для нового consumer (далее буду использовать подобный подход для отображения изменений состояний consumers).

Исходя из результатов становится очевидно, что для этого примера ребалансировка затрагивает минимум 7 partitions, это довольно много, но тем не менее ребалансировка равномерно распределила partitions между consumers.

Когда стоит выбрать: Стоит выбрать данную стратегию если время на ребалансировку и количество переназначенных partitions не имеет значения.

  • Round Robin Assignor 

При поднятии четырех consumers partitions распределятся следующим образом
При поднятии четырех consumers partitions распределятся следующим образом
При добавлении нового consumer partitions могут распределятся подобным образом
При добавлении нового consumer partitions могут распределятся подобным образом

Исходя из результатов становится очевидно, что для этого примера ребалансировка затрагивает минимум 6 partitions, это довольно много, но тем не менее ребалансировка равномерно распределила partitions между consumers.

Когда стоит выбрать: если время на ребалансировку и количество переназначенных partitions не имеет значения и требуется максимально равномерное распределение partitions.

  • Sticky Assignor

При поднятии четырех consumers partitions распределятся следующим образом
При поднятии четырех consumers partitions распределятся следующим образом
При добавлении нового consumer partitions могут распределятся подобным образом
При добавлении нового consumer partitions могут распределятся подобным образом

Исходя из результатов становится очевидно, что для этого примера ребалансировка затрагивает 2 partitions. Это очень хороший результат, и преимущество данной ребалансировки в том, что consumers C1, C2, C3, C4 не прекращают обработку сообщений из partitions во время ребалансировки, таким образом сокращается время простоя и влияния ребалансировки на обработку входящих сообщений.

Когда стоит выбрать: Стоит выбрать данную стратегию если требуется минимизация времени простоя при обновлении приложения и минимизации перемещений partitions между consumers.

  • Cooperative Sticky Assignor. 

Пример ребалансировки аналогичен StickyAssignor.

Когда стоит выбрать: аналогично рекомендациям по StickyAssignor и в случае, если может пригодиться динамическое перераспределение partitions внутри consumer group, даже если не был изменен состав consumer group.

Второй сценарий

Сценарий для оценки влияния перебалансировки на обработку сообщений под нагрузкой.

Окружение:

  • topic кафка с 50 partitions;

  • нагрузка 600 RPS.

Описание сценария:

  • начало теста с 50 consumers;

  • повышение количества consumers в два раза (до 100 consumers);

  • снижение количества consumers до 50 consumers;

  • рестарт 10 consumers;

  • остановка теста.

Для этого сценария будем считать «потерянными» сообщения между отправкой и получением которых прошло более 3 секунд.

Мы можем смоделировать эти сценарии, используя различные стратегии перебалансировки и наблюдать результирующие назначения partitions и оценить время затрачиваемое на перебалансировку и количество «потерянных» сообщений. 

Для этого сценария я составил сравнительную таблицу для оценки влияния обновления приложения на количество потерянных сообщений и времени затрачиваемого на ребалансировку.

RangeAssignor

RoundRobinAssignor

CooperativeStickyAssignor/StickyAssignor

Количество записей

~700000

Первый этап сценария: повышение количества consumers в два раза до 100

Записей потеряно

0

967

0

Максимальное время ответа, мс

2274

9000

1064

Время затраченное на ребалансировку

2850

4360

2760

Второй этап сценария: понижение количества consumers в два раза до 50

Записей потеряно

5977

114

0

Максимальное время ответа, мс

14394

3689

852

Время затраченное на ребалансировку

10400

800

2850

Третий этап сценария: рестарт 10 consumers

Записей потеряно

212

7

465

Максимальное время ответа, мс

4263

3153

6713

Время затраченное на ребалансировку

2950

1580

3410

Итоги

Записи с таймаутами всего (>3 с)

6189

1088

465

По итогам данного сценария, можно сделать выводы о том, как разные стратегии влияют на обработку сообщений, и по результатам теста получается, что использование CooperativeStickyAssignor/StickyAssignor оказывает меньшее влияние на «потерю» сообщений, и в большинстве этапов показал себя значительно лучше, чем другие стратегии ребалансировки. 

Для анализа влияния стратегий ребалансировки на обработку сообщений использовались следующие метрики:

Consumer Lag — количество записей, которые ждут, когда consumer их обработает. В приложениях мониторинг задержки consumers может помочь вам выявить потенциальные узкие места в конвейере обработки.

kafka_consumer_fetch_manager_records_lag
kafka_consumergroup_lag

Partition Assignment — количество partitions, которые в настоящее время назначены каждому consumer в группе. Мониторинг назначения partitions может помочь вам выявить дисбаланс в распределении и оптимизировать балансировку нагрузки. 

kafka_consumer_coordinator_assigned_partitions

Rebalance latency max — максимальное значение времени, потраченное на ребалансировку. В высоконагруженном приложении минимизация времени перебалансировки может помочь уменьшить сбои в обработке сообщений и повысить общую производительность. 

kafka_consumer_coordinator_rebalance_latency_max

Last rebalance seconds ago — количество секунд прошедших с последнего события ребалансировки в consumer group.

kafka_consumer_coordinator_last_rebalance_seconds_ago

Вывод из всей истории

В рамках нашего проекта и сравнив показатели влияния различных ребалансировок на потерю сообщений мы выбрали для себя оптимальный вариант с использованием CooperativeStickyAssignor. Но это не «серебрянная пуля», в рамках вашего приложения может оказаться, что другие стратегии ребалансировки покажут себя не хуже :)

По итогам всех исследований хотелось бы сформулировать несколько советов, которые я бы дал себе до того, как случился описанный выше инцидент:

  • настройте мониторинг Kafka (брокеров и приложений);

  • отслеживайте Lag в метриках и настройте алерты;

  • проводите нагрузочное тестирование с обновлением приложения под нагрузкой;

  • подумайте, а надо ли вам использовать взаимодействие через кафку в «синхронном» ключе?

Комментарии (0)