В предыдущей статье мы с вами разобрали, как развернуть кластер Kafka из трех брокеров в режиме KRaft на Windows в WSL, и передали простое сообщение от консольного продюсера к консольному консьюмеру.
Продолжая следовать правилу «практика — лучший способ глубокого понимания теории», перейдем к следующей части.
В целом о работе Kafka написано достаточно много. Поэтому предлагаю остановиться на одном из ключевых аспектов с точки зрения интегрируемых систем — структуре и обработке сообщений Kafka:
более подробно разберем структуру сообщения в Kafka с точки зрения продюсера;
сформулируем ключевые требования к проектированию шины данных на основе Apache Kafka;
разработаем собственные продюсер и консьюмер на Python;
на практике увидим, что такое JSON‑сериализация и десериализация;
Формирование и отправка сообщений в Apache Kafka — это многоэтапный процесс, в котором участвуют продюсер (producer), брокеры Kafka и внутренние механизмы сериализации, буферизации и сетевой передачи.
Сообщение, формируемое продюсером, состоит из полезной нагрузки (payload) и сопутствующих метаданных, необходимых для его обработки и маршрутизации. По умолчанию максимальный размер одного сообщения в Kafka ограничен 1 МБ, однако этот лимит можно настроить. Кроме того, продюсер может отправлять сообщения по одному или группировать их в пакеты (batch) — это зависит от конфигурации производительности и задержек, выбранной в настройках продюсера.
Последовательность формирования запроса продюсером в адрес брокера выглядит следующим образом:
Создание сообщения (Record)
Сериализация ключа и значения (преобразование в массив байтов)
Определение партиции, в которую будет помещено сообщение.
Формирование Record Batch (группировка сообщений по партициям)
Формирование запроса ProduceRequest и отправка брокеру
Понимание этапов формирования запроса поможет не только разобраться в его внутренней структуре, но и глубже осмыслить назначение ключевых параметров продюсера. На первый взгляд, данные от одного датчика — это всего лишь несколько байт. Однако в распределённой системе вроде Kafka, где миллионы таких сообщений проходят через продюсеры, наполняются метаданными, накапливаются в буферах, группируются в батчи и сжимаются, — из этих «крошек» формируется значительный объём сетевого трафика.
Итак, давайте рассмотрим каждый этап по порядку.
Создание сообщения (Record)
На первом этапе, перед сериализацией, приложение создаёт Record — логическое сообщение, которое включает в себя следующие компоненты:

Key (ключ)
Kafka использует ключ для определения партиции, в которую будет отправлено сообщение. Это произвольный объект, который будет сериализован в байты. Например: String, Integer, Dictionary, None
Гарантируется, что все сообщения с одинаковым ключом попадут в одну и ту же партицию топика. Это обеспечивает порядок сообщений для данного ключа.
Если ключ не задан (null), сообщения распределяются по партициям равномерно — обычно с помощью циклического выбора (round‑robin) или случайного распределения.
Ключ удобно использовать для семантической группировки сообщений, например, по идентификатору пользователя, заказа или агрегата в CQRS/Event Sourcing.
Value (значение)
Основная полезная нагрузка (payload) сообщения — именно те данные, которые вы хотите передать.
Сериализация прикладных данных в байты выполняется продюсером, десериализация — консьюмером. Формат (JSON, Avro, Protobuf и др.) определяется на стороне продюсера.
Для проверки целостности и валидации брокер распаковывает сжатые батчи (например, LZ4, ZSTD), но при этом не интерпретирует содержимое Value — оно остаётся «чёрным ящиком».
Headers (заголовки)
Опциональные метаданные сообщения, добавленные в Kafka начиная с версии 0.11.
Представляют собой список пар «ключ‑значение», где значение может быть строкой, числом или null.
Не влияют на маршрутизацию или партиционирование.
-
Удобны для передачи служебной информации:
идентификаторы трассировки (например, для distributed tracing),
токены аутентификации,
версии схемы,
флаги обработки.
Topic (топик)
Не входит в состав сериализованного сообщения — не передаётся в RecordBatch.
-
Используется только на стороне продюсера для определения партиции и формирования ProduceRequest, где топик указывается на уровне запроса.
Сериализация сообщения
Поскольку Kafka передаёт данные по сети в бинарном виде, объекты ключа (key) и значения (value) должны быть преобразованы в массивы байтов (byte[]). Эту задачу выполняют сериализаторы, такие как StringSerializer, ByteArraySerializer, а также форматно‑ориентированные — Avro, Protobuf, JSON схемы и другие.
Timestamp и Headers — также сериализуются внутренним форматом Kafka и передаются, но не так, как key/value. Они — часть формата сообщения Kafka, а не полезной нагрузки.
Топик — единственное из перечисленных полей, которое не попадает в RecordBatch и не сериализуется в сообщение вообще.
Определение партиции.
На этом этапе продюсер выполняет следующие шаги.
-
Решает в какую партицию топика отправить сообщение:
Если ключ (key) указан, используется хеш от ключа для распределения сообщения в партицию, где расположены сообщения с таким же ключом.
Если ключ (key) = null, используется стратегия равномерного распределения (round‑robin) или кастомная.
-
Проверяет актуальность текущих метаданных— информация о топиках, партициях, брокерах, лидерах и репликах. Проверка актуальности происходит перед отправкой каждого RecordBatch.
Формирование RecordBatch
Упрощенная структура RecordBatch, формируемая продюсером, будет примерно следующей:
RecordBatch
Сообщения обычно не отправляются по одному — они буферизуются в памяти продюсера и группируются в Record Batches (пакеты записей). Это повышает производительность и снижает накладные расходы.
Каждый Record Batch относится к одному топику и одной партиции.
Батчи формируются в Record Accumulator — внутреннем буфере продюсера.
Размер батча ограничен параметром
batch.size
(по умолчанию 16 КБ).Если батч не заполнен до конца, но прошло время linger.ms (по умолчанию 0 мс), он всё равно отправляется.
Только на этом этапе применяется сжатие — весь батч сжимается целиком, что эффективнее, чем сжимать каждое сообщение отдельно.
Шаги формирования RecordBatch.
Продюсер получает новое сообщение.
Определяет партицию (по ключу или round‑robin).
-
Смотрит, есть ли открытый батч для этой партиции:
Если есть и он ещё не полный (<
batch.size
), то добавляет сообщение.Если батч полный или закрыт, то создаёт новый.
-
После добавления первого сообщения запускается таймер
linger.ms
(по умолчанию — 0 мс):Если за это время пришли новые записи, они добавятся.
Если не пришли, батч отправляется «как есть».
Применяет выбранный алгоритм сжатия: producer, gzip, snappy, lz4, zstd или none.
Указывает тип сжатия в поле
RecordBatch.Attributes
.
Параметры продюсера, отвечающие за формирование RecordBatch.
Параметр |
Назначение |
Пример значений |
|
Размер батча в байтах, при котором он отправляется |
16384 (16 КБ) или больше — чем больше батч, тем эффективнее сжатие |
|
Время ожидания перед отправкой батча (чтобы набрать больше сообщений) |
5–20 мс — помогает увеличить размер батча, по умолчанию 0 мс. |
|
Тип сжатия данных RecordBatch |
none | gzip | snappy | lz4 | zstd | producer |
Описание значений compression.type:
none — сжатие отключено (по умолчанию).
gzip — высокая степень сжатия, но высокая нагрузка на CPU.
snappy — умеренное сжатие, низкое потребление CPU (часто используется).
lz4 — быстрее snappy, хорошее сжатие (рекомендуется по умолчанию).
zstd — отличное сжатие, поддерживается с Kafka 2.1+, баланс между эффективностью и скоростью.
producer — продюсер выбирает тип, брокер сохраняет как есть (рекомендуется с Kafka 2.4+).
Формирование ProduceRequest и отправка брокеру
Когда батч считается готовым (заполнен batch.size или истек linger.ms
), продюсер формирует ProduceRequest — сетевой запрос к брокеру Kafka.
ProduceRequest содержит один или несколько Record Batch, направленных разным партициям (но обычно группируются по брокерам).
Запрос отправляется через сеть с использованием протокола Kafka (на базе TCP).
-
Продюсер ожидает ответа от брокера в зависимости от настройки acks:
acks=0 — не ждать подтверждения;
acks=1 — подтверждение от лидера партициона;
acks=all — подтверждение от всех реплик (максимальная надёжность).
Логическая структура запроса ProduceRequest, который продюсер отправляет брокеру Kafka для записи данных:

Взаимодействие продюсеров и консьюмеров с брокером Kafka
Процесс обмена данными в Kafka строится на основе строго определенных запросов. После формирования ProduceRequest продюсер отправляет его брокеру. Брокер, в свою очередь, записывает содержащийся в запросе RecordBatch в лог‑файл соответствующего топика. Данные хранятся в партиции до истечения срока хранения, заданного политикой retention (по времени или по достижению максимального размера), и всё это время остаются доступными для чтения.
Для получения данных консьюмеры отправляют брокеру FetchRequest. В ответ брокер возвращает FetchResponse, который содержит один или несколько сжатых RecordBatch. Важно отметить, что интенсивность этих запросов и объем запрашиваемых данных определяются настройками консьюмера (такими как fetch.min.bytes
и fetch.max.wait.ms
) и его внутренней логикой работы.
Ключевой механизм оптимизации:
Фундаментальным механизмом, лежащим в основе эффективности Apache Kafka, является пакетная обработка (batching) и сжатие (compression) сообщений. Вместо обработки каждого сообщения (Record) по отдельности Kafka объединяет их в группы — RecordBatch. Такой подход обеспечивает ряд критически важных преимуществ:
Значительное повышение пропускной способности (throughput) системы.
Снижение нагрузки на сеть за счет передачи larger, сжатых блоков данных вместо множества мелких сообщений.
Эффективное использование дискового пространства и снижение нагрузки на I/O, так как на диск записываются уже сжатые батчи.
Разберем процесс по шагам:
-
Формирование батча на продюсере (Producer):
Продюсер не отправляет каждое сообщение мгновенно. Он накапливает их в памяти, формируя RecordBatch.
Перед отправкой весь батч целиком сжимается с использованием выбранного кодека (например, gzip, snappy, lz4 или zstd).
-
Хранение в сжатом виде на брокере (Broker):
Брокер получает уже сжатый RecordBatch и записывает его на диск в том же виде, без распаковки. Это экономит не только пропускную способность сети, но и дисковое пространство (I/O).
Данные хранятся в логах (log segments) именно как сжатые батчи. Метка compressionType в заголовке батча сообщает, какой алгоритм был использован.
-
Передача консюмеру (Consumer) и распаковка:
Когда консюмер отправляет запрос (FetchRequest) на чтение данных, брокер возвращает ему те же самые сжатые RecordBatch.
Распаковка происходит на стороне консюмера его клиентской библиотекой. Это распределяет нагрузку по распаковке между всеми консюмерами в системе, а не нагружает брокеры.
-
Обработка отдельных сообщений:
Только после распаковки батча консюмер получает доступ к индивидуальным сообщениям (Records) и их полям: ключу (Key), значению (Value), заголовкам (Headers) и временной метке (Timestamp).

PlantUML
@startuml
skinparam sequenceMessageAlign center
skinparam sequenceArrowThickness 2
skinparam sequenceLifeLineBorderColor #888
skinparam sequenceLifeLineBackgroundColor #EEE
actor Producer
participant "Broker\n(Partition Log)" as Broker
actor Consumer
Producer -> Broker : Отправка сообщений\n(RecordBatch c compressionType)
note right of Producer
Producer собирает несколько Record
в RecordBatch и применяет сжатие
(gzip/snappy/lz4/zstd)
end note
Broker -> Broker : Хранение RecordBatch\n(сжатый вид в log segment)
note right of Broker
В лог записан целый батч
с указанием compressionType
end note
Consumer -> Broker : FetchRequest (запросить партицию)
Broker --> Consumer : RecordBatch (сжатый)
note right of Consumer
Клиентская библиотека консьюмера
распаковывает батч по compressionType
end note
Consumer -> Consumer : Обработка Records\n(Key, Value, Headers, Timestamp)
@enduml
Проектирование топиков и потоков данных
Перед тем как приступить к созданию топиков, продюсеров и консьюмеров, предлагаю подумать над нашими к ним требованиями. Для этого ответим на ключевые вопросы, которые определят их конфигурацию и поведение. Это позволит избежать ошибок на архитектурном уровне и обеспечить отказоустойчивость, производительность и согласованность данных.
Для этого определите:
-
Набор топиков.
принцип единственной ответственности — каждый топик должен быть посвящен одной конкретной бизнес‑сущности или событию.
избегайте сквозных топиков — не создавайте один топик all‑events для всех типов событий, кроме как для тестирования.
семантика именования — имя топика должно быть понятно не только вам, но и другим разработчикам, аналитикам и архитекторам. Используйте соглашение, например,
<домен>.<сущность>.<версия>
или<команда>.<событие>
.
-
Продюсеры и консюмеры для каждого топика.
какой сервис или приложение создает эти события/данные?
какие сервисы подписываются на эти данные и что они с ними делают?
-
Необходимость использования ключей (key) для обеспечения порядковой гарантии.
помните: Kafka использует ключ для определения партиции. Сообщения с одинаковым ключом всегда попадают в одну партицию.
если порядок сообщений в рамках некоторой сущности (например, пользователя или заказа) критичен — используйте её ID в качестве ключа.
если порядок не важен — можно оставить ключ пустым (null) для равномерного распределения нагрузки.
-
Требования к задержке (latency) отправки сообщений
linger.ms
— сколько миллисекунд продюсер будет ждать, чтобы собрать больше сообщений в батч.batch.size
— максимальный размер батча в байтах. При достижении этого размера батч отправится немедленно.
-
Уровень надёжности передачи данных:
Для критически важных данных: Используйте replication.factor=3 и acks=all. Это гарантирует, что сообщение будет сохранено на нескольких брокерах прежде, чем продюсер получит подтверждение.
Для данных средней важности: Используйте replication.factor=3 и acks=1. Это защитит от сбоя брокера, но не гарантирует, что все реплики получили данные к моменту подтверждения.
Для неважных данных (логи): Можно использовать replication.factor=2 и acks=1 или даже acks=0 для максимальной скорости.
-
Формат (схему) данных для каждого топика (поле value).
Формат — будем ли мы использовать JSON, Avro, Protobuf или простой текст?
Тип данных — будут это сырые события, снимки состояния сущности или команды?
Структура — используйте единый шаблон для всех сообщений в топике.
-
Политику хранения данных для каждого топика.
Данные в Kafka не хранятся вечно. Вы должны явно задать правила их удаления, которые зависят от назначения данных:retention.ms — сколько времени сообщения должны храниться (например, 7 дней, 90 дней).
retention.bytes — максимальный объем данных может находиться в топике (например, 1 ТБ).
Подробней о топиках
После того как мы изучили структуру сообщений, запроектировали параметры нашей системы, перейдём к созданию топиков.
Ранее мы создали базовый топик с обязательными параметрами. Все остальные параметры имеют значения по умолчанию, которые можно настроить двумя способами:
Через динамические свойства топика (с помощью команды kafka‑configs)
Через конфигурационные файлы брокера (
server.properties
), но только для глобальных настроек
Важно понимать:
Динамические настройки топика сохраняются между перезапусками
Глобальные настройки в
server.properties
влияют на все топикиНекоторые параметры можно задать только при создании топика
Далее рассмотрим основные параметры топиков:
Параметр |
Назначение |
Пример значений |
|
Адрес брокера Kafka для подключения |
kafka1:9092,kafka2:9092 (для кластера) |
(Название топика) |
Уникальное имя топика для идентификации в кластере. Желательно указать назначение в комментариях. |
— |
(Количество партиций) |
Количество разделов для параллельной обработки. |
Зависит от нагрузки, соответствует расчетному количеству потребителей. По умолчанию 1. |
(Фактор репликации) |
Количество копий (реплик) каждой партиции для отказоустойчивости. |
По умолчанию = 1. Обычно 2 или 3. |
(Минимум in-sync реплик) |
Минимальное количество реплик, которые должны быть синхронизированы для успешной записи данныхдля acks=all. |
2 (при replication.factor=3). По умолчанию 1. |
(Политика очистки) |
Управление хранением данных: delete (удаление) или compact (компактизация). |
delete (логи), compact (ключевые данные). По умолчанию delete. |
(Тип сжатия) |
Алгоритм сжатия сообщений: none, gzip, snappy, lz4, zstd. Влияет на производительность. |
producer (использует настройки продюсера), zstd (лучшее сжатие). |
Полный список параметров:
Bash
/opt/kafka/bin/kafka-topics.sh --help
Далее нам необходимо проверить наличие топика с названием test-topic.
Bash
list — команда для вывода списка всех существующих топиков
bootstrap-server — адрес сервера Kafka для подключения
Если топика нет, создаем
Bash
/opt/kafka/bin/kafka-topics.sh \
--create \
--bootstrap-server localhost:9092 \
--topic test-topic \
--partitions 1 \
--replication-factor 3 \
create — указывает на операцию создания нового топика
bootstrap‑server — адрес и порт сервера Kafka для подключения (например, localhost:9092)
topic — имя создаваемого топика (в примере: test‑topic)
partitions — количество партиций в топике (1 в данном случае)
replication‑factor — фактор репликации (количество копий данных, 3 в примере)
Допустим мы решили, что все сообщения, которые поступили брокеру, должны обязательно соответствовать алгоритму сжатия продюсера. И решили назначить этот через динамические свойства.
Bash
/opt/kafka/bin/kafka-configs.sh \
--bootstrap-server localhost:9092 \
--alter \
--entity-type topics \
--entity-name test-topic \
--add-config compression.type=producer
bootstrap‑server — адрес и порт сервера Kafka для подключения (например, localhost:9092)
alter — флаг, указывающий, что вы хотите изменить конфигурацию сущности
entity‑type — определяет тип сущности, конфигурацию которой вы изменяете, в нашем случае topics
entity-name - указывает имя сущности, конфигурацию которой вы изменяете. В нашем случае test-topics.
-
add‑config compression.type=producer — добавляет или изменяет параметр конфигурации топика
Результат выполнения команды:
Completed updating config for topic test-topic.
Чтобы убедиться, что параметр установлен корректно, можно выполнить:
Bash
/opt/kafka/bin/kafka-configs.sh \
--bootstrap-server localhost:9092 \
--describe \
--entity-type topics \
--entity-name test-topic
bootstrap‑server — адрес и порт сервера Kafka для подключения (например, localhost:9092)
describe - выводит текущую конфигурацию указанной сущности (в данном случае — топика).
entity‑type — определяет тип сущности, конфигурацию которой вы изменяете. В нашем случае topics.
-
entity-name - указывает имя сущности, конфигурацию которой вы изменяете. В нашем случае test-topics.
Результат проверки параметра:
Dynamic configs for topic test-topic are:
compression.type=producer sensitive=false
synonyms={DYNAMIC_TOPIC_CONFIG:compression.type=producer,
DEFAULT_CONFIG:compression.type=producer}
Для удаления записи конфигурации выполним команду:
Bash
/opt/kafka/bin/kafka-configs.sh \
--bootstrap-server localhost:9092 \
--alter \
--entity-type topics \
--entity-name test-topic \
--delete-config compression.type
bootstrap‑server — адрес и порт сервера Kafka для подключения (например, localhost:9092)
alter — флаг, указывающий, что вы хотите изменить конфигурацию сущности
entity‑type — определяет тип сущности, конфигурацию которой вы изменяете, в нашем случае topics
entity-name - указывает имя сущности, конфигурацию которой вы изменяете. В нашем случае test-topics.
-
delete-config - удаляет указанный динамический параметр.
Результат выполнения команды:
Completed updating config for topic test-topic.
Производитель (Producer)
Далее перейдём к созданию продюсера.
Хотя у Kafka есть встроенные инструменты, их функциональность довольно ограничена. В такой ситуации лучше всего использовать Java или Python. Мы выберем Python — он проще в освоении и быстрее в разработке. Сначала определимся с библиотекой.
Выбор библиотеки
Среди наиболее популярных решений для работы с Kafka в Python — confluent_kafka
и kafka‑python
. Мы остановимся на confluent_kafka, и вот почему:
Эта библиотека построена на основе librdkafka — высокопроизводительной реализации клиента Kafka, написанной на C++. Благодаря этому она обеспечивает высокую скорость, надёжность и масштабируемость. Кроме того, confluent_kafka поддерживает все современные функции Kafka, включая Exactly‑Once Semantics, расширенную маршрутизацию сообщений и интеграцию с Confluent Cloud.
Для сериализации данных в формат JSON мы будем использовать стандартный модуль Python - json.
from confluent_kafka import Producer
import json
import socket
Конфигурация продюсера
Следующий шаг — конфигурация продюсера. Конфигурация определяет поведение продюсера в Kafka: от способа отправки сообщений до обработки ошибок и взаимодействия с кластером. Правильно подобранные параметры обеспечивают надёжность, производительность и соответствие архитектурным требованиям вашей системы.
conf = {
'bootstrap.servers': '<ваш_IP>:9092',
'client.id': socket.gethostname(),
'acks': 'all',
'compression.type': 'none',
'retries': 5
}
producer = Producer(conf)
<ваш_IP>
— заменяем на IP адрес, указанный в advertised.listeners, конфигурации кластера server.properties.
Параметр 'client.id': socket.gethostname()
в конфигурации продюсера Kafka выполняет важную роль идентификации и мониторинга:
client.id
— это уникальный идентификатор, который продюсер (или консьюмер) передаёт брокеру Kafka при подключении.socket.gethostname()
— это стандартная Python‑функция, которая возвращает имя хоста (компьютера), на котором запущен ваш скрипт.
Далее, producer = Producer(conf)
— создаем экземпляр продюсера, передаем ему конфигурацию
Остальные параметры — это параметры продюсера Kafka. Надеюсь, они вам уже знакомы.
Основные параметры продюсера.
Параметр |
Назначение |
Пример значений |
|
Адреса брокеров Kafka для подключения |
broker1:9092, broker2:9094 |
(Сериализатор ключа) |
Сериализатор для ключа сообщения |
StringSerializer - преобразует строки (String) в байты (byte[]) |
(Сериализатор значения) |
Сериализатор для значения сообщения |
StringSerializer - преобразует строки (String) в байты (byte[]) |
(Подтверждение записи) |
Уровень подтверждения записи (0, 1, all). Определяет надежность доставки. |
all (для гарантированной доставки) |
(Протокол безопасности) |
Протокол безопасности (PLAINTEXT, SSL, SASL_SSL). |
SASL_SSL - для продакшена PLAINTEXT - без шифрования и аутентификации |
(Повторные попытки) |
Количество попыток повтора при ошибках. |
5-10 (избегайте бесконечных повторов) |
(Таймаут запроса) |
Таймаут ожидания ответа от брокера (мс). |
30000 (30 сек) |
(Тип сжатия) |
Алгоритм сжатия (none, gzip, snappy, lz4, zstd). |
snappy или lz4 (баланс скорости/сжатия) |
(Макс. размер запроса) |
Максимальный размер одного запроса (байты). |
1 МБ (1048576 байт) по умолчанию |
(Буфер памяти) |
Объем памяти для буферизации неотправленных сообщений (байты). |
32 МБ (33,554,432 байта) по умолчанию |
Сериализация данных
В качестве метода сериализации value выберем JSON.
Сначала преобразуем словарь в JSON-строку с помощью json.dumps(), затем кодируем строку в байты методом encode('utf-8').
Кодировку указываем 'utf-8', т.к. она будет понятна среде Linux.
message_value = json.dumps(value).encode('utf-8')
Добавим параметры заголовка
Создадим массив из пар (ключ-значение):
headers = [
("source", "python-producer"),
("version", "1.0"),
("content-type", "text/plain")
]
Обработка статуса доставки
Функция delivery_report принимает два параметра:
err — объект ошибки, который будет None, если сообщение успешно доставлено, или содержит информацию об ошибке, если доставка не удалась.
msg — объект сообщения, которое было отправлено. Если доставка прошла успешно, этот объект содержит метаданные о сообщении: тему, партицию и offset.
def delivery_report(err, msg):
if err is not None:
print(f'Ошибка доставки: {err}')
else:
print(f'Доставлено в {msg.topic()} [{msg.partition()}] @ offset {msg.offset()}')
Отправка сообщения
Несмотря на то, что формирование ProduceRequest — достаточно сложный процесс, отправка сообщения с помощью продюсера на Python ничуть не сложнее, чем через kafka-console-producer.sh
.
producer.produce(
topic=topic, # Укажем топик
key=key, # Укажем ключ
value=serialized_value, # Добавляем явно преобразованное значение
headers=headers, # Добавляем заголовки
callback=delivery_report
)
Завершение работы
producer.flush()
Что делает flush()
Ждёт, пока все сообщения из внутреннего буфера продюсера будут обработаны.
Вызывает колбэки delivery_report для каждого сообщения.
Возвращает число неудалённых сообщений.
Synchronous and Asynchronous Sending
Далее мы создадим два варианта продюсера: один — для асинхронной отправки сообщений, другой — для синхронной. Выбор зависит от задач, которые вы решаете.
Какой выбрать, зависит от решаемой вами задачи.
Asynchronous:
Быстрая отправка, подходит в большинстве случаев. Например, для логгирования, метрик, high‑throughput систем.
Пример продюсера, работающего в асинхронном режиме:
Asynchronous
from confluent_kafka import Producer
import json
import socket
# Конфигурация продюсера
conf = {
'bootstrap.servers': '<ваш_IP>:9092', # Используем advertised.listeners из конфига
'client.id': socket.gethostname(), # Присваиваем продюсеру имя вашего хоста
'acks': 'all', # Ждем подтверждения от всех реплик
'compression.type': 'none', # Можно изменить на 'gzip', 'snappy' и т.д.
'retries': 5 # Число попыток при ошибка
}
# Создаем экземпляр продюсера, передаем ему конфигурацию
producer = Producer(conf)
# Callback-функция для обработки статуса доставки сообщений
def delivery_report(err, msg):
if err is not None:
# Ошибка доставки (таймаут, недоступность брокера, etc.)
print(f'Ошибка доставки: {err}')
else:
# Сообщение успешно доставлено и подтверждено брокером
print(f'Доставлено в {msg.topic()} [{msg.partition()}] @ offset {msg.offset()}')
# Функция асинхронной отправки сообщения
def produce_async(topic, headers, key, value):
# JSON сериализация данных value
serialized_value = json.dumps(value).encode('utf-8')
# Отправка сообщения
producer.produce(
topic=topic, # Укажем топик
key=key, # Укажем ключ
value=serialized_value, # Добавляем явно преобразованное значение
headers=headers, # Добавляем заголовки
callback=delivery_report
)
# Подготавливаем сообщение
message_topic = 'test-topic'
message_headers = [
("source", "python-producer"),
("version", "1.0"),
("content-type", "text/plain")
]
message_key = 'synch_123'
message_value = {'data_1': 123, 'data_2': 'ОК'}
# Отправляем сообщения
# Сообщение 1
produce_async(message_topic, message_headers, message_key, message_value)
# Сообщение 2
produce_async(message_topic, message_headers, message_key, message_value)
# Завершаем работу: ждём, пока сообщения из буфера будут отправлены и обработаны колбэками.
# Затем закрываем соединения и освобождаем ресурсы
producer.flush()
Synchronous:
Подходит, например, для финансовых транзакций, где необходима строгая очередность операций.
Чтобы обеспечить синхронность доставки, добавим в функцию отправки сообщений вызов метода flush() с таймаутом 5 секунд:
def produce_async(topic, headers, key, value):
…
producer.flush(timeout=5.0)
Этот метод блокирует выполнение до тех пор, пока все сообщения из очереди не будут отправлены или пока не истечёт указанное время ожидания. Если по истечении 5 секунд очередь не успела полностью очиститься, метод вернёт ненулевое значение — количество неотправленных сообщений.
Такой подход позволяет контролировать состояние доставки: при обнаружении «застрявших» сообщений можно, например, инициировать повторную отправку, записать ошибку в лог или прервать операцию, чтобы избежать нарушения целостности данных.
Пример продюсера, работающего в синхронном режиме:
Synchronous
from confluent_kafka import Producer
import json
import socket
# Конфигурация продюсера
conf = {
'bootstrap.servers': '<ваш_IP>:9092', # Используем advertised.listeners из конфига
'client.id': socket.gethostname(), # Присваиваем продюсеру имя вашего хоста
'acks': 'all', # Ждем подтверждения от всех реплик
'compression.type': 'none', # Можно изменить на 'gzip', 'snappy' и т.д.
'retries': 5 # Число попыток при ошибка
}
# Создаем экземпляр продюсера, передаем ему конфигурацию
producer = Producer(conf)
# Callback-функция для обработки статуса доставки сообщений
def delivery_report(err, msg):
if err is not None:
# Ошибка доставки (таймаут, недоступность брокера, etc.)
print(f'Ошибка доставки: {err}')
else:
# Сообщение успешно доставлено и подтверждено брокером
print(f'Доставлено в {msg.topic()} [{msg.partition()}] @ offset {msg.offset()}')
# Функция синхронной отправки сообщения
def produce_sync(topic, headers, key, value):
# Сериализация данных
serialized_value = json.dumps(value).encode('utf-8')
# Отправка сообщения
producer.produce(
topic=topic, # Укажем топик
key=key, # Укажем ключ
value=serialized_value, # Добавляем явно преобразованное значение
headers=headers, # Добавляем заголовки
callback=delivery_report
)
# Блокируем очередь. Ожидание подтверждения доставки 5 секунд
producer.flush(timeout=5.0)
# Подготавливаем сообщение
message_topic = 'test-topic'
message_headers = [
("source", "python-producer"),
("version", "1.0"),
("content-type", "text/plain")
]
message_key = 'synch_123'
message_value = {'data_1': 123, 'data_2': 'ОК'}
# Отправляем сообщения
# Сообщение 1
produce_sync(message_topic, message_headers, message_key, message_value)
# Сообщение 2
produce_sync(message_topic, message_headers, message_key, message_value)
# Завершаем работу: ждём, пока сообщения из буфера будут отправлены и обработаны колбэками.
# Затем закрываем соединения и освобождаем ресурсы
producer.flush()
Потребитель (Consumer)
Подобным образом создадим консьюмер. Как и в случае с продюсером, мы можем реализовать разные подходы, но обо всем по порядку.
Конфигурация консьюмера
conf = {
'bootstrap.servers': '<ваш_IP>:9092', # Адрес брокера
'group.id': 'test-group', # Идентификатор группы
'auto.offset.reset': 'earliest', # Чтение с начала топика
'enable.auto.commit': False # Ручное подтверждение сообщений
}
# Создание консьюмера
consumer = Consumer(conf)
<ваш_IP>
— также заменяем на IP адрес, указанный в advertised.listeners, конфигурации кластера server.properties.group.id
определяет группу потребителей, к которой принадлежит данный консьюмер. Это основа для:
-
Распределения нагрузки между консьюмерами.
Kafka автоматически распределяет партиции топика между консьюмерами одной группы
Каждое сообщение обрабатывается только одним консьюмером в группе
-
Отслеживания позиции чтения (офсетов).
Kafka хранит офсеты для каждой группы отдельно
При перезапуске консьюмер продолжает с того места, где остановилась группа
-
Гарантии доставки «каждый сообщение один раз»
В рамках одной группы сообщение не повторяется
auto.offset.reset
(Сброс смещения) — Действие при отсутствии сохраненного офсета:
earliest
— чтение с самого первого доступного сообщения (аналог--from-beginning
в CLI)latest
— чтение только новых сообщений (после подключения)none
— выброс исключения, если офсет не найден
enable.auto.commit
— автоматическое подтверждение получения сообщения. В нашем случае отключено. Это особенно полезно, когда нужно:
Проверить качество и валидность данных
Обработать сообщение (например, сохранить в БД)
И только затем подтвердить успешную обработку
Такой подход гарантирует, что сообщение не будет потеряно при ошибках обработки.
consumer = Consumer(conf)
— создадим экземпляр консьюмера и передадим ему конфигурацию
Далее можете ознакомится с основными параметрами консьюмера.
Параметр |
Назначение |
Пример значений |
(Адреса брокеров) |
Список брокеров для подключения (формат host:port). |
kafka1:9092, kafka2:9092 |
(ID группы) |
Идентификатор группы потребителей (для координации работы группы). |
"order-processors" |
(Сброс смещения) |
Действие при отсутствии оффсета: earliest (с начала), latest (только новые), none (ошибка). |
earliest (для восстановления данных) |
(Автоподтверждение) |
Автоматическое подтверждение оффсетов (true/false). Лучше отключать для точного контроля. |
false (ручное управление) |
(Макс. записей за опрос) |
Максимальное количество сообщений, возвращаемых за один вызов poll(). |
500 (баланс между скоростью и нагрузкой) |
(Таймаут сессии) |
Время, после которого потребитель считается "мертвым" и исключается из группы (мс). |
10000 (10 сек) |
(Интервал heartbeat) |
Частота отправки heartbeat-сигналов брокеру (мс). |
3000 (3 сек) |
(Макс. интервал опроса) |
Максимальное время между вызовами poll() до исключения из группы (мс). |
300000 (5 мин) |
Подписываемся на топик
Следующий шаг, подписываемся на топик. При использовании метода subscribe()
, консьюмер настроится на работу в автоматическом режиме, в соответствии со своей конфигурацией. Также Kafka автоматически распределит партиции между потребителями одной группы.
consumer.subscribe(['test-topic']) # Подписка на топик
Если необходимо управлять подпиской вручную, выбираем метод assign()
. Топик, партицию и начальный offset передаем методу напрямую. При этом настройки автоматической подписки и распределения партиций в конфигурации игнорируются. Для корректной работы с assign()
также важно установить enable.auto.commit = False
# Указываем топик, партицию и начальный offset вручную
topic = 'test-topic'
partition = 0 # Номер партиции (можно получить через consumer.assignment())
start_offset = 42 # Чтение начиная с offset=42
# Создаем объект TopicPartition и назначаем offset
tp = TopicPartition(topic, partition, start_offset)
consumer.assign([tp])
Читаем сообщения.
После подписки на топик мы можем получать сообщения из партиций. Это можно делать как по одному сообщению с помощью метода poll()
, так и пакетами — с помощью метода consume()
.
Poll( ) подход
msg = consumer.poll(timeout)
consumer.poll ()
возвращает сообщениеtimeout
— сколько времени ждать прихода сообщений, секунд. Если за указанное время сообщения не поступили, метод возвращает пустой список
Получение сообщений и обработка ошибок для метода Poll( ) будут выглядеть следующим образом:
try:
while True:
msg = consumer.poll(timeout=1.0)
if msg is None:
# Нет сообщений в течение таймаута — продолжаем опрос
continue
# Обработка ошибок Kafka
if msg.error():
if msg.error().code() == KafkaException._PARTITION_EOF:
# Достигнут конец партиции — продолжаем слушать
continue
else:
# Получена критическая ошибка – прерываем цикл
print(f"Ошибка: {msg.error()}")
break
Сonsume( ) подход
messages = consumer.consume(num_messages, timeout
consumer.consume ()
возвращает список из сообщенийtimeout
— сколько времени ждать прихода сообщений, секунд.num_messages
— максимальное количество сообщений, которое вернется за раз. Может вернуться меньше (если в буфере меньше сообщений или сработал таймаут).
Получение сообщений и обработка ошибок для метода Consume( ) будут выглядеть следующим образом:
try:
while True:
messages = consumer.consume(num_messages=10, timeout=1.0)
if not messages:
# Нет сообщений в течение таймаута — продолжаем опрос
continue
for msg in messages:
# Обработка ошибок Kafka
if msg.error():
if msg.error().code() == KafkaException._PARTITION_EOF:
# Достигнут конец партиции — продолжаем слушать
continue
else:
# Получена критическая ошибка – прерываем цикл
print(f"Ошибка: {msg.error()}")
break
Обработка и десериализация
Несмотря на то, что в Java‑API Kafka (org.apache.kafka.clients.consumer.KafkaConsumer
) действительно есть конфиги key.deserializer
и value.deserializer
, которые автоматически превращают байты в объект, в Python‑обёртке confluent_kafka
этого механизма нет
В этой части работа библиотеки Python c C‑библиотекой librdkafka
ограничена
Поэтому в Python вы всегда получаете msg.value()
как байты, и десериализацию нужно делать вручную (.decode() + json.loads()
или другая логика).
# Получаем значение сообщения
key = msg.key().decode('utf-8') if msg.key() else None
raw_value = msg.value().decode('utf-8') if msg.value() else None
# Десериализуем JSON, если значение есть
value = json.loads(raw_value) if raw_value else None
# Обработка заголовков
headers = {}
if msg.headers():
for header in msg.headers():
headers[header[0]] = header[1].decode('utf-8')
# Вывод информации
print("\n--- Получено сообщение ---")
print(f"Топик: {msg.topic()}")
print(f"Партиция: {msg.partition()}")
print(f"Смещение: {msg.offset()}")
print(f"Ключ: {key}")
print(f"Значение: {value}")
print(f"Заголовки: {json.dumps(headers, indent=2)}")
Фиксация смещений (Committing Offsets)
После того как мы обработали наши сообщения – можем подтвердить получение сообщения брокеру.
consumer.commit(asynchronous=False)
Метод commit ()
принимает экземпляр объекта msg
и свойствоasynchronous
.
msg не обязательный параметр. Используйте его, только если вам нужно точно указать, какое сообщение коммитить. Без указания коммитятся все обработанные смещения (offsets) для всех партиций, которые в данный момент назначены потребителю
asynchronous = false — коммит происходит синхронно, потребитель ждёт подтверждения от брокера Kafka, что offset успешно сохранён. Замедляется обработка
asynchronous = true — коммит идёт в фоне. Потребитель не ждёт ответа — быстрее, но есть риск, что коммит не успеет выполниться до сбоя.
Завершаем работу
try:
# Работа с потребителем
consumer.subscribe(['topic'])
# ... обработка сообщений ...
finally:
consumer.close()
Метод consumer.close() — это обязательный финальный шаг работы с Kafka‑потребителем, который правильно завершает все процессы и освобождает ресурсы.
Что происходит при вызове:
Завершение сессии — потребитель сообщает группе о выходе
Сохранение позиций — фиксируются текущие смещения чтения
Закрытие соединений — разрываются все сетевые подключения к Kafka
Остановка процессов — завершаются фоновые потоки
Очистка памяти — освобождаются буферы и ресурсы
Важно не забывать вызывать consumer.close(), иначе:
Группа потребителей будет ждать возвращения «пропавшего» участника
Другие консьюмеры не смогут быстро взять его партиции
Могут возникнуть утечки памяти
Позиции чтения (offsets) могут не сохраниться
Перебалансировка потребителей
Последний шаг — повышаем надёжность нашего потребителя.
Добавим функции‑обработчики перебалансировки on_assign
и on_revoke
, которые позволяют реализовать пользовательскую логику при перераспределении партиций в группе потребителей (rebalance). Они вызываются автоматически, когда Kafka переназначает партиции между экземплярами потребителей.
Конфигурация
# Конфигурация консьюмера
conf = {
…
# Стратегия распределения партиций
'partition.assignment.strategy': 'roundrobin'
}
partition.assignment.strategy:‘roundrobin’
указывает, что партиции между потребителями будут распределяться по алгоритму «round‑robin» — равномерно, по кругу. Альтернатива — range
, но roundrobin
даёт более сбалансированную нагрузку.
Далее описываем логику функций on_assign
и on_revoke
.
Пока в них заложена базовая логика — логирование событий, чтобы было видно, какие партиции назначаются или отзываются.
В дальнейшем вы можете адаптировать их под свои задачи — например, управлять offset-ами, сохранять состояние или интегрироваться с внешними системами.
def on_assign(partitions):
"""Callback при назначении новых партиций потребителю."""
print("\n--- Назначены новые партиции ---")
for p in partitions:
print(f"Топик: {p.topic}, Партиция: {p.partition}")
# При необходимости можно вручную установить offset
# Например, чтобы продолжить с последнего закоммиченного значен��я:
# consumer.assign(partitions)
def on_revoke(consumer, partitions):
"""Callback при отзыве партиций у потребителя."""
print("\n--- Отозваны партиции ---")
for p in partitions:
print(f"Топик: {p.topic}, Партиция: {p.partition}")
# Критически важно зафиксировать offset'ы перед потерей контроля над партициями
consumer.commit(offsets=partitions)
print("Offsets зафиксированы.")
on_assign ( )
— функция вызывается, когда потребителю назначаются новые партиции, например:
при первом запуске,
после добавления нового потребителя в группу,
при восстановлении после сбоя.
on_revoke ( )
— функция вызывается, когда у потребителя отбираются партиции, например:
перед остановкой,
при масштабировании группы (запуске новых экземпляров),
при сбое соединения.
Консьюмер в действии
Подводя итог, тестируем разработанный нами консьюмер.
Помним, <ваш_IP>
— заменяем на IP адрес, указанный в advertised.listeners, конфигурации кластера server.properties.
Пример консьюмера
from confluent_kafka import Consumer, KafkaException
import json
import sys
import time
# Получаем идентификатор консьюмера из аргументов
consumer_id = sys.argv[1] if len(sys.argv) > 1 else "default_consumer"
# Конфигурация консьюмера
conf = {
'bootstrap.servers': '172.26.139.176:9092',
'group.id': 'test-group', # Одинаковая группа для всех!
'auto.offset.reset': 'earliest',
'enable.auto.commit': False,
'partition.assignment.strategy': 'roundrobin'
}
def on_assign(consumer, partitions):
"""Callback при назначении новых партиций потребителю."""
print(f"\n[{consumer_id}] --- Назначены новые партиции ---")
for p in partitions:
print(f"[{consumer_id}] Топик: {p.topic}, Партиция: {p.partition}")
# Можно вручную установить offset
# consumer.seek(partitions[0])
def on_revoke(consumer, partitions):
"""Callback при отзыве партиций у потребителя."""
print(f"\n[{consumer_id}] --- Отозваны партиции ---")
for p in partitions:
print(f"[{consumer_id}] Топик: {p.topic}, Партиция: {p.partition}")
# Фиксация offset перед потерей партиций
if partitions:
consumer.commit(offsets=partitions)
print(f"[{consumer_id}] Offsets зафиксированы.")
# Создание консьюмера
consumer = Consumer(conf)
# Подписка с callback-функциями
consumer.subscribe(['test-topic'], on_assign=on_assign, on_revoke=on_revoke)
print(f"[{consumer_id}] Консьюмер запущен и ожидает сообщения...")
try:
while True:
msg = consumer.poll(timeout=1.0)
if msg is None:
continue
if msg.error():
if msg.error().code() == KafkaException._PARTITION_EOF:
continue
else:
print(f"[{consumer_id}] Ошибка: {msg.error()}")
break
# Обработка сообщения
key = msg.key().decode('utf-8') if msg.key() else None
raw_value = msg.value().decode('utf-8') if msg.value() else None
value = json.loads(raw_value) if raw_value else None
headers = {}
if msg.headers():
for header in msg.headers():
headers[header[0]] = header[1].decode('utf-8')
# Вывод информации с идентификатором консьюмера
print(f"\n[{consumer_id}] --- Получено сообщение ---")
print(f"[{consumer_id}] Топик: {msg.topic()}")
print(f"[{consumer_id}] Партиция: {msg.partition()}")
print(f"[{consumer_id}] Смещение: {msg.offset()}")
print(f"[{consumer_id}] Ключ: {key}")
print(f"[{consumer_id}] Значение: {value}")
# Подтверждение обработки
consumer.commit(asynchronous=False)
time.sleep(0.5) # Задержка для наглядности
except KeyboardInterrupt:
print(f"[{consumer_id}] Прерывание пользователем")
finally:
consumer.close()
print(f"[{consumer_id}] Консьюмер остановлен")
Проверяем работу брокера. Отправляем из продюсера тестовое сообщение “Hello World”— главное заклинание разработчиков.

Проверяем работу ребалансировщика.
Запускаем консьюмер №1

Запускаем консьюмер №2. Произошла ребаллансировка:


Заключение
В Kafka каждую тему можно раскрывать, пожалуй, до бесконечности. Мы же с вами разобрали наиболее важные моменты, чтобы начать пользоваться этой системой.
Главное, чему мы научились:
Разобрались с основами — от проектирования топиков до настройки продюсеров и консьюмеров
Познакомились с тонкостями работы с сообщениями: сериализация, партиционирование, батчинг и сжатие
Научились настраивать надёжные и производительные системы обмена данными
Освоили библиотеку confluent_kafka для работы с Python
Самое главное, что вы теперь умеете:
Проектировать топки с учётом бизнес‑требований
Оптимизировать производительность через батчинг и сжатие
Управлять группами потребителей и их перебалансировкой
Настраивать надёжную доставку сообщений
Помните, что Kafka — это мощный инструмент, который открывает перед вами огромные возможности. Продолжайте практиковаться, и вы сможете создавать по‑настоящему крутые решения!