Привет, Хабр! Это моя первая статья + я являюсь джуном, так что очень жду вашей критики (пожалуйста адекватной)

Немного о мотивации

Я ежедневно работаю с кафкой, но вот поймал себя на мысли, что не понимаю как она устроена(прям вот совсем плохо)! В моей голове живет мысль — «хочешь понять технологию — напиши ее», но изобретать такой велосипед как кафка — сил мне не хватит (надеюсь пока что). Так что мной было принято решение написать какое‑то маленькое демо‑приложение с поднятием кафки с нуля.

Apache Kafka

Есть множество статей и видео с детальным описанием устройства кафки, но пока сам не потрогаешь — все эти топики, партиции, офсеты будут непонятны(конечно говорю о своем джунском уровне).

Если нужно что‑то подробнее

С моего джунского трона — кафка это меганадежная очередь сообщений, то есть если все правильно настроено, то продюсер точно отправит сообщение и оно с невероятно большим шансом дойдет до консьюмера. Кафка подразумевает в себе огромное количество перестраховок на случай падения той или иной части сервиса, репликаций и тп. Можно сказать, что в ней нет функции удаления сообщений(они будут хранится там пока не пройдет их TTL)

Кластер — множество брокеров.

Продюсеры — клиентский отправитель сообщения в топик.

Топик — логический канал, куда публикуются сообщения(по сути своей очередь, тк сообщения добавляются в конец), по сути своей множество партиций.

Брокеры — серверы, образующие кластер кафки, в них как раз лежат топики.(Клиенты (производители и потребители) подключаются к любому брокеру, используя его как bootstrap‑сервер. После подключения брокер предоставляет данные обо всем кластере, включая информацию о других брокерах, топиках и пкартициях)

Партиции — каждый топик делится на партиции, и партиции разбросаны по разным брокерам (напомню, топик это логический канал).

Консьюмер— клиент, который подписывается на топики и читает сообщения. Собственно это и отличает кафку от обычной очереди — тк разные консьюмеры могут вычитывать из топика.

Оффсет — уникальный номер (по сути от 0 до последнего сообщения) сообщения в партиции.

Резюмируем(упрощенно, пока без реплик) — есть кластер, это множество брокеров, в брокерах лежат логические каналы — топики( при том, что один топик может быть разделен на несколько партиций, которые лежат в разных брокерах — горизонтальное масштабирование и параллелизм:) ).

Один топик в разных брокерах(картинка из статьи)
Один топик в разных брокерах(картинка из статьи)
P.S.

После прочтения данного материала не считайте, что вы разобрались в кафке, у нее еще есть множество свойств — сколько не читал все равно ощущение что лягушка на дне колодца(реплики партиций, контроллер‑брокер, Zookeeper, правила распределения партиций по брокерам и тд)

Docker + Maven

Для начала создадим проект в spring-initializr, добавим кафку и докер зависимости, тк поднимать кафку мы будем в докере.

Получим такой pom с нужными зависимостями.

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
	<modelVersion>4.0.0</modelVersion>
	<parent>
		<groupId>org.springframework.boot</groupId>
		<artifactId>spring-boot-starter-parent</artifactId>
		<version>3.5.5</version>
		<relativePath/> <!-- lookup parent from repository -->
	</parent>
	<groupId>kafka.demo.docker</groupId>
	<artifactId>kafkaDemoDocker</artifactId>
	<version>0.0.1-SNAPSHOT</version>
	<name>kafkaDemoDocker</name>
	<description>Demo project for Spring Boot</description>
	<url/>
	<licenses>
		<license/>
	</licenses>
	<developers>
		<developer/>
	</developers>
	<scm>
		<connection/>
		<developerConnection/>
		<tag/>
		<url/>
	</scm>
	<properties>
		<java.version>21</java.version>
	</properties>
	<dependencies>
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter</artifactId>
		</dependency>
		<dependency>
			<groupId>org.springframework.kafka</groupId>
			<artifactId>spring-kafka</artifactId>
		</dependency>

		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-docker-compose</artifactId>
			<scope>runtime</scope>
			<optional>true</optional>
		</dependency>
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-test</artifactId>
			<scope>test</scope>
		</dependency>
		<dependency>
			<groupId>org.springframework.kafka</groupId>
			<artifactId>spring-kafka-test</artifactId>
			<scope>test</scope>
		</dependency>
	</dependencies>

	<build>
		<plugins>
			<plugin>
				<groupId>org.springframework.boot</groupId>
				<artifactId>spring-boot-maven-plugin</artifactId>
			</plugin>
		</plugins>
	</build>

</project>

Далее поднимем кафку в докере:

services:
  kafka:
    image: apache/kafka:3.7.0
    container_name: kafka-broker
    ports:
      - "9092:9092"
    environment:
      KAFKA_NODE_ID: 1
      KAFKA_PROCESS_ROLES: broker,controller
      KAFKA_LISTENERS: CONTROLLER://0.0.0.0:29093,BROKER://0.0.0.0:29092,PLAINTEXT_HOST://0.0.0.0:9092
      KAFKA_ADVERTISED_LISTENERS: BROKER://kafka:29092,PLAINTEXT_HOST://localhost:9092
      KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
      KAFKA_CONTROLLER_QUORUM_VOTERS: "1@kafka:29093"
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,BROKER:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: BROKER
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_LOG_DIRS: /tmp/kraft-combined-logs
      CLUSTER_ID: MkU3OEVBNTcwNTJENDM2Qk

Разберем файл компоуза:

с 1 по 6 строчку мы определяем сервис и контейнер(ы) — его название (kafka), версию кафки(3.7.0), имя контейнера — kafka‑broker, настраиваем пробрасывание порта с хоста(в моем случае ноутбука) в контейнер чтобы обращаться из спрингбут приложения в кафку.

Далее будет немного сложнее — в 7–9 строчках мы настраиваем номер ноды в кафке(чтобы на хорошем уровне понимать что это значит стоит прочитать это). Если кратко — раньше был Zookeeper, в каком‑то смысле менеджер кафки и в то же время хранилище метаданных/настроек оной (с помощью Zookeeper, например, выбирался контроллер кластера). Теперь вместо Zookeeper кафка сама хранит метаданные и управляет собой на основе технологии рафт‑ на одном сервере. (до версии 3.3.0 сервер был либо контроллером, либо брокером, но теперь он может работать в комбинированном режиме, что и указано в 9 строчке)

В 10 строчке мы указываем какие порты кафка будет слушать внутри контейнера, один для связи контроллеров, другой для связи между брокерами.

В 11 строчке мы указываем два адреса. Первый — адрес для докер сети, по которому к нашему брокеру будут обращаться, а второй адрес — подключения на хост‑машине(в моем случае ноутбук).

В 12 строчке устанавливаем, что контроллером будет листинер, который называется контроллер и соответственно слушает 29 093 порт

В 13 строчке определяем кворум контроллер для кластера. В Формате <node_id>@<hostname>:<port>

Кворум это

Кворум — нечетное количество серверов с ролью контроллер(или комбинированной), работающих по протоколу Raft — гарантирующему, что все ноды кворума имеют одинаковые метаданные.

Используется для перераспределения партиций в случае падения брокера или выбора нового главного контроллера в случае его падения.

В 14 строчке настраиваем протокол обменивания информации, в нашем случае достаточно передавать голый текст, без использования SSL??

В 15 строчке выбираем брокера(аналогично контроллеру).

В 16 указываем количество репликаций внутреннего топика(тк по умолчанию их 3, а у нас всего один брокер, а исходя из правила кафки все ломается. Попробуйте разобраться подробнее и напишите в комментарии!).

Правило Kafka

Фактор репликации (количество копий топика) не может быть больше, чем число живых брокеров в кластере.

УРА! Вроде бы разобрались

Далее мы поднимаем кафку:

docker compose up -d

Затем заходим в контейнер и создаем топик прямо из него

docker exec -it kafka-broker /bin/sh

Если вам как и мне лень добавлять что-то в PATH, а чтобы создать топик кафки - нам нужен файл kafka-topics.sh, как именно это работает я разбирать не буду... В общем нам нужно перейти в директорию с этим файлом.

cd /opt/kafka/bin/

Возможно у вас он лежит в другом месте, тогда команда find в помощь.

После этого выполняем команду

bash kafka-topics.sh --create \
  --topic demo-topic \
  --bootstrap-server kafka:29092 \
  --partitions 1 \
  --replication-factor 1

Создаем топик с названием demo‑topic, далее указываем сервер из yaml файла, тк в докере мы общаемся по именам, с 1 партицией и одной ее копией.

Эта команда под капотом запускает нужный джава класс(тк кафка написана на джаве) с данными аргументами.

Вызванный нами скрипт, если убрать комментарии выглядит так:

exec $(dirname $0)/kafka-run-class.sh kafka.admin.TopicCommand "$@"

Springboot

Наконец‑то переходим к java коду:

Начнем с продюсера.

Для начала укажем конфиги — адрес сервера, тип сериализации для ключа и значения нашего сообщения из строк в байты.

Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");

Далее создадим экземпляр класса KafkaProducer

Producer<String, String> producer = new KafkaProducer<>(props);

Далее определим наше сообщение - топик, куда оно отправится, ключ и его значение(key - необязательный параметр)

Немного про ключ

Сообщения с одинаковым ключом всегда попадают в одну и ту же партицию и обрабатываются в порядке отправления + ими удобно разделять по принадлежности к какому-то атрибуту

String topic = "demo-topic";
String key = "test-key";
String value = "Hello, world!";

И конечно сама логика отправки сообщения обернутая в try-catch конечно же

try {
    RecordMetadata metadata = producer.send(record).get();
    System.out.printf("Сообщение отправлено успешно! topic = %s, partition = %d, offset = %d%n",
            metadata.topic(), metadata.partition(), metadata.offset());
} catch (InterruptedException | ExecutionException e) {
    System.err.println("Ошибка при отправке сообщения: " + e.getMessage());
} finally {
    producer.close();
}
Что происходит под капотом producer.send(record)

1) Сериализация ключа и сообщения

2) Определяется хеш ключа(если указан), и определяется раздел топика

3) Отправка в брокер через батч

4) Далее ожидание ответа

  • .get() блокирует текущий поток до получения ответа от брокера

5) После успешной записи возврщается объект классa RecordMetadata с информацией

  • Топик, в который записано сообщение

  • Раздел (partition)

  • Offset (позиция в разделе)

  • Временная метка

Хочется сказать, что для продакшена не рекомендуется использовать.get, то есть не блокировать поток, а делать так

producer.send(record, (metadata, exception) -> {
    if (exception == null) {
        System.out.printf("Отправлено успешно: partition=%d, offset=%d%n", 
            metadata.partition(), metadata.offset());
    } else {
        System.err.println("Ошибка отправки: " + exception.getMessage());
    }
});

Итоговый класс выглядит так:

public class SimpleProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");


        Producer<String, String> producer = new KafkaProducer<>(props);


        String topic = "demo-topic";
        String key = "test-key";
        String value = "Hello, world!";
        ProducerRecord<String, String> record = new ProducerRecord<>(topic, key, value);

        try {
            RecordMetadata metadata = producer.send(record).get();
            System.out.printf("Сообщение отправлено успешно! topic = %s, partition = %d, offset = %d%n",
                    metadata.topic(), metadata.partition(), metadata.offset());
        } catch (InterruptedException | ExecutionException e) {
            System.err.println("Ошибка при отправке сообщения: " + e.getMessage());
        } finally {
            producer.close();
        }
    }
}

Теперь напишем Консюмер

Здесь в конфигах укажем то же самое + название группы потребителей + поведение при отсутствии сохраненного офсета для вычитки.

Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "demo-group");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

Далее создадим экземпляр класса потребителя

Consumer<String, String> consumer = new KafkaConsumer<>(props);

Подписываемся на нужный топик:

String topic = "demo-topic";
consumer.subscribe(Collections.singletonList(topic));

Добавим небольшое логирование и сама логика отправки:

System.out.println("Ожидание сообщений из топика: " + topic + ". Для выхода нажмите Ctrl+C");

try {
    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));

        for (ConsumerRecord<String, String> record : records) {
            System.out.printf("Получено сообщение: key = %s, value = %s, partition = %d, offset = %d%n",
                    record.key(), record.value(), record.partition(), record.offset());
        }
    }
} finally {
    consumer.close();
}
Что происходит под капотом consumer.poll(Duration.ofMillis(100));

0) Наши приложение тригерит кафка-консюмер, а дальше действует он

1) Спрашиваем брокер о метаданных

  • Какие разделы (partitions) ему назначены после балансировки

  • Текущую позицию (offset) для каждого раздела

  • Какой брокер является лидером для каждого раздела

2) Консюмер отправляет fetch-запросы к брокерам лидерам партиций

3) В течение 100мс ожидает ответ

4) Орабатывает ответ

  • Десериализуются (преобразуются из байтов в строки)

  • Группируются в объект ConsumerRecords

  • Offsets автоматически обновляются для последующего коммита

Итоговый класс выглядит так:

public class SimpleConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "demo-group");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

        Consumer<String, String> consumer = new KafkaConsumer<>(props);

        String topic = "demo-topic";
        consumer.subscribe(Collections.singletonList(topic));

        System.out.println("Ожидание сообщений из топика: " + topic + ". Для выхода нажмите Ctrl+C");

        try {
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));

                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("Получено сообщение: key = %s, value = %s, partition = %d, offset = %d%n",
                            record.key(), record.value(), record.partition(), record.offset());
                }
            }
        } finally {
            consumer.close();
        }
    }
}

Чтобы проверить работу приложения, запустите класс консюмера, а потом продюсера - готово!

Комментарии (0)