Привет, Хабр! Это моя первая статья + я являюсь джуном, так что очень жду вашей критики (пожалуйста адекватной)
Немного о мотивации
Я ежедневно работаю с кафкой, но вот поймал себя на мысли, что не понимаю как она устроена(прям вот совсем плохо)! В моей голове живет мысль — «хочешь понять технологию — напиши ее», но изобретать такой велосипед как кафка — сил мне не хватит (надеюсь пока что). Так что мной было принято решение написать какое‑то маленькое демо‑приложение с поднятием кафки с нуля.
Apache Kafka
Есть множество статей и видео с детальным описанием устройства кафки, но пока сам не потрогаешь — все эти топики, партиции, офсеты будут непонятны(конечно говорю о своем джунском уровне).
С моего джунского трона — кафка это меганадежная очередь сообщений, то есть если все правильно настроено, то продюсер точно отправит сообщение и оно с невероятно большим шансом дойдет до консьюмера. Кафка подразумевает в себе огромное количество перестраховок на случай падения той или иной части сервиса, репликаций и тп. Можно сказать, что в ней нет функции удаления сообщений(они будут хранится там пока не пройдет их TTL)
Кластер — множество брокеров.
Продюсеры — клиентский отправитель сообщения в топик.
Топик — логический канал, куда публикуются сообщения(по сути своей очередь, тк сообщения добавляются в конец), по сути своей множество партиций.
Брокеры — серверы, образующие кластер кафки, в них как раз лежат топики.(Клиенты (производители и потребители) подключаются к любому брокеру, используя его как bootstrap‑сервер. После подключения брокер предоставляет данные обо всем кластере, включая информацию о других брокерах, топиках и пкартициях)
Партиции — каждый топик делится на партиции, и партиции разбросаны по разным брокерам (напомню, топик это логический канал).
Консьюмер— клиент, который подписывается на топики и читает сообщения. Собственно это и отличает кафку от обычной очереди — тк разные консьюмеры могут вычитывать из топика.
Оффсет — уникальный номер (по сути от 0 до последнего сообщения) сообщения в партиции.
Резюмируем(упрощенно, пока без реплик) — есть кластер, это множество брокеров, в брокерах лежат логические каналы — топики( при том, что один топик может быть разделен на несколько партиций, которые лежат в разных брокерах — горизонтальное масштабирование и параллелизм:) ).

P.S.
После прочтения данного материала не считайте, что вы разобрались в кафке, у нее еще есть множество свойств — сколько не читал все равно ощущение что лягушка на дне колодца(реплики партиций, контроллер‑брокер, Zookeeper, правила распределения партиций по брокерам и тд)
Docker + Maven
Для начала создадим проект в spring-initializr, добавим кафку и докер зависимости, тк поднимать кафку мы будем в докере.
Получим такой pom с нужными зависимостями.
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>3.5.5</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<groupId>kafka.demo.docker</groupId>
<artifactId>kafkaDemoDocker</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>kafkaDemoDocker</name>
<description>Demo project for Spring Boot</description>
<url/>
<licenses>
<license/>
</licenses>
<developers>
<developer/>
</developers>
<scm>
<connection/>
<developerConnection/>
<tag/>
<url/>
</scm>
<properties>
<java.version>21</java.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-docker-compose</artifactId>
<scope>runtime</scope>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
Далее поднимем кафку в докере:
services:
kafka:
image: apache/kafka:3.7.0
container_name: kafka-broker
ports:
- "9092:9092"
environment:
KAFKA_NODE_ID: 1
KAFKA_PROCESS_ROLES: broker,controller
KAFKA_LISTENERS: CONTROLLER://0.0.0.0:29093,BROKER://0.0.0.0:29092,PLAINTEXT_HOST://0.0.0.0:9092
KAFKA_ADVERTISED_LISTENERS: BROKER://kafka:29092,PLAINTEXT_HOST://localhost:9092
KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
KAFKA_CONTROLLER_QUORUM_VOTERS: "1@kafka:29093"
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,BROKER:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
KAFKA_INTER_BROKER_LISTENER_NAME: BROKER
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_LOG_DIRS: /tmp/kraft-combined-logs
CLUSTER_ID: MkU3OEVBNTcwNTJENDM2Qk
Разберем файл компоуза:
с 1 по 6 строчку мы определяем сервис и контейнер(ы) — его название (kafka), версию кафки(3.7.0), имя контейнера — kafka‑broker, настраиваем пробрасывание порта с хоста(в моем случае ноутбука) в контейнер чтобы обращаться из спрингбут приложения в кафку.
Далее будет немного сложнее — в 7–9 строчках мы настраиваем номер ноды в кафке(чтобы на хорошем уровне понимать что это значит стоит прочитать это). Если кратко — раньше был Zookeeper, в каком‑то смысле менеджер кафки и в то же время хранилище метаданных/настроек оной (с помощью Zookeeper, например, выбирался контроллер кластера). Теперь вместо Zookeeper кафка сама хранит метаданные и управляет собой на основе технологии рафт‑ на одном сервере. (до версии 3.3.0 сервер был либо контроллером, либо брокером, но теперь он может работать в комбинированном режиме, что и указано в 9 строчке)
В 10 строчке мы указываем какие порты кафка будет слушать внутри контейнера, один для связи контроллеров, другой для связи между брокерами.
В 11 строчке мы указываем два адреса. Первый — адрес для докер сети, по которому к нашему брокеру будут обращаться, а второй адрес — подключения на хост‑машине(в моем случае ноутбук).
В 12 строчке устанавливаем, что контроллером будет листинер, который называется контроллер и соответственно слушает 29 093 порт
В 13 строчке определяем кворум контроллер для кластера. В Формате <node_id>@<hostname>:<port>
Кворум это
Кворум — нечетное количество серверов с ролью контроллер(или комбинированной), работающих по протоколу Raft — гарантирующему, что все ноды кворума имеют одинаковые метаданные.
Используется для перераспределения партиций в случае падения брокера или выбора нового главного контроллера в случае его падения.
В 14 строчке настраиваем протокол обменивания информации, в нашем случае достаточно передавать голый текст, без использования SSL??
В 15 строчке выбираем брокера(аналогично контроллеру).
В 16 указываем количество репликаций внутреннего топика(тк по умолчанию их 3, а у нас всего один брокер, а исходя из правила кафки все ломается. Попробуйте разобраться подробнее и напишите в комментарии!).
Правило Kafka
Фактор репликации (количество копий топика) не может быть больше, чем число живых брокеров в кластере.
УРА! Вроде бы разобрались
Далее мы поднимаем кафку:
docker compose up -d
Затем заходим в контейнер и создаем топик прямо из него
docker exec -it kafka-broker /bin/sh
Если вам как и мне лень добавлять что-то в PATH, а чтобы создать топик кафки - нам нужен файл kafka-topics.sh, как именно это работает я разбирать не буду... В общем нам нужно перейти в директорию с этим файлом.
cd /opt/kafka/bin/
Возможно у вас он лежит в другом месте, тогда команда find в помощь.
После этого выполняем команду
bash kafka-topics.sh --create \
--topic demo-topic \
--bootstrap-server kafka:29092 \
--partitions 1 \
--replication-factor 1
Создаем топик с названием demo‑topic, далее указываем сервер из yaml файла, тк в докере мы общаемся по именам, с 1 партицией и одной ее копией.
Эта команда под капотом запускает нужный джава класс(тк кафка написана на джаве) с данными аргументами.
Вызванный нами скрипт, если убрать комментарии выглядит так:
exec $(dirname $0)/kafka-run-class.sh kafka.admin.TopicCommand "$@"
Springboot
Наконец‑то переходим к java коду:
Начнем с продюсера.
Для начала укажем конфиги — адрес сервера, тип сериализации для ключа и значения нашего сообщения из строк в байты.
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
Далее создадим экземпляр класса KafkaProducer
Producer<String, String> producer = new KafkaProducer<>(props);
Далее определим наше сообщение - топик, куда оно отправится, ключ и его значение(key - необязательный параметр)
Немного про ключ
Сообщения с одинаковым ключом всегда попадают в одну и ту же партицию и обрабатываются в порядке отправления + ими удобно разделять по принадлежности к какому-то атрибуту
String topic = "demo-topic";
String key = "test-key";
String value = "Hello, world!";
И конечно сама логика отправки сообщения обернутая в try-catch конечно же
try {
RecordMetadata metadata = producer.send(record).get();
System.out.printf("Сообщение отправлено успешно! topic = %s, partition = %d, offset = %d%n",
metadata.topic(), metadata.partition(), metadata.offset());
} catch (InterruptedException | ExecutionException e) {
System.err.println("Ошибка при отправке сообщения: " + e.getMessage());
} finally {
producer.close();
}
Что происходит под капотом producer.send(record)
1) Сериализация ключа и сообщения
2) Определяется хеш ключа(если указан), и определяется раздел топика
3) Отправка в брокер через батч
4) Далее ожидание ответа
.get()
блокирует текущий поток до получения ответа от брокера
5) После успешной записи возврщается объект классa RecordMetadata с информацией
Топик, в который записано сообщение
Раздел (partition)
Offset (позиция в разделе)
Временная метка
Хочется сказать, что для продакшена не рекомендуется использовать.get, то есть не блокировать поток, а делать так
producer.send(record, (metadata, exception) -> {
if (exception == null) {
System.out.printf("Отправлено успешно: partition=%d, offset=%d%n",
metadata.partition(), metadata.offset());
} else {
System.err.println("Ошибка отправки: " + exception.getMessage());
}
});
Итоговый класс выглядит так:
public class SimpleProducer {
public static void main(String[] args) {
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
Producer<String, String> producer = new KafkaProducer<>(props);
String topic = "demo-topic";
String key = "test-key";
String value = "Hello, world!";
ProducerRecord<String, String> record = new ProducerRecord<>(topic, key, value);
try {
RecordMetadata metadata = producer.send(record).get();
System.out.printf("Сообщение отправлено успешно! topic = %s, partition = %d, offset = %d%n",
metadata.topic(), metadata.partition(), metadata.offset());
} catch (InterruptedException | ExecutionException e) {
System.err.println("Ошибка при отправке сообщения: " + e.getMessage());
} finally {
producer.close();
}
}
}
Теперь напишем Консюмер
Здесь в конфигах укажем то же самое + название группы потребителей + поведение при отсутствии сохраненного офсета для вычитки.
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "demo-group");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
Далее создадим экземпляр класса потребителя
Consumer<String, String> consumer = new KafkaConsumer<>(props);
Подписываемся на нужный топик:
String topic = "demo-topic";
consumer.subscribe(Collections.singletonList(topic));
Добавим небольшое логирование и сама логика отправки:
System.out.println("Ожидание сообщений из топика: " + topic + ". Для выхода нажмите Ctrl+C");
try {
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
System.out.printf("Получено сообщение: key = %s, value = %s, partition = %d, offset = %d%n",
record.key(), record.value(), record.partition(), record.offset());
}
}
} finally {
consumer.close();
}
Что происходит под капотом consumer.poll(Duration.ofMillis(100));
0) Наши приложение тригерит кафка-консюмер, а дальше действует он
1) Спрашиваем брокер о метаданных
Какие разделы (partitions) ему назначены после балансировки
Текущую позицию (offset) для каждого раздела
Какой брокер является лидером для каждого раздела
2) Консюмер отправляет fetch-запросы к брокерам лидерам партиций
3) В течение 100мс ожидает ответ
4) Орабатывает ответ
Десериализуются (преобразуются из байтов в строки)
Группируются в объект
ConsumerRecords
Offsets автоматически обновляются для последующего коммита
Итоговый класс выглядит так:
public class SimpleConsumer {
public static void main(String[] args) {
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "demo-group");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
Consumer<String, String> consumer = new KafkaConsumer<>(props);
String topic = "demo-topic";
consumer.subscribe(Collections.singletonList(topic));
System.out.println("Ожидание сообщений из топика: " + topic + ". Для выхода нажмите Ctrl+C");
try {
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
System.out.printf("Получено сообщение: key = %s, value = %s, partition = %d, offset = %d%n",
record.key(), record.value(), record.partition(), record.offset());
}
}
} finally {
consumer.close();
}
}
}
Чтобы проверить работу приложения, запустите класс консюмера, а потом продюсера - готово!