Привет! Как и обещала в первой части — теперь про ребалансировки и не только.

Выделяют безотлагательную и совместную ребалансировки. 

При безотлагательной ребалансировке пользователи прекращают потребление из партиции, которые им назначены, затем отказываются от права владеть этими партициями, после снова присоединяются к consume group, получают совершенно новое назначение партиций и после этого возобновляют потребление. По сути, это короткое окно недоступности для всей consume group. Длина этого окна зависит от количества пользователей и некоторых параметров конфигурации.

Совместная ребалансировка - это инкрементная ребалансировка, при которой переназначается лишь некоторое подмножество партиций, и пользователи продолжают потребление из тех партиций, которые не были переназначены. Координатор уведомляет пользователей, что они должны будут отказаться от права владения некоторым подмножеством партиций, они прекращают потребление и отказываются от права владения. Затем эти партиции назначаются новым владельцам. Подход может включать несколько итераций, пока не будет достигнуто равномерное распределение. В отличие от безотлагательного подхода, мы не получаем полной недоступности. Это важно для больших consume groups, где балансировка может занять значительное количество времени.

К параметрам пользователей, которые связаны с ребалансировкой, относятся:

  • heartbeat.inverval.ms - интервал отправки контрольных сигналов пользователя, по умолчанию три секунды;

  • session.timeout.ms - окно на отправку контрольных сигналов, по умолчанию десять секунд. Обычно окно сессии в три раза больше, чем heartbeat interval;

  • max.poll.inverval.ms - максимальное время на обработку события перед получением следующего, по умолчанию пять минут.

Посмотрим, как это работает. Есть основной поток, в котором периодически запрашиваются новые события для обработки. Мы должны это сделать за max.poll.interval.ms. Также есть фоновый поток, в котором раз в heartbeat.inverval.ms отправляются фоновые сигналы. Координатор должен получить эти сигналы за session.timeout.ms. Когда отправляются контрольные сигналы и координатор их получает, окно сессии открывается заново. 

Окно сессии увеличено для того, чтобы можно было успеть отправить несколько контрольных сигналов. Например, первый контрольный сигнал отправлен успешно, после него два неудачных. При еще одной неудаче началась бы ребалансировка.

Причины ребалансировки

У consume group есть три состояния: empty, rebalance и stable. Сначала мы находимся в состоянии empty, когда consume group пуста. Затем, если пользователь хочет присоединиться к группе, то ему выдается партиция для чтения и происходит ребалансировка. После этого мы переходим в состояние stable, здесь может случиться несколько событий. Например, не сработал heartbeat, пользователь вышел из группы (превышено время ожидания следующего события) или новый пользователь хочет присоединиться к группе. Если все пользователи покинули группу, мы опять переходим в состояние empty.

Бывает такое, что вместе с ребалансировками происходит что-то непонятное, например, вы перестаете получать события. Можно проверить логику пользователей, может быть какая-то долгая обработка, сработал таймер и пользователь вышел из группы, а само приложение думает, что все нормально. Также можно увеличить настройку max.poll.interval.ms - время на обработку события, по умолчанию пять минут. Многие захотели бы поставить ему максимально возможное значение, но на этом этапе может произойти неприятная ситуация. Например, вы поставили максимально большой max.poll.interval и забыли про retention, ведь первый параметр может его превысить.

Предположим, что max.poll.interval = 2000, а retention = 1000. Это значит, что мы можем обрабатывать событие за max.poll.interval, то есть в худшем случае вы будете обрабатывать событие 2000 миллисекунд. За это время retention может успеть сработать два раза. Это значит, что вы потеряете все эти события. Скорее всего, вы даже об этом не узнаете или узнаете позже, чем того хотелось бы.

Бывает, что несколько пользователей в приложении находятся в одной consume group и подписаны на несколько различных топиков. Есть ситуации, когда это хорошо, например, когда вы используете стратегию распределения партиций Round Robin, тогда нагрузка может быть распределена более равномерно. Но иногда вы не хотите, чтобы это происходило, потому что если ребалансировка происходит в одном топике, то автоматически запускается ребалансировка в другом топике. Если этого нужно избежать, то нужно сделать несколько consume group под каждый топик. Тогда ребалансировка для одного топика не будет влиять на другой топик.

С точки зрения кода с последствиями ребалансировки можно работать с помощью callback. Подробнее о последних.

  • SetPartitionsAssignedHandler определяет, что назначены новые партиции; 

  • SetPartitionRevokedHandler - потеря владения партициями;

  • SetPartitionLostHandler - партиции назначены другому пользователю, используется только при совместной ребалансировке.

Первый callback вызывается после переназначения партиции пользователем, но до того, как он начнет получать события. Здесь подготавливается любое состояние, вся эта подготовка должна быть возвращена за max.poll.interval для того, чтобы пользователь успел присоединиться к consume group.

SetPartitionRevokedHandler вызывается, когда пользователь должен отказаться от партиций, которыми он ранее владел, либо в результате ребалансировки, либо при отключении пользователя. В общем случае, когда используется алгоритм безотлагательной ребалансировки, этот callback вызывается после того, как пользователь перестал потреблять события из партиции, но до ребалансировки. Если используется совместный алгоритм ребалансировки, то этот метод вызывается в конце ребалансировки только с тем подмножеством партиций, которое должно быть распределено.

SetPartitionLostHandler вызывается только при использовании совместного алгоритма ребалансировки и только в исключительных случаях, когда партиции были переназначены новым владельцем без предварительного отзыва алгоритмом ребалансировки. В обычных случаях будет вызван SetPartitionRevokedHandler. Здесь очищаются все состояния и использованные ресурсы. Если этот callback не реализован, то будет вызван SetPartitionRevokedHandler.

Статические члены группы

Ранее мы рассматривали стратегию распределения партиций Sticky Assignor, где партиции снова назначаются тому же пользователю, если он является членом consume group. Но идентификация пользователя в группе по умолчанию является временной. Когда пользователь покидает группу, он теряет все назначения партиций, а при подключении получает совершенно новые назначения.

Можно настроить пользователей с уникальными идентификаторами с помощью group.instance.id. Это сделает пользователя статическим членом группы, и когда такой пользователь отключится, его партиции не перераспределятся, пока не истечет время его сессии. Когда пользователь снова присоединится к группе, ему будут выданы те же партиции без ребалансировки.

Автономный пользователь

Ранее мы рассматривали consume group, для большинства задач этого достаточно, но иногда бывает, что заведомо есть только один пользователь, который читает из всех партиций топика или из каких-то конкретных. Можно не подключаться к consume group и не подписываться на топик, а просто назначить себе несколько партиций. Это делается с помощью consumer.Assign. Мы передаем партицию и offset в этой партиции.

Когда пользователь подключается к топику со своим groupID, ему назначается партиция для чтения и offset в этой партиции. Положение offset конфигурируется на клиенте как указатель на само событие или на самое старое событие. Пользователь запрашивает события из топика, что приводит к их последовательному чтению из журнала. Позиции offset регулярно коммитятся обратно в Kafka и сохраняются как события во внутреннем топике consumer offset. Прочитанное событие все равно не удаляется, и, в отличие от обычного брокера, клиент может перемотать offset, чтобы повторно обработать уже полученное событие. Если зафиксированный offset превышает offset фактически обработанного клиентом события, то расположенные в этом промежутке события будут пропущены consume group. Если зафиксированный offset меньше offset последнего обработанного клиентом события, то события из этого промежутка будут обработаны дважды.

Если выставить параметрам EnableAutoOffsetStore и EnableAutoCommit значения false, то будет необходимо вручную вызывать consumer.StoreOffset и передавать offset в этом топике.

Пользователь не читает события

Бывает такое, что пользователь перестает читать события. Могло произойти следующее:

  • Версия брокера Kafka сильно разошлась с текущей версией библиотеки Confluent Kafka .NET. В этом случае необходимо обновить версию библиотеки до последней актуальной относительно используемого брокера.

  • В одной группе находится несколько пользователей, и сообщение прочитал другой. Нужно проверить правильность выбора consume group.

  • Неправильные параметры конфигурации или слишком долгая обработка в коде (пользователь выпал из consume group). Необходимо проверить кодовую базу и параметры конфигурации.

Чтение из ведомой реплики

Помимо чтения из Leader’ов, с версии 2.4 Kafka поддерживает чтение и из Follower’ов, основываясь на их взаимном расположении. Это полезно для сокращения задержек при обращении к ближайшему брокеру в одной зоне доступности. Однако, из-за асинхронной работы репликации взамен от Follower’ов мы получаем менее актуальные данные, чем в лидерской партиции. Основная цель этой функции - снизить затраты на сетевой трафик, позволяя клиентам получать данные не из лидерской партиции, а из ближайшей синхронизированной.

Например, есть три дата-центра, в первом дата-центре находится лидер, в двух других - ближайшие синхронизированные реплики. Для этого нужно настроить на брокере rack.id, передав туда dc-2 и dc-3, и параметр replica.selector.class, выставив там RackAwakeReplicaSelector. В настройках конфигурации пользователя нужно установить client.rack, эта настройка должна совпадать с rack.id на брокере, то есть dc-2 или dc-3. Таким образом мы сможем читать из ближайших из ближайших синхронизированных реплик, но коммит offset в топик consumer offset все равно будет происходить в лидера. Этот offset будет распространяться Follower’ам через отметку High Watermark при отправке данных лидера. Это вносит небольшую задержку, но позволяет сохранить те же гарантии, что и при чтении из лидера.

Вложенность библиотек

Часто бывает, что на предприятиях используется своя обертка Confluent Kafka .NET. Происходит это потому, что появляется необходимость реализовывать дополнительный функционал или переиспользовать какой-то код в нескольких проектах. Например, это может быть авторизация, или поведение при переназначении партиций, или обработка событий батчами, так как в базовой библиотеке этот функционал не представлен. Таким образом, есть библиотека предприятия, которая является оберткой над Confluent Kafka .NET, которая в свою очередь является оберткой над librdkafka, являющейся C-client Kafka. Если у вас возникают какие-то проблемы и нужно понять, где они происходят, то это весьма нетривиальная задача из-за вложенности. Однако в этом можно себе помочь.

У Consumer и Produser есть настройка debug. Туда передается перечисление параметров, можно передать значение all, тогда будут получены все логи. Так вы можете понять, что именно сломалось или какие именно логи вам нужны. Если логов мало, то есть настройка конфигурации log_level. Она измеряется от одного до семи, по умолчанию шесть. Можно поставить семь, чтобы получить еще больше логов.

Также в дебаге можно помочь себе с помощью хендлеров, которые актуальны как для ConsumerBuilder, так и для ProducerBuilder.

  • В SetErrorHandler передается consumer и ошибка. Там можно обработать ошибку или что-то залогировать;

  • SetStatisticsHandler - хендлер получения статистики. Если указан statistics.inverval.ms больше нуля, то раз в этот интервал будет вызываться этот хендлер. Передается consumer и json-строка;

  • SetLogHandler - хендлер логирования. Передается consumer и сообщение лога. Здесь тоже можно залогировать, извлечь информацию и сделать какие-то дополнительные обработки. Однако нужно помнить, что при падении хендлеров вы не получите эти логи и никакими try-catch не поймаете это.

Аутентификация

Есть несколько видов аутентификации Simple Authentication and Security Layer:

  • GSSAPI - это Kerberos или Active Directory;

  • PLAIN - это Password Server;

  • SCRAM-SHA - это ZooKeeper;

  • OAUTHBEARER - это OAuth Server.

В Kerberos-аутентификации проблемный дебаг под Windows, работает только под Linux или в Docker. Также будет необходимо сгенерировать файл keytab и поковыряться с dockerfile, потому что в некоторых версиях образа могут быть проблемы.

В OAUTHBEARER появляется дополнительное звено в виде OAuth Provider. Для того, чтобы прошла аутентификация в Kafka, нужно сходить к провайдеру за access token, а затем уже идти к брокеру. Чтобы написать свой Producer с помощью OAuth-аутентификации, нужно реализовать свой callback, который вызывается при старте приложения или через 80% времени жизни токена.

После получения токена нужно вызвать у клиента OAuthBearerSetToken, передав туда значение токена, Expiration и Principal. Далее очень важно реализовать блок catch, где будет вызвана OAuthBearerSetTokenFailure со строкой ошибки. Kafka позволяет нам делать retry этого callback раз в десять секунд по умолчанию. Если вы не реализуете этот catch, то retry не будет. Тогда рано или поздно токен протухнет и придется перезапускать приложение. Далее нужно вызывать метод SetOAuthBearerTokenRefreshHandler у Producer Builder и передать туда callback. Однако здесь есть баг библиотеки librdkafka следующего характера.

Также баг может возникать при передаче слишком длинной ошибки в OAuthBearerSetTokenFailure, на это тоже нужно обращать внимание.

Про Log Compaction

Часто у разработчиков возникает желание хранить данные в Kafka вечно, потому что она позволяет создавать достаточно гибкие системы и избегать миграции. В Kafka есть возможность использования механизма Log Compaction, и перед удалением событий по retention сохраняется последняя версия событий с одним ключом.

К примеру, есть несколько событий, у них один ключ, и сохраняется последняя версия событий. Это подходит, если нужно хранить какие-то агрегированные данные или только последнюю версию событий.

Многоуровневое хранилище

Начиная с Kafka 3.6.0 доступно использование многоуровневого хранилища tiered storage. Реализована концепция многослойного хранения, то есть разделения персистентного слоя хранения информации на несколько последовательно расположенных слоев. Такое решение используется, чтобы расширить возможности подсистемы хранения данных Kafka за пределы локального хранения данных в кластере, обеспечивая вытеснение более старых данных во внешнее хранилище, такое как HDFS или S3, c минимальным влиянием на внутреннее устройство Kafka. При этом поведение Kafka и сложность сопровождения не должны были ухудшиться для имеющихся потребителей.

Бывают случаи, когда нужно хранить все события. Обычно для этого используется база данных, куда вы скидываете все события, в Kafka хранится только какой-то последний срез, а остальные нужно выгружать из внешнего хранилища. С появлением многоуровневого хранилища пользователь, которому нужны данные с определенным offset, получает их из локального кэша. Если этих данных нет, то он достает их из многоуровневого хранилища, то есть интерфейс не меняется.

Для того, чтобы это реализовать, были добавлены новые компоненты, например, RemoteLog Manager, который расширяет текущую функциональность существующего в Kafka LogManager, RemoteLogMetadata Manager и RemoteStorage Manager. Реализация HDFS и S3 планируется вне репозитория кода Kafka, то есть будет использоваться та же концепция, что и для Kafka Connectors.

Посмотрим верхнеуровнево на процесс записи и удаления данных в многоуровневых хранилищах.

У нас есть RemoteLog Manager Leader Task, который находит сегмент партиции для переноса в удаленное хранилище, затем он стартует перенос данных сегмента, а RemoteLogMetadata Manager запоминает и устанавливает статус переноса в Started. Затем RemoteLog Manager говорит, что нужно скопировать сегмент в удаленное хранилище, RemoteStorage Manager это делает. RemoteLog Manager уведомляет RemoteLogMetadata Manager, что перенос данных завершен, и последний устанавливает статус переноса Finished. После этого RemoteLog Manager заканчивает работу по сегменту, значит, что его можно удалять из локальных данных брокера, о чем уведомляется RemoteLog Manager Leader Task.

Процесс удаления локального сегмента выполняется асинхронно после копирования сегмента в удаленное хранилище. Некоторое время часть данных партиций топика находится одновременно в локальном и удаленном хранилищах. На самом деле, многоуровневое хранилище очень удобно использовать, потому что оно позволяет избавиться от каких-то дополнительных баз данных и использовать привычный интерфейс Kafka.

Выводы

  • Не пренебрегайте настройками Producer и Consumer. Это может вылезти в самый неподходящий момент.

  • Нужно обновляться на последние версии Confluent Kafka .NET, но делать это аккуратно, потому что иногда появляются мелкие баги, которые могут все сломать.

  • Ребалансировка - не корень зла, ее нужно правильно готовить. Это прекрасный инструмент для масштабирования.

  • Используйте debug, чтобы понять, что происходит по логам. Иногда это может сильно помочь и ускорить решение проблемы.

  • Иногда полезно следить за GitLab Confluent Kafka и KIP, потому что там часто публикуют интересные вещи.

  • Попробуйте многоуровневое хранилище для ознакомления. Оно уже в раннем доступе, это очень удобно.

Комментарии (1)