Привет, Хабр! Меня зовут Андрей Комягин, я CTO компании STM Labs. Мы занимаемся разработкой очень больших распределённых высоконагруженных систем для различных отраслей, включая налоговое администрирование, телеком, track & trace и многие другие.

В своей работе мы широко используем open-source-решения, в том числе Apache Kafka. Этот распределённый программный брокер сейчас применяется практически во всех наших проектах, и сегодня я предлагаю заглянуть внутрь чёрного ящика, чтобы понять, как там всё внутри устроено. В своей работе мы широко используем open-source-решения, в том числе Apache Kafka. Этот распределённый программный брокер сейчас применяется практически во всех наших проектах, и сегодня я предлагаю заглянуть внутрь чёрного ящика, чтобы понять, как там всё внутри устроено.

Этот текст написан на основе моего доклада на конференции SaintHighload++ 2025 в Санкт Петербурге. Так что, если вы были там в качестве слушателя, информация не будет для вас новой. Впрочем, повтор полезного материала никогда не бывает лишним. Поехали!

Немного истории

Брокер был создан в 2010 году в LinkedIn, написан на Scala + Java. C 2012 года находится в инкубаторе Apache. Название придумал один из создателей, Джей Крепс — он назвал брокер в честь своего любимого писателя Франца Кафки. Мир Франца Кафки неоднороден, соткан из различных фрагментов, зачастую стыкующихся друг с другом с большим трудом. Это философия максимально близко перекликается с ролью самого брокера, да и всей платформы Kafka в современных больших сложных информационных системах.

Apache Kafka — не просто очередной распределённый программный брокер. Это горизонтально масштабируемая платформа для потоковой обработки данных в реальном времени, с высокой пропускной способностью и низкой задержкой.

Кластер Kafka состоит из нескольких компонентов:

Брокеры (Brokers) — это серверы, которые хранят данные и обрабатывают запросы от клиентов. Брокеры обеспечивают отказоустойчивость и балансировку нагрузки в кластере.
Контроллеры (Controllers) — это специальные брокеры, которые отвечают за управление кластером и координацию работы других брокеров. Контроллер отслеживает состояние кластера, назначает лидеров для партиций и управляет репликами.
Топики (Topics) — это логи, в которых хранятся данные. Топик состоит из одного или нескольких разделов (партиций), которые распределены по брокерам.
Партиции (Partitions) — это отдельные разделы топика, которые хранятся на разных брокерах. Каждая партиция имеет лидера — брокер, который обрабатывает запросы на чтение и запись для этой партиции. Реплики — это копии партиций, которые хранятся на других брокерах для обеспечения отказоустойчивости.
Producer API — это интерфейс, позволяющий приложениям отправлять сообщения в топики Kafka.
Consumer API — это интерфейс, позволяющий приложениям читать сообщения из топиков Kafka.
Kafka Connect — это инструмент для интеграции Kafka с другими системами.

Структура хранения

Как я уже сказал ранее, в Kafka данные организованы в виде топиков, которые состоят из партиций. Каждая партиция представляет собой упорядоченный набор сообщений, которые хранятся в виде сегментов (segments). Каждый сегмент состоит из трёх типов файлов:

.log — это основной файл, который содержит сообщения в формате бинарных данных. Сообщения записываются последовательно, и каждый сегмент имеет определённый размер, после достижения которого создаётся новый сегмент.
.index — индексный файл, который содержит пары «offset — position». Offset — это смещение сообщения в сегменте, а position — позиция в файле .log, где начинается это сообщение.
.timeindex — временной индексный файл, который содержит пары «timestamp — offset». Timestamp — это временная метка сообщения, а offset — смещение сообщения в сегменте

Рассмотрим, как выглядит эта файловая структура на конкретном примере:

Важно отметить, что размер сегмента конфигурируется: log.segment.bytes = 1073741824 (1 GB).

Почему производительность работы Kafka существенно отличается от производительности работы других брокеров

Чтобы это понять, рассмотрим бенчмарк Sequential vs Random IO:

Он показывает IO Latency для последовательного ввода-вывода (sequential I/O) и произвольного ввода-вывода (random I/O). Мы видим, что последовательный ввод/вывод более чем в 20 раз быстрее при тестировании записи 1М значений в файл! Тест написан на Java с использованием FileWriter vs RandomAccessFile.

Разобравшись с sequential I/O, переходим к retention policy. Как вы уже догадались, для работы с данными в Kafka используется модель Append Only. То есть в контексте retention policy можно представить все сегменты партиции на оси времени. Самые актуальные данные у нас пишутся в активный сегмент, а сегменты, которые вышли за границы политики Retention (retention.ms=60480000 (7 дней по умолчанию)), удаляются. Важно отметить, что Retention-политика применяется с гранулярностью «Сегмент» —т.е. весь сегмент удаляется целиком.

А как же быть, если timestamp сообщения запланированы к отправке в будущем? Об этом тоже подумали (но, конечно, не сразу). KIP-937 определяет поведение брокера и правила валидации для таких случаев.

Следующий важный аспект в достижении предельной производительности — zero copy. В чем суть этой оптимизации? В традиционных системах передача данных между приложением и сетью включает в себя несколько этапов копирования: сначала из памяти приложения в буфер операционной системы, затем в буфер сетевого адаптера и, наконец, в сетевой интерфейс. Каждый этап копирования требует ресурсов процессора и времени.

Kafka использует системный вызов sendfile и передаёт данные напрямую из файлового кэша операционной системы в сетевой адаптер, минуя пользовательское пространство. Данные уже хранятся в кэше страниц операционной системы — а значит, исключаются лишние копирования, и данные передаются быстрее.

На графике мы видим, что скорость копирования файла размером 1ГБ при использовании sendfile на 91% выше скорости стандартного копирования без использования zero copy:

Producer

Процедура отправки сообщения в Producer состоит из следующих шагов:

Send message — вызов отправки сообщения.
Fetch metadata — получает метаданные. Этот процесс может занимать до 500мс. Обновляется каждые 5мин по умолчанию.
Serializer. Имеет три группы: Primitive Data Types (пример: StringSerializer), Complex Data Formats (avro, protobuf), Custom.
Partitioner — определяет, в какую партицию попадёт сообщение. Использует различные hash-функции.
Batch accumulator — копит пачку сообщений на основе параметров batch.size + linger.ms. Алгоритм компрессии работает именно для пачки сообщений, а не для одиночного сообщения. Алгоритмов несколько: gzip, snappy, lz4, zstd.

Sender – отправляет пачку сообщений в сеть

Следует упомянуть, что у Partitioner есть подводные камни. Микросервисы в современных системах часто разрабатываются на разных стеках технологий. Есть клиенты с нативной реализацией Producer API, и они по умолчанию используют Murmur2 partitioner, а есть клиенты, которые используют реализацию на базе библиотеки librdkafka, в которой до недавнего времени дефолтный partitioner использовал алгоритм CRC32. В результате сообщения по-разному распределяются по партициям, и это может влиять на работу бизнес-логики. Например, если в конкретную партицию должны попадать все тарификационные события по одному абоненту/пользователю в рамках сессии. Следует упомянуть, что у Partitioner есть подводные камни. Микросервисы в современных системах часто разрабатываются на разных стеках технологий. Есть клиенты с нативной реализацией Producer API, и они по умолчанию используют Murmur2 partitioner, а есть клиенты, которые используют реализацию на базе библиотеки librdkafka, в которой до недавнего времени дефолтный partitioner использовал алгоритм CRC32. В результате сообщения по-разному распределяются по партициям, и это может влиять на работу бизнес-логики. Например, если в конкретную партицию должны попадать все тарификационные события по одному абоненту/пользователю в рамках сессии.

Однако эту проблему разработчики уже решили: алгоритм Murmur2 теперь по дефолту используется везде.

Consumer

В экосистеме Kafka, в отличие от других брокеров сообщений, Consumer использует pull-модель чтения данных. Т.е. происходит постоянный поллинг топика.

Важно отметить, что данные всегда читаются с лидеров. Для подтверждения получения сообщения Consumer отправляет commit():

Как же на самом деле устроен алгоритм подтверждения доставки в Kafka? Мы знаем, что есть еще одна сущность, которая объединяет консьюмеров в группу — Consumer Group. У каждой группы есть свой Group Coordinator: у него много функций, но нам важна синхронизация работы процедуры commit. Для сохранения обработанных оффсетов используется системный топик consumeroffsets. В нем хранятся метаданные: groupld, topic, partition, offset.

Репликация

Итак, у нас есть Leader — это брокер, который отвечает за обработку запросов на чтение и запись для конкретной партиции. Он является главным узлом для этой партиции и координирует работу остальных реплик, которые называются follower.

ISR (In-Sync Replicas) — список реплик, которые находятся в синхронизации с лидером. Если реплика не успевает синхронизироваться с лидером в течение Replica.lag.time.max.ms, она исключается из списка ISR.
HWM (High Water Mark) — отметка, помогающая определить, какие данные уже доступны для чтения.
Replication.factor — количество копий каждой партиции, которые хранятся на разных брокерах.
Min.insync.replicas — минимальное количество реплик, которые должны быть в синхронизации с лидером.
Acks — параметр, который определяет, сколько подтверждений от реплик требуется для успешной записи сообщения. Возможные значения:

acks=0 — сообщение считается записанным сразу после его отправки клиентом, без ожидания подтверждений от брокеров;
acks=1 — сообщение считается записанным после получения подтверждения от лидера;
acks=all — сообщение считается записанным только после получения подтверждений от всех реплик в ISR.

Таким образом, можно вывести формулу гарантированной доставки и уровень надежности:

acks=all
replication.factor=N
min.insync.replicas=M
1 < M < N
N-M брокеров может быть недоступно

Прежде чем мы перейдем к самой интересной части статьи, давайте рассмотрим ещё один проблемный случай. Дисбаланс лидеров партиций в Apache Kafka возникает, когда лидеры партиций распределены между брокерами неравномерно. Это может привести к неравномерной нагрузке на брокеры и снижению производительности системы.

auto.leader.rebalance.enable=true — это параметр конфигурации, который позволяет автоматически перераспределять лидеров партиций между брокерами. Когда этот параметр включён, система периодически проверяет распределение лидеров и при необходимости выполняет ребалансировку.

Для принудительного выравнивания можно использовать скрипт kafka-preferred-replica-election.sh.

Батл: JSON vs Protobuf

Будучи архитектором, я обожаю ковыряться в различных технологиях, а для того, чтобы объективно что-то оценить и потом сделать осознанный выбор, нужен бенчмаркинг. Так что предлагаю провести батл двух популярных форматов сериализации JSON и Protobuf в контексте отправки данных в Kafka. А чтобы было совсем не скучно, выкрутим производительность на максимум. Давайте посмотрим, какие ручки нам доступны для тюнинга производительности Producer:

batch.size — размер пачки при пакетной передаче сообщений,
linger.ms — время ожидания накопления пачки сообщений,
• compression.type — тип компрессии,
max.in.flight.requests.per.connection — настройка pipelining,
acks — настройка подтверждений.

Немногие знают, что в Kafka реализован pipelining. Кстати, про pipelining в Redis я писал в прошлом году. В двух словах: это единый ответ на группу запросов, что в теории обеспечивает более высокую пропускную способность. Настраивается он параметром max.in.flight.requests.per.connection > 1. Звучит слишком сладко? Так и есть: у реализации pipelining есть ряд побочек, о которых вы должны знать:

• Нарушение очерёдности.
• Увеличение приводит к деградации из-за lock contention.

Поэтому в рамках своего бенчмаркинга я не трогал ручку pipelining.

А что по инструментам для бенчмаркинга? Тут всё отлично: инструмент идёт, что называется, «из коробки». kafka-producer-perf-test — это утилита, которая позволяет проводить нагрузочное тестирование производительности Kafka. У утилиты масса возможностей, но из основных:

• количество отправляемых сообщений,
• сам payload сообщения,
• настройки продюсера в виде properties-файла: сюда мы вынесли linger.ms, batch.size и compression.type,
• пропускная способность.

Теперь давайте посмотрим на наши тестовые конфигурации:

batch.size – 16K, 32K, 48K
linger.ms – 10, 20, 30
compression.type – none, snappy, gzip
serialization – json, protobuf

Я сгенерировал типовой payload для JSON и Protobuf. Фактически он представляет собой массив объектов. Каждый объект — это две пары «ключ-значение» двух разных типов — string и integer. В тестах будем варьировать количество объектов в массиве: 10/50/100/200:

Результаты. Первый тест — без использования компрессии, на 100 атрибутах в массиве, на дефолтных значениях linger.ms и batch.size. Мы видим колоссальную разницу в rps: 4500 для JSON против 99000 для Protobuf:

Теперь на одном графике посмотрим результаты тестов для всех вариаций сочетаний параметров. Мы явно видим, что производительность начинает заметно падать, если при использовании JSON-сериализации в массиве собирается более 50 объектов. При этом мы не фиксируем никаких признаков деградации при использовании бинарной сериализации Protobuf вне зависимости от параметров конфигурации батчинга и типа компрессии.

Теперь отберём наилучшие результаты. Самой оптимальной для выбранного payload оказалась конфигурация, где размер буфера для батчинга равен 48К, время ожидания его заполнения — 10мс, и использовался быстрый неагрессивный алгоритм компрессии — snappy:

Были проведены и другие замеры, не связанные с самим брокером. Для 100 объектов в массиве замерялась скорость сериализации и десериализации, а также компактность. Результаты меня удивили:

• Protobuf в среднем компактнее в 2.5 раза,
• скорость сериализация Protobuf выше в 2 раза,
• скорость десериализации Protobuf выше в 6 раз!

А что ещё можно крутить, чтобы выжать максимум? Сетевой стек! Как на уровне настроек брокера и продюсера, так и на уровне тюнинга TCP-буферов в ОС. Но это тема для отдельного большого поста.

Георепликация

Напоследок предлагаю устроить батл Stretched vs Connected cluster или, по-русски, «Растянутый геокластер против Соединённого геокластера».

Что такое растянутый кластер? Это фактически просто единый кластер Kafka, который растянут на несколько ЦОД.

Оценим преимущества и недостатки такой топологии.

Преимущества:

• Единый слой данных: все брокеры в кластере имеют доступ к одним и тем же топикам и сообщениям.
• Простота управления: единый кластер проще администрировать и мониторить.
• Синхронизация данных в реальном времени: данные синхронизируются между узлами кластера без задержек.

Недостатки:

• Высокая нагрузка на сеть: передача данных между ЦОД создаёт высокую нагрузку на сеть.
• Зависимость от latency: требуются КС с задержками менее 50 мс.
• Rack-awareness — KIP-36!!! То есть фактически rack(стойка) — конкретная зона доступности.
• В идеале нужен 3-й ЦОД для кворума.

Наверняка многие уже догадались, что собой представляет соединённый кластер. Всё верно — это просто несколько кластеров, которые каким-то образом соединены друг с другом. Решений, которые позволяют организовать такую топологию, на рынке достаточно много. В этой статье мы не будем заниматься их обзором и сравнением. Одним из самых популярных и нативных решений, которое идет от Apache, является инструмент Mirror Maker 2. На следующей диаграмме я показал, как можно организовать топологию в двух ЦОД в режиме «Active-Active» с помощью Mirror Maker 2:

Давайте оценим преимущества и недостатки такой топологии.

Преимущества:

• Изоляция кластеров: минимальное влияние проблем одного кластера на работу другого.
• Гибкость: можно применять различные политики репликации для разных топиков.
• Масштабируемость: легко добавлять новые кластеры и масштабировать систему по мере необходимости.
• Соединённый кластер менее требователен к каналам связи между ЦОД.

Недостатки:
• Сложность эксплуатации: настраивать несколько кластеров и управлять ими сложнее, чем делать то же самое с одним растянутым кластером.
• Задержка репликации: данные могут отставать.
• Дополнительные ресурсы: для управления несколькими кластерами требуется больше ресурсов.

Итак, в этой статье мы с вами:

• заглянули в «чёрный ящик» и глубоко изучили архитектуру Kafka,
• разобрали по косточкам всю анатомию высокой производительности Kafka,
• подробно разобрали устройство Producer и Consumer,
• изучили все нюансы работы репликации,
• не прошли мимо тюнинга и бенчмаркинга,
• на сладкое разобрали топологии геокластеров.

Я надеюсь, что этот материал был вам полезен, и теперь вы лучше понимаете, что такое Kafka и с чем его едят. В следующей статье мы рассмотрим программный брокер RabbitMQ, сравним его с Kafka и сделаем выводы. Подписывайтесь на блог, оставляйте комментарии — я всегда рад обратной связи и возможности подискутировать!

Комментарии (0)