Привет, Хабр!

Сегодня мы рассмотрим, как протестировать Kafka с помощью Testcontainers.

Testcontainers — это библиотека, которая из JUnit‑теста запускает Docker‑контейнеры как обычные Java‑объекты. Вы пишете пару строк — а на фоне поднимается полноценная инфраструктура: база, брокер, Redis, что угодно. После теста контейнер гарантированно останавливается, поэтому окружение всегда чистое, а CI не засоряется процессами.

Kafka в Testcontainers запускается теми же двумя строками. Получаем реальный брокер, который ничем не отличается от продакшен‑копии, но живёт ровно столько, сколько идёт тест.

Подключаем зависимости: Gradle и Maven

Если вы на Gradle:

dependencies {
    testImplementation platform("org.testcontainers:testcontainers-bom:1.21.3")
    testImplementation("org.testcontainers:kafka")
}

А если Maven:

<dependencyManagement>
  <dependencies>
    <dependency>
      <groupId>org.testcontainers</groupId>
      <artifactId>testcontainers-bom</artifactId>
      <version>1.21.3</version>
      <type>pom</type>
      <scope>import</scope>
    </dependency>
  </dependencies>
</dependencyManagement>

<dependencies>
  <dependency>
    <groupId>org.testcontainers</groupId>
    <artifactId>kafka</artifactId>
    <scope>test</scope>
  </dependency>
</dependencies>

Версия 1.21.3 — на момент написания последняя стабильная.

Первый интеграционный тест с KafkaContainer

Начнём с базового кейса: хочется протестировать, что Kafka работает, продюсер может записать сообщение, а консьюмер прочитать его обратно.

@Testcontainers
public class KafkaSmokeTest {

    // Объявляем KafkaContainer как @Container — Testcontainers сам поднимет и убьёт его
    @Container
    static final KafkaContainer kafka = new KafkaContainer(
        DockerImageName.parse("apache/kafka-native:3.8.0")
    );

    @Test
    void produceConsume() throws Exception {
        // Получаем bootstrap-адрес запущенного брокера
        String bootstrap = kafka.getBootstrapServers();

        // Конфигурируем Kafka-продюсер
        Properties producerProps = new Properties();
        producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrap);
        producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        // Создаём продюсер и отправляем одно сообщение в demo-topic
        KafkaProducer<String, String> producer = new KafkaProducer<>(producerProps);
        producer.send(new ProducerRecord<>("demo-topic", "my-key", "Hello Kafka!")).get();
        producer.close();

        // Конфигурируем Kafka-консьюмер
        Properties consumerProps = new Properties();
        consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrap);
        consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group");
        consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        // Создаём консьюмера и подписываемся на тот же топик
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProps);
        consumer.subscribe(List.of("demo-topic"));

        // Ждём сообщения
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(5));

        // Проверяем, что пришло именно то сообщение
        assertEquals(1, records.count());
        assertEquals("Hello Kafka!", records.iterator().next().value());
    }
}

Контейнер Kafka поднимается автоматически перед тестом, и не нужно ничего запускать вручную. apache/kafka-native:3.8.0 это официальный Kafka‑образ, работающий в KRaft‑режиме (без ZooKeeper).

Вручную создаём продюсера и консьюмера, чтобы максимально контролировать процесс. Проверка assertEquals подтверждает, что сообщение не просто «куда‑то ушло», а дошло до консьюмера.

Что с ZooKeeper и режимами Kafka?

Kafka долгое время зависела от ZooKeeper — для хранения метаданных, регистрации брокеров, и всей магии с кворумами. Но с версии 3.3 Kafka официально переведена в KRaft‑режим (Kafka Raft), где всё хранится внутри самого брокера.

В нашем примере используется образ:

DockerImageName.parse("apache/kafka-native:3.8.0")

Он работает в режиме KRaft по умолчанию. Это значит:

  • ZooKeeper не нужен;

  • всё конфигурируется проще;

  • быстрее стартует;

  • меньше зависимостей.

Если вам по какой‑то причине всё‑таки нужен ZooKeeper — например, для старых клиентов или особой топологии, то есть ConfluentKafkaContainer с образом confluentinc/cp-kafka:<до-7.4.0>.

ConfluentKafkaContainer kafka = new ConfluentKafkaContainer("confluentinc/cp-kafka:6.2.1");

Но если нет строгих требований используемapache/kafka-native и живем спокойно.

Создание топиков вручную через AdminClient

По дефолту Kafka может создавать топики при первом продюсе. Но это плохая практика, продакшене чаще всего auto.create.topics.enable=false. Поэтому явно создаём нужные топики через AdminClient.

Вот так:

try (AdminClient admin = AdminClient.create(Map.of(
        AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, kafka.getBootstrapServers()
))) {
    List<NewTopic> topics = List.of(
        new NewTopic("input", 1, (short)1),
        new NewTopic("primes", 1, (short)1),
        new NewTopic("composites", 1, (short)1),
        new NewTopic("dlq", 1, (short)1)
    );

    admin.createTopics(topics).all().get();
}

new NewTopic("имя", партиции, фактор репликации) — в тестах у нас один брокер, так что replicationFactor должен быть строго 1. createTopics(...).all().get() блокирует выполнение до полной регистрации тем на брокере. Без этого возможны гонки.

Если не задать replicationFactor=1, Kafka может начать пытаться найти других брокеров (которых нет) и выбросит TimeoutException.

Пример поинтереснее: маршрутизация по топикам

Читаем из топика числа в виде строк, парсим их, проверяем, простое ли число, и отправляем либо в primes, либо в composites. Если парс не удался — в dlq.

public void routeMessages(KafkaConsumer<String, String> consumer, KafkaProducer<String, String> producer) {
    consumer.subscribe(List.of("input"));

    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
        for (ConsumerRecord<String, String> record : records) {
            try {
                int value = Integer.parseInt(record.value());
                String topic = isPrime(value) ? "primes" : "composites";
                producer.send(new ProducerRecord<>(topic, record.key(), record.value()));
            } catch (NumberFormatException ex) {
                producer.send(new ProducerRecord<>("dlq", record.key(), record.value()));
            }
        }
    }
}

private boolean isPrime(int n) {
    if (n < 2) return false;
    for (int i = 2; i <= Math.sqrt(n); i++) {
        if (n % i == 0) return false;
    }
    return true;
}

Простой алгоритм, но идеальный для теста. Мы можем:

  • отправить 7, получить primes;

  • отправить 8, получить composites;

  • отправить "abc", получить dlq.

Сетевые listener и взаимодействие между контейнерами

Если есть ещё один контейнер (например, с kcat, или с сервисом) и хочется, чтобы он подключался к Kafka в той же сети:

Network net = Network.newNetwork();

KafkaContainer kafka = new KafkaContainer("apache/kafka-native:3.8.0")
    .withNetwork(net)
    .withListener("broker:19092"); // будет виден как "broker:19092"

GenericContainer<?> kcat = new GenericContainer<>("confluentinc/cp-kcat:7.9.0")
    .withNetwork(net)
    .withCreateContainerCmdModifier(cmd -> cmd.withEntrypoint("sh"))
    .withCopyToContainer(Transferable.of("7\n8\nabc\n"), "/msgs.txt")
    .withCommand("-c", "tail -f /dev/null");

kcat.start();
kafka.start();

Теперь можно из kcat в тесте:

kcat.execInContainer("kcat", "-b", "broker:19092", "-t", "input", "-P", "-l", "/msgs.txt");

Или зачекать результат:

String output = kcat.execInContainer("kcat", "-b", "broker:19092", "-t", "primes", "-C", "-e", "-c", "1").getStdout();

Делитесь своим опытом тестирования Kafka в комментариях, задавайте вопросы, спорьте с решениями — чем больше практических кейсов мы соберём под этой статьёй, тем сильнее станет сообщество. Спасибо, что дочитали!

Приглашаем вас принять участие в серии открытых уроков по Apache Kafka, которые помогут глубже понять ключевые аспекты работы с этой технологией.

30 июля в 19:00 пройдет занятие «Apache Kafka в микросервисной архитектуре — лучшие практики асинхронного обмена». На нем рассмотрим подходы к организации обмена сообщениями в распределенных системах.

13 августа в 18:00 вы сможете ознакомиться с архитектурными паттернами работы с Kafka — на уроке обсудим методы построения устойчивых и масштабируемых решений.

20 августа в 20:00 пройдет урок «Kafka и Clickhouse — как организовать взаимодействие», посвященный интеграции Kafka с аналитической платформой Clickhouse.

А если вы настроены на серьезное обучение, рекомендуем ознакомиться с программой курса по Apache Kafka — на нём максимум практики по работе с Kafka для инженеров данных и разработчиков.

Комментарии (0)