Всем привет! Сегодня расскажу про Confluent Kafka. Ее любят, ненавидят, но мало кто остается равнодушным.

О себе

Меня зовут Алена, я ведущий разработчик, занимаюсь развитием референсной архитектуры в компании Билайн. К основным областям профессиональных интересов можно отнести проблемы распределенных систем, event sourcing и DDD.

В двух словах о Kafka

Kafka - это распределенная платформа для обработки потоков данных, которая используется для построения высоконагруженных решений и обработки данных в реальном времени. По сути, это система, которая может очень быстро и эффективно передавать ваши события. Kafka может работать как на одной машине, так и на нескольких, образующих между собой кластер и повышающих общую эффективность системы. Событием в Kafka может быть любой тип данных, для Kafka это всего лишь последовательность байт. На картинке представлены сущности, которые участвуют в процессе работы с точки зрения архитектуры.

  • Producer генерирует и передает данные;

  • Consumer получает события, отправленные Producer;

  • События - информация, необходимая для выполнения какой-либо операции;

  • Broker - это сервер Kafka, который позволяет Producer’ам передавать потоки данных Consumer’ам;

  • Topic представляет из себя хранилище событий, сгруппированных по различным признакам. Consumer извлекает необходимую информацию из Topic.

Топик и партиции

Все события в Kafka относятся к тому или иному топику. Топик делится на партиции, количество которых указывается при создании топика. Если вы добавили партиции, то удалить их получится только со всем топиком. Добавлять же партиции в процессе работы можно. Для распределения событий по партициям используется ключ партиционирования. Если ключа нет, то события распределяются по алгоритму Round Robin.

Например, у нас есть топик с котиками, которых мы хотим распределять по цветам. В нулевой партиции рыжие котики, в первой - черные, во второй - серые, в третьей - белые. В качестве ключа партиционирования используется цвет. Поступает событие с ключом “серый”, котики с цветом серый находятся во второй партиции, значит, событие тоже попадет во вторую партицию. Затем приходит событие с ключом “трехцветный”, таких котиков еще нигде нет. Значит, это событие попадет в следующую партицию, и все дальнейшие события с этим ключом тоже будут попадать именно в эту партицию.

Стратегии распределения по партициям

Только что мы рассмотрели стратегию по умолчанию, где для распределения по партициям используется ключ. Если же ключа нет, то события распределяются по алгоритму Round Robin. Uniform sticky события отправляются в закрепленную партицию, чтобы уменьшить задержку. Также можно реализовать собственный алгоритм распределения по партициям.

Рассмотрим на примере, как это происходит. Используется библиотека Confluent Kafka .NET версии 2.4.0.

Для того, чтобы задать стратегию распределения по партициям для конкретного топика, необходимо использовать метод SetPartitioner. В него передается название топика, количество партиций и некоторые другие параметры. В качестве алгоритма используется остаток от деления на количество партиций. Далее, если вы хотите задать стратегию распределения для всех топиков с определенным Producer, необходимо использовать метод SetDefaultPartitioner.

Тут такие же параметры, кроме названия топика, и так же используется стратегия поиска остатка от деления на количество партиций. Таким образом, если вы используете метод SetPartitioner, то будет использоваться стратегия для конкретного топика оттуда. Если вы используете метод SetDefaultPartitioner, но не используете SetPartitioner, стратегия будет браться из первого. Если же вы не используете ни один из этих методов, то будет использована стратегия из настроек конфигурации. Если же и там она не задана, то распределение будет происходить по стратегии по умолчанию. 

Репликация данных

Мы рассмотрели, как записывать данные, теперь рассмотрим, как их реплицировать, чтобы не потерять. Каждая партиция может быть реплицирована N раз, где N - replication factor. Таким образом гарантируется наличие нескольких копий события на нескольких брокерах. 

В Kafka единицей репликации является партиция. У каждого топика есть одна или несколько партиций, каждая из которых имеет Leader и может иметь Follower. При создании топика задается количество партиций и replication factor. Последний часто равен трем, это значит, что у нас есть Leader и Follower. Kafka старается равномерно распределить Leader’ов партиций между брокерами для повышения надежности. Запись всегда происходит лидером, чтение тоже почти всегда производится лидером. Follower периодически делает запрос данных из Leader, чтобы получить последние события.

Когда отваливается один из брокеров, на нем с большой долей вероятности находятся лидеры партиций. Для всех этих партиций выбираются новые лидеры на оставшихся нодах. Например, у нас отпал третий брокер, то выбирается новый лидер второй партиции на втором брокере.

Далее отпадает первый брокер, на котором находился лидер первой партиции, он переключился на второго брокера. Теперь все лидеры партиций находятся именно там. Когда первый брокер восстанавливается, он добавляет четырех Follower, что обеспечивает некоторую избыточность каждой партиции, но лидеры по-прежнему остаются на втором брокере. Далее снова подключается к сети третий брокер, и мы возвращаемся к распределению три реплики на каждую партицию, но лидер все еще находится на втором брокере.

У Kafka есть концепция предпочтительных реплик лидеров. Когда Kafka создает партиции топика, она пытается равномерно распределить лидеров по брокерам и помечает первых лидеров как предпочтительных. Но временем из-за перезапуска серверов или различных инфраструктурных проблем эти лидеры находятся уже не на тех узлах, на которых они были изначально. Чтобы это исправить, можно задать параметр auto.leader.rebalance.enable = true, что позволит ОЗУ контроллера перенаправить лидеров обратно предпочтительным лидерам реплик, тем самым восстановив равномерное распределение.

Устаревание данных (retention)

Мы рассмотрели, как происходит репликация, чтобы понять, как не потерять данные, но наши данные в общем случае не хранятся в топике вечно. Партиция делится на сегменты, которые представлены в виде файлов log, расположены последовательно и обрабатываются в порядке очереди. Новые данные пишутся в активный сегмент, а старые данные будут удалены из последнего сегмента на основе правил retention по времени или занимаемому месту.

Например, восьмой файл - активный сегмент, если придет новое событие, то оно будет записано именно туда. Второй файл - это последний сегмент. Если сработает одно из правил retention, то данные будут удалены именно оттуда.

Настройки, связанные с retention:

  • retention.bytes - объем хранимых событий этого топика в байтах;

  • retention.ms - длительность хранения событий данного топика в миллисекундах;

  • параметры могут принимать значения -1, если они не ограничены;

  • для bytes по умолчанию установлено -1, для ms - 7 дней.

Оставлять значения по умолчанию очень опасно, потому что может произойти такая ситуация, при которой не сможете записать события. Например, мы оставили значения для bytes не ограничено и для ms 7 дней, дискового места доступно 100 гигабайт на каждую партицию. Происходит какая-то миграция, то есть события поступают очень быстро за короткий промежуток времени. Диски заполняются, поэтому когда приходит новое событие, мы получаем ошибку. Если у вас могут возникнуть такие ситуации, нужно заранее предусмотреть эти параметры, основываясь на пропускной способности вашего топика.

Про потребителей

Consumer’ы обычно находятся в consume group. Это значит, что если пользователи подписаны на один топик, то они получают события из различного подмножества партиций этого топика. Например, если в группе один пользователь, он будет получать события из всех партиций. Если несколько, то они будут получать события из различных подмножеств партиций. Если количество партиций и пользователей одинаковое, то каждый будет читать из своей партиции. При добавлении новых пользователей в случае, если их количество превышает число партиций, некоторые пользователи будут простаивать. Они будут задействованы только в том случае, если с текущим пользователем что-то случится, поэтому обычно это неэффективно. При подключении к consume group пользователи получают временный идентификатор, который служит для связи между пользователем и выданными ему партициями. При отключении от группы идентификатор не перестает работать.

Например, есть топик с шестью партициями, которые равномерно распределены между тремя пользователями. Второй пользователь по каким-то причинам покидает consume group, и его партиции перераспределяются между оставшимися пользователями. 

Процесс передачи партиции от одного пользователя к другому называется ребалансировкой. Ребалансировка важна, потому что она обеспечивает consume group масштабируемость и высокую доступность, позволяя легко и безопасно добавлять и удалять пользователей. Однако в обычных обстоятельствах она нежелательна.

Про стратегии распределения партиций

Range Assignor назначает партиции пользователям на основе диапазона идентификаторов. 

Сначала пользователи и партиции сортируются по их идентификаторам, а затем происходит назначение. Эта стратегия используется по умолчанию и гарантирует, что каждому пользователю будет назначен ряд смежных партиций.

Следующая стратегия - Round Robin Assignor.

Партиции назначаются пользователям по алгоритму Round Robin. Пользователи и партиции сортируются по их идентификаторам, затем каждая партиция назначается следующему доступному пользователю циклически. Звучит очень похоже на предыдущую стратегию, но Round Robin позволяет задействовать большее количество пользователей. Стратегия гарантирует, что каждый пользователь получит равное количество партиций, а в случае использования одной consume group для чтения из нескольких топиков может позволить более равномерно распределить нагрузку в consume group, задействовав большее количество пользователей.

Следующая стратегия - Sticky Assignor. Стратегия предназначена для того, чтобы минимизировать количество партиций, которое при ребалансировке перемещается между пользователями. Партиции назначаются пользователям на основе их идентификатора и хеша идентификатора партиции. Эта стратегия гарантирует, что каждому пользователю будут назначены те же партиции, если они все еще являются членами consume group. Если пользователь выходит из группы, то он теряет свои партиции и они перераспределяются между оставшимися членами группами. Разберем это на примере.

У Consumer A партиция 0 и партиция 3, у Consumer В - партиция 1, у Consumer С - партиция 2. По какой-то причине Consumer A покидает consume group, и происходит ребалансировка. Его партиции перераспределяются между Consumer В и Consumer С. Как видим, партиция 1 и партиция 2 остались у своих владельцев.

Стратегия Cooperative Sticky Assignor аналогична предыдущей за тем исключением, что тут поддерживается алгоритм совместной ребалансировки. Это значит, что пользователи могут продолжить потребление событий из тех партиций, которые были перераспределены. Sticky-распределение преследует две основные цели: 

  • максимально сбалансировать назначения;

  • в случае ребалансировки оставить как можно больше назначений на месте, минимизируя накладные расходы, связанные с перемещением и назначением партиций от одного пользователя к другому.

Подробнее про ребалансировки мы поговорим во второй части статьи.

Комментарии (0)