Привет! Меня зовут Геннадий, я руковожу командой разработки системы учета товаров в Ozon. Мы активно используем Kafka как основной инструмент для асинхронного взаимодействия между нашими сервисами. Для нас Kafka — это не просто очередь сообщений, а один из ключевых компонентов всей архитектуры. Поэтому мы постоянно погружаемся в его тонкости и нюансы, чтобы грамотно настраивать и использовать его возможности. Думаю, многие из вас сталкиваются с тем же — когда Kafka становится критически важной частью вашего решения.

Один из ключевых элементов Kafka — это протоколы групп и механизм ребалансировки. Несмотря на то, что тема большая и непростая (особенно с учетом возможности расширять протоколы Consumer Group), я собрал для вас самое важное и объясню это простым и понятным языком.

Хотя информации о ребалансировке Kafka достаточно, она часто либо слишком разрозненная и техническая, либо наоборот — поверхностная и без акцента на важные детали. Особенно это касается гарантий Kafka и поведения протокола Cooperative Sticky. Даже опытные специалисты иногда путаются в деталях: stop-the-world, барьеры, две фазы ребалансировки… Давайте разберёмся во всём этом шаг за шагом.

Кратко о ключевых понятиях

Перед тем как двигаться дальше, важно быть знакомым с рядом базовых понятий, на которых строится работа групп и ребалансировки. В этой статье мы не будем углубляться в них подробно. Если нужно освежить память, рекомендую вот этот набор статей.

  • Брокер (broker) – узел кластера Kafka, который выполняет различные функции: управление топиками, хранение чекпоинтов (комитов), координация групп и т. д.

  • Консьюмер (consumer) – клиент Kafka, основная задача которого – читать сообщения из топиков.

  • Топик (topic) – распределённый файл (лог), разбитый на секции (партиции).

  • Партиция (partition) – секция (шард) топика.

  • Группа (group) – именованный набор консьюмеров, объединённых для совместного чтения топиков.

Обычно консьюмеры в группе работают по принципу: одна партиция — один консьюмер. Это нужно, чтобы избежать конкурентной обработки данных. То есть каждая партиция в любой момент времени принадлежит только одному консьюмеру из группы.

Группа может быть:

  • Симметричной – все консьюмеры хотят читать одни и те же топики.

  • Асимметричной – разные консьюмеры подписаны на разные топики.

Для нашего обсуждения эта разница несущественна, поэтому будем считать, что у нас симметричная группа, которая читает один или несколько топиков с одной или более партициями.

Группы изолированы друг от друга. Это значит, что консьюмеры из одной группы не делятся "интересами" с консьюмерами из другой. Несколько групп могут независимо читать одни и те же топики.

Формирование группы

У вас есть топик, в который поступают данные от продюсеров, и их нужно обрабатывать. Сначала этот топик никто не читает — группа либо отсутствует, либо пуста.

Чтобы организовать группу, первый консьюмер подключается к брокеру, который будет её обслуживать. Этот брокер становится координатором группы.

Как его найти? Консьюмер отправляет FindCoordinator с именем группы на любой брокер. Тот вычисляет координатора (подробнее).

Затем консьюмер отправляет JoinGroupRequest координатору, указывая имя группы, метаданные подписки (интересующие топики) и ряд другой информации:

JoinGroup Request =>
  group_id => имя_группы
  session_timeout_ms => общий_таймаут_сессии
  rebalance_timeout_ms => таймаут_ребалансировки
  member_id => идентификатор_консьюмера
  protocols => // а это уже массив протоколов и их метаданные
    name => имя_протокола (н-р.: range, roundrobin, cooperative-sticky, etc)
    metadata => это те самые метаданные – наша заинтересованность в топиках

При подсоединении к группе можно указать несколько протоколов, чтобы выбрать общий поддерживаемый. Это удобно для обратной совместимости и обновления протоколов.

Range и roundrobin - это протоколы групп, относящиеся к eager-семейству и различающиеся алгоритмом назначения партиций на консьюмеры. CooperativeSticky - протокол группы семейства cooperative. О типах будет рассказано далее.

Часть полей в запросе не приводится с целью упрощения.

Метаданные подписки – в них передаются топики, которые интересны консьюмеру, а также дополнительная информация:

  • OwnerPartitions – текущие обрабатываемые партиции (в eager не используется, будет расскрыто далее),

  • Информация о rack (физическом расположении консьюмера),

  • Эпоха группы,

  • Другие параметры.

Координатор группы (брокер) принимает запрос и выбирает лидера группы — консьюмера, который соберёт подписки и распределит партиции.

Лидером становится первый присоединившийся (или случайный, если лидер покинул группу). Так как пока консьюмер один, он автоматически получает эту роль.

Координатор отвечает JoinGroupResponse. Лидеру передаются метаданные подписки всех консьюмеров, остальным — пустые метаданные (но у нас пока никого нет, кроме лидера):

JoinGroup Response
   protocol_name => выбранный первый общий поддерживаемый протокол
   leader => member_id лидера (если memberId и leader совпадают, значит консьюмер был выбран лидером)
   member_id => member_id консьюмера
   members => // это массив метаданных подписки собранных со всех консьюмеров
       member_id => member_id консьюмера
       metadata => метаданные подписки, в формате выбранного протокола

Лидер декодирует метаданные подписки, извлекает список интересующих группу топиков и дополнительно запрашивает метаданные топологии кластера — информацию о партициях (чтобы узнать какие партиции распределять). Затем запускает алгоритм распределения (partition assigner), формирует назначения для консьюмеров и отправляет SyncGroupRequest координатору:

SyncGroup Request
   group_id => группа
   member_id => идентификатор_консьюмера
   protocol_name => выбранный протокол
   assignments => // массив назначений для каждого консьюмера. Заполняется только лидером
       member_id => идентификатор консьюмера
       assignment => назначенные консьюмеру партиции (+ доп. информация, в зависимости от протокола)

Поле assignment – назначенные консьюмеру партиции не имеет жёсткой структуры — брокеры Kafka не анализируют его содержимое, но есть соглашения о минимальном содержимом, позволяющие использовать их, например, в мониторинге.

В частности, с помощью поля AssignedPartitions вы видите какие партиции какому консьюмеру назначены через различные утилиты.

Координатор отправляет SyncGroupResponse каждому консьюмеру, передавая только его назначения.

Консьюмер декодирует assignment, извлекает список своих партиций, находит их в сети и начинает обработку.

Рис1. Формирование группы
Рис1. Формирование группы

Так происходит формирование группы.

Ребалансировка

Теперь рассмотрим случай, когда в группе уже есть консьюмеры, и подключается новый.

Ему нужно либо получить партиции нового топика, либо перераспределить партиции с существующих консьюмеров. Он отправляет JoinGroupRequest с теми же данными: имя группы, протокол, таймауты и список интересующих топиков.

Координатор видит, что группа уже существует, и запускает ребалансировку, оповещая всех участников. Обычно это делается через Heartbeat или Commit (иногда через JoinGroup/SyncGroup).

  • Heartbeat — периодический пинг между консьюмером и координатором. Если консьюмер не отвечает в течение session.timeout, он считается умершим, и запускается ребаланс.

  • Commit — фиксация состояния консьюмера (подробнее).

Ребаланс может происходить и по другим причинам, например, при добавлении партиций в топик. Но сам процесс остаётся тем же.

Консьюмеры, получившие уведомление о ребалансировке тоже отправляют JoinGroupRequest, подтверждая свою заинтересованность (обычно она не меняется).

Координатор собирает метаданные подписки, отправляет их лидеру, а остальным — пустой ответ.

Затем все отправляют SyncGroupRequest, но лидер конечно же запускает алгоритм распределения партиций (не возвращаться же с пустыми руками!). Координатор передаёт эти назначения всем консьюмерам через SyncGroupResponse, и консьюмеры начинают обрабатывать партиции.

Рис 2. Подключение к группе
Рис 2. Подключение к группе

Важная гарантия

Есть важный нюанс. Ребалансировка – это асинхронное событие, которое может инициироваться фоновыми процессами (heartbeat или commit). А асинхронно оно по отношению к обработке сообщений.

Рис 3. Обработка и события ребалансировки
Рис 3. Обработка и события ребалансировки

Мы не можем игнорировать тот факт, что какая-то партиция обрабатывается (и может обрабатываться очень долго), а в это время её уже назначили другому консьюмеру из-за ребалансировки. Чтобы избежать ситуации, когда одну партицию одновременно читают два консьюмера, нужно синхронизировать обработку и ребалансировку.

Это можно сделать разными способами: дождаться завершения обработки или прервать обработку, чтобы быстрее перераспределить партиции.

Но в любом случае вся группа должна подождать, пока произойдет синхронизация.

Сколько ждать? Для этого существует параметр rebalance_timeout. Когда инициируется JoinGroupRequest, координатор запускает ребаланс, но не отправляет JoinGroupResponse, до тех пор, пока все консьюмеры не подсоединятся к ребалансировке. Группа блокируется на время равное rebalance_timeout, ожидая, чтобы все успели присоединиться.

Рис 4. Барьер
Рис 4. Барьер

Это и есть тот самый барьер, который позволяет Kafka гарантировать, что одна партиция в каждый момент времени обрабатывается не более чем одним консьюмером. Опять же, при соблюдении договоренностей (таймингов).

Мы разобрали базовые принципы ребалансировки. Теперь давайте разберёмся в разных протоколах групп. Их можно разделить на два типа: eager и cooperative.

Eager

Eager — семейство протоколов групп с агрессивным подходом к ребалансировке. Здесь «остановка мира» самая жесткая. При ребалансировке каждый консьюмер полностью останавливает обработку и очищает буферы с уже загруженными сообщениями из назначенных партиций.

Рис 5. Ребалансировка в Eager
Рис 5. Ребалансировка в Eager

Остановка обработки происходит до отправки JoinGroupRequest. Пока все консьюмеры не завершат обработку и не отправят JoinGroupRequest, вся группа простаивает (барьер).

А что, если кто-то не успеет присоединиться к ребалансировке за rebalance_timeout? Верно, его исключат из группы, ребалансировка продолжится без него, а его партиции перераспределятся между оставшимися. Да, это может быть рискованно, но другого выхода нет.

Причём даже если ушедший консьюмер не владел партициями, ребаланс всё равно запустится, вызывая "жёсткую" остановку. Тоже самое, если новый консьюмер подключился и на него нечего было назначить. Это неэффективно.

Вот тут на сцену выходит cooperative sticky.

Cooperative sticky

Cooperative sticky отличается от eager двумя ключевыми моментами:

  1. Консьюмеры сообщают лидеру, какие партиции у них были назначены (в eager на момент JoinGroup revoke уже произошёл и список партиций пуст).

  2. Во время первой ребалансировки обработка партиций продолжается, отзыва не происходит. Отзыв партиций происходит до начала второй ребалансировки (но вторая ребалансировка может и не случиться).

Представим, что у нас работают два консьюмера, у которых три партиции одного топика (topic1). Теперь к группе присоединяется третий с тем же топиком…

Консьюмер 3 отправляет JoinGroupRequest, указывая, что хочет подписаться на topic1.

Координатор оповещает текущие консьюмеры о ребалансе.

Консьюмеры 1 и 2, не останавливая обработку, формируют JoinGroupRequest, передавая в метаданных список текущих партиций (ownedPartitions).

Координатор отправляет эту информацию лидеру в JoinGroupResponse, а остальным — пустые метаданные подписки.

Лидер:

  • Вычисляет новое распределение партиций, но с учетом минимизации "переездов".

  • Формирует assignment, где указывает, какие партиции у кого остаются.

Координатор перенаправляет assignment через SyncGroupResponse.

Каждый консьюмер:

  • Сравнивает текущие партиции со списком из assignment.

  • Если ничего «отдавать» не нужно — ребаланс окончен, обработка продолжается.

  • Если часть партиций нужно отдать — консьюмер должен синхронизироваться с обработкой, прежде чем отпустить их.

Рис 6. Ребалансировка в cooperative-sticky: фаза 1
Рис 6. Ребалансировка в cooperative-sticky: фаза 1

После остановки обработки консьюмер очищает буферы отозванных партиций и отправляет JoinGroupRequest, но теперь указывает только оставшиеся у него партиции.

Это запускает новую ребалансировку:

  • Остальные консьюмеры получают сигнал и тоже отправляют JoinGroupRequest, сообщая о своих партициях.

  • Координатор передает всю информацию лидеру через JoinGroupResponse.

Лидер перераспределяет отозванные партиции среди участников.

Финальный этап:

  • Все отправляют SyncGroupRequest.

  • Лидер сообщает окончательное распределение партиций.

  • Те, кому добавили новые партиции, назначают их и запускают обработку.

Таким образом, удается минимизировать остановку обработки, затрагивая только те партиции, которые переезжают. Однако даже в cooperative sticky есть барьер - если несколько партиций отзываются, JoinGroupRequest будет зависать, пока все консьюмеры их не отпустят (и не присоединятся ко второму ребалансу).

Рис 7. Ребалансировка в cooperative-sticky: фаза 2
Рис 7. Ребалансировка в cooperative-sticky: фаза 2

Когда дело может закончиться одной ребалансировкой?

  • Если подключился консьюмер, на который нечего ребалансировать – все партиции уже распределены. Все подключились, сообщили свои текущие партиции, перераспределять нечего, отзывать, стало быть, также. Значит все завершится без какой-либо остановки и второго ребаланса не будет.

  • Если отключился консьюмер, который владел какой-то партицией. В этом случае он, до того как сделать LeaveGroup, уже ее отпустил. Координатор инициирует ребаланс, все подключаются и сообщают о своих текущих партициях, лидер оставляет все как есть, но освободившуюся партицию добавляет кому-то в назначение. Синхронизировать остановку не нужно, поэтому все завершается одним ребалансом.

Интересный кейс: cooperative sticky в контейнерной среде

Представьте: у вас есть стабильный релиз с 10 pod'ами, которые обрабатывают 10 партиций — классическая схема, где каждый pod обслуживает по одной партиции.

Теперь вы выкатываете новый релиз по стратегии «канарейка», добавляя pod'ы нового релиза рядом со старыми. Это сразу запускает ребалансировку (а может и несколько — ведь pod'ы подключаются не одновременно). Однако с включённым Cooperative Sticky новые pod'ы могут остаться «без работы» — все 10 партиций останутся закреплёнными за старыми контейнерами, чтобы минимизировать количество revoke и связанных с этим пауз в обработке.

И тут кроется проблема: до тех пор, пока старые pod'ы не будут удалены, весь топик продолжает обслуживаться только старым релизом. А когда вы начнёте удалять старые pod'ы, это вызовет полноценную ребалансировку — и, скорее всего, не одну, так как pod'ы уходят не одномоментно.

Как избежать простоя? Можно постепенно уменьшать количество старых pod'ов, заставляя Kafka перераспределять партиции на новый релиз. Эдакая "канарейка для бедных".

Заключение

На этом все!

Мы с вами рассмотрели протоколы Eager, Cooperative Sticky, барьеры, но тема ребалансировки в Kafka куда шире. Разные алгоритмы распределения, статические группы, специфичные сценарии использования — всё это требует детального разбора.

Обязательно продолжим углубляться в эту тему. Спасибо за внимание! Ждите новых статей, и до скорых встреч!

Дополнительные материалы:

https://www.confluent.io/blog/cooperative-rebalancing-in-kafka-streams-consumer-ksqldb/ https://cwiki.apache.org/confluence/display/KAFKA/KIP-429%3A+Kafka+Consumer+Incremental+Rebalance+Protocol

Комментарии (2)


  1. Skyuzi
    22.05.2025 11:05

    Очень полезная статья, спасибо!


  1. Laric17
    22.05.2025 11:05

    Большое спасибо. Особенно, за картинки.