Привет, Хабр!
Сегодня мы рассмотрим, как протестировать Kafka с помощью Testcontainers.
Testcontainers — это библиотека, которая из JUnit‑теста запускает Docker‑контейнеры как обычные Java‑объекты. Вы пишете пару строк — а на фоне поднимается полноценная инфраструктура: база, брокер, Redis, что угодно. После теста контейнер гарантированно останавливается, поэтому окружение всегда чистое, а CI не засоряется процессами.
Kafka в Testcontainers запускается теми же двумя строками. Получаем реальный брокер, который ничем не отличается от продакшен‑копии, но живёт ровно столько, сколько идёт тест.
Подключаем зависимости: Gradle и Maven
Если вы на Gradle:
dependencies {
testImplementation platform("org.testcontainers:testcontainers-bom:1.21.3")
testImplementation("org.testcontainers:kafka")
}
А если Maven:
<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>testcontainers-bom</artifactId>
<version>1.21.3</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
<dependencies>
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>kafka</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
Версия 1.21.3 — на момент написания последняя стабильная.
Первый интеграционный тест с KafkaContainer
Начнём с базового кейса: хочется протестировать, что Kafka работает, продюсер может записать сообщение, а консьюмер прочитать его обратно.
@Testcontainers
public class KafkaSmokeTest {
// Объявляем KafkaContainer как @Container — Testcontainers сам поднимет и убьёт его
@Container
static final KafkaContainer kafka = new KafkaContainer(
DockerImageName.parse("apache/kafka-native:3.8.0")
);
@Test
void produceConsume() throws Exception {
// Получаем bootstrap-адрес запущенного брокера
String bootstrap = kafka.getBootstrapServers();
// Конфигурируем Kafka-продюсер
Properties producerProps = new Properties();
producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrap);
producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
// Создаём продюсер и отправляем одно сообщение в demo-topic
KafkaProducer<String, String> producer = new KafkaProducer<>(producerProps);
producer.send(new ProducerRecord<>("demo-topic", "my-key", "Hello Kafka!")).get();
producer.close();
// Конфигурируем Kafka-консьюмер
Properties consumerProps = new Properties();
consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrap);
consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group");
consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
// Создаём консьюмера и подписываемся на тот же топик
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProps);
consumer.subscribe(List.of("demo-topic"));
// Ждём сообщения
ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(5));
// Проверяем, что пришло именно то сообщение
assertEquals(1, records.count());
assertEquals("Hello Kafka!", records.iterator().next().value());
}
}
Контейнер Kafka поднимается автоматически перед тестом, и не нужно ничего запускать вручную. apache/kafka-native:3.8.0
это официальный Kafka‑образ, работающий в KRaft‑режиме (без ZooKeeper).
Вручную создаём продюсера и консьюмера, чтобы максимально контролировать процесс. Проверка assertEquals
подтверждает, что сообщение не просто «куда‑то ушло», а дошло до консьюмера.
Что с ZooKeeper и режимами Kafka?
Kafka долгое время зависела от ZooKeeper — для хранения метаданных, регистрации брокеров, и всей магии с кворумами. Но с версии 3.3 Kafka официально переведена в KRaft‑режим (Kafka Raft), где всё хранится внутри самого брокера.
В нашем примере используется образ:
DockerImageName.parse("apache/kafka-native:3.8.0")
Он работает в режиме KRaft по умолчанию. Это значит:
ZooKeeper не нужен;
всё конфигурируется проще;
быстрее стартует;
меньше зависимостей.
Если вам по какой‑то причине всё‑таки нужен ZooKeeper — например, для старых клиентов или особой топологии, то есть ConfluentKafkaContainer
с образом confluentinc/cp-kafka:<до-7.4.0>
.
ConfluentKafkaContainer kafka = new ConfluentKafkaContainer("confluentinc/cp-kafka:6.2.1");
Но если нет строгих требований используемapache/kafka-native
и живем спокойно.
Создание топиков вручную через AdminClient
По дефолту Kafka может создавать топики при первом продюсе. Но это плохая практика, продакшене чаще всего auto.create.topics.enable=false
. Поэтому явно создаём нужные топики через AdminClient.
Вот так:
try (AdminClient admin = AdminClient.create(Map.of(
AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, kafka.getBootstrapServers()
))) {
List<NewTopic> topics = List.of(
new NewTopic("input", 1, (short)1),
new NewTopic("primes", 1, (short)1),
new NewTopic("composites", 1, (short)1),
new NewTopic("dlq", 1, (short)1)
);
admin.createTopics(topics).all().get();
}
new NewTopic("имя", партиции, фактор репликации)
— в тестах у нас один брокер, так что replicationFactor
должен быть строго 1
. createTopics(...).all().get()
блокирует выполнение до полной регистрации тем на брокере. Без этого возможны гонки.
Если не задать replicationFactor=1
, Kafka может начать пытаться найти других брокеров (которых нет) и выбросит TimeoutException
.
Пример поинтереснее: маршрутизация по топикам
Читаем из топика числа в виде строк, парсим их, проверяем, простое ли число, и отправляем либо в primes
, либо в composites
. Если парс не удался — в dlq
.
public void routeMessages(KafkaConsumer<String, String> consumer, KafkaProducer<String, String> producer) {
consumer.subscribe(List.of("input"));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
for (ConsumerRecord<String, String> record : records) {
try {
int value = Integer.parseInt(record.value());
String topic = isPrime(value) ? "primes" : "composites";
producer.send(new ProducerRecord<>(topic, record.key(), record.value()));
} catch (NumberFormatException ex) {
producer.send(new ProducerRecord<>("dlq", record.key(), record.value()));
}
}
}
}
private boolean isPrime(int n) {
if (n < 2) return false;
for (int i = 2; i <= Math.sqrt(n); i++) {
if (n % i == 0) return false;
}
return true;
}
Простой алгоритм, но идеальный для теста. Мы можем:
отправить
7
, получитьprimes
;отправить
8
, получитьcomposites
;отправить
"abc"
, получитьdlq
.
Сетевые listener и взаимодействие между контейнерами
Если есть ещё один контейнер (например, с kcat
, или с сервисом) и хочется, чтобы он подключался к Kafka в той же сети:
Network net = Network.newNetwork();
KafkaContainer kafka = new KafkaContainer("apache/kafka-native:3.8.0")
.withNetwork(net)
.withListener("broker:19092"); // будет виден как "broker:19092"
GenericContainer<?> kcat = new GenericContainer<>("confluentinc/cp-kcat:7.9.0")
.withNetwork(net)
.withCreateContainerCmdModifier(cmd -> cmd.withEntrypoint("sh"))
.withCopyToContainer(Transferable.of("7\n8\nabc\n"), "/msgs.txt")
.withCommand("-c", "tail -f /dev/null");
kcat.start();
kafka.start();
Теперь можно из kcat
в тесте:
kcat.execInContainer("kcat", "-b", "broker:19092", "-t", "input", "-P", "-l", "/msgs.txt");
Или зачекать результат:
String output = kcat.execInContainer("kcat", "-b", "broker:19092", "-t", "primes", "-C", "-e", "-c", "1").getStdout();
Делитесь своим опытом тестирования Kafka в комментариях, задавайте вопросы, спорьте с решениями — чем больше практических кейсов мы соберём под этой статьёй, тем сильнее станет сообщество. Спасибо, что дочитали!
Приглашаем вас принять участие в серии открытых уроков по Apache Kafka, которые помогут глубже понять ключевые аспекты работы с этой технологией.
30 июля в 19:00 пройдет занятие «Apache Kafka в микросервисной архитектуре — лучшие практики асинхронного обмена». На нем рассмотрим подходы к организации обмена сообщениями в распределенных системах.
13 августа в 18:00 вы сможете ознакомиться с архитектурными паттернами работы с Kafka — на уроке обсудим методы построения устойчивых и масштабируемых решений.
20 августа в 20:00 пройдет урок «Kafka и Clickhouse — как организовать взаимодействие», посвященный интеграции Kafka с аналитической платформой Clickhouse.
А если вы настроены на серьезное обучение, рекомендуем ознакомиться с программой курса по Apache Kafka — на нём максимум практики по работе с Kafka для инженеров данных и разработчиков.