В середине 2000-х с ростом популярности систем с микросервисной архитектурой, усилением массовости их использования обществом, а также с резким увеличением объёмов данных, генерируемых пользователями и другими системами, у компаний возникла потребность в эффективной обработке колоссальных объёмов информации. Им требовалась система, которая могла:

  • обрабатывать сотни тысяч сообщений в секунду;

  • обеспечивать низкую задержку;

  • быть отказоустойчивой и масштабируемой;

  • хранить данные длительное время.

Существующие на то время традиционные системы обмена сообщениями (например, RabbitMQ, ActiveMQ, Java Message Service) часто не справлялись с большой нагрузкой, так как не могли эффективно масштабироваться горизонтально, имели задержки при высокой нагрузке и не обеспечивали надёжное хранение сообщений.

В 2010 году инженеры LinkedIn Джей Крепс, Нихэд Говиндараджан и Джун Рао разработали Kafka как внутреннее решение. Но уже в 2011 году Kafka была открыта под лицензией Apache 2.0, а в 2012 стала топ‑уровневым проектом. С тех пор её популярность выросла — она стала стандартом для потоковой обработки (Kafka Streams, Flink, Spark Streaming), event‑driven архитектур, интеграции микросервисов, сбора данных для аналитики (ELT‑конвейеры).

Привет, Хабр! Я SDET‑инженер в SimbirSoft Александр, в этой статье я предлагаю вам:

  • Рассмотреть основы Kafka, ее архитектуру и как она работает.

  • Выяснить, как тестируются сообщения в топиках, какие инструменты для этого используются. Приведу примерные сценарии.

  • Обсудить роль Kafka в интеграционном тестировании, покажу пример интеграционного теста.

Материал будет полезен для новичков в области тестирования ПО, как ручного, так и автоматизированного.

Основы Аpache Kafka и принципы её работы

Apache Kafka — это система управления потоками данных, которая позволяет приложениям обмениваться сообщениями в реальном времени. Другими словами, это платформа, распределяющая поток данных, поступающих из разных источников, по разным другим сервисам внутри системы. Она предназначена для публикации, подписки, хранения и обработки информации (сообщений).

Основные составляющие Apache Kafka: 

  1. Producer — это производитель, который отправляет (публикует) данные в Kafka, а также определяет, в какой топик будут отправлены сообщения. Кроме того, поддерживает синхронную и асинхронную отправку сообщений, для которых использует различные схемы сериализации (например, JSON, Avro, Protobuf ). 

  2. Consumer — потребитель данных из топиков, читает или «подписывается» на сообщения из Kafka и десериализует их, то есть превращает байтовый поток в читаемый и обрабатываемый объект (строку, JSON, или же объект Java). Может подключаться к одному или нескольким топикам, входить в группу потребителей, где каждый экземпляр обрабатывает подмножество партиций (load balancing). Также он отслеживает offset — позицию, до которой прочитаны данные.

  3. Broker (брокер) — сервер Kafka, который хранит данные и обрабатывает запросы продюсеров и консьюмеров. Каждый брокер может обслуживать множество топиков и партиций.

  4. Cluster (кластер) — может состоять из одного или нескольких брокеров. Обеспечивает отказоустойчивость и масштабируемость путём сопоставления брокеров и партиций, определения лидера каждой партиций и какой из брокеров доступен или «живой», а также путём синхронизации реплик. Если брокеры не связаны в кластер — никто не будет знать, где находятся сообщения, какой брокер «главный» по каждому фрагменту данных, как балансировать нагрузку и тому подобное

    Кластер использует ZooKeeper (в старых версиях) или собственный внутренний KRaft для управления метаданными.

  5. ZooKeeper / KRaft Controller.

    • ZooKeeper (до версии 4.х) — внешняя система управления (то есть запускается отдельно), отслеживающая статус брокеров, управляющая выборами лидеров‑партиций и конфигурацией. 

    • KRaft (Kafka Raft Metadata mode) — новая архитектура Kafka без ZooKeeper, включающая встроенный мета‑контроллер на основе Raft. С версии 4.х в Apache Kafka режим KRaft становится единственным в работе без Zookeper.

  6. Topic (топик) — логическая категория или канал, в который пишут «производители» и из которого читают «потребители». Сообщения внутри топика организованы по партициям.

  7. Partition (партиция) — физическое разделение данных в пределах одного топика. Позволяет масштабировать Kafka горизонтально (по числу брокеров). Каждая партиция — это упорядоченный, неизменяемый лог событий (сообщений), а каждое сообщение внутри партиции имеет уникальный offset.

  8. Offset — порядковый номер сообщения в партиции. Используется для отслеживания позиции чтения потребителем и хранится в Kafka (в топике consumer_offsets) или во внешних системах. Отчёт начинается с 0, как в массивах.

  9. Replication (репликация). Kafka автоматически реплицирует данные между брокерами для обеспечения отказоустойчивости. Одна из реплик назначается лидером, остальные — фолловерами. Продюсеры и консьюмеры взаимодействуют только с лидером партиции.

  10. И наконец само сообщение в топике. Оно имеет чётко определённую структуру и может принимать разные форматы в зависимости от требований системы. Сообщение состоит из ключа, значения, заголовка, метки времени и позиции (присваивается брокером при записи). Ключ может использоваться для маршрутизации (например, на выходе обработка сообщения разная в зависимости от ключа), для группировки и для обеспечения идемпотентности. Значение содержит то, что вы реально передаёте: JSON, строка, байты и так далее. Headers (заголовки) необязательны и могут служить для передачи контекста, трассировки, типов данных и других метаданных.

Схема структуры перечисленных компонентов
Схема структуры перечисленных компонентов

Рассмотрим этапы движения сообщения в Apache Kafka «под капотом»:

  1. На начальном этапе продюсер формирует объект ProducerRecord<K, V> (он же объект сообщения). Он включает имя топика, ключ, тело или значение сообщения и (опционально) номер партиции, время события, заголовки. 

    Далее происходит сериализация ключа и значения в массив байтов с помощью Serializer. Потом KafkaProducer определяет, в какую партицию топика отправить сообщение (если не указан partition, то по умолчанию по хешу ключа или же рандомно). Сериализованное сообщение попадает в RecordAccumulator — внутренний буфер продюсера; он накапливает несколько сообщений перед отправкой — это позволяет эффективную передачу батчами (bulk). 

  2. Отправка сообщений в кластер. Фоновый поток Sender: периодически отправляет буферы (batch) в нужные брокеры, используя для этого KafkaClient (сетевой клиент) и NIO.

  3. Принятие сообщения брокером. Брокер (лидер нужной партиции) получает запись, сохраняет её на диск в log‑файл (append‑only), подтверждает получение продюсеру (если acks=1 или acks=all).

  4. Репликация (если задан replication.factor > 1). Лидер рассылает сообщение другим брокерам (репликам) и все синхронные реплики (в ISR) получают запись. Если acks=all, то продюсер ждёт подтверждения от всех реплик.

  5. Чтение сообщения консьюмером. Консьюмер подключается к Kafka через KafkaConsumer, подписывается на топик через subscribe(), получает список партиций, опрашивает Kafka poll(), чтобы получить записи. 

    Полученные байты десериализуются с помощью класса Deserializer, то есть данные преобразуются обратно в Java‑объекты или же в строку (это как вы настроите свою систему). Таким образом консьюмер получает объект ConsumerRecord<K, V>.

  6. Обработка сообщения и коммит офсета. Далее консьюмер обрабатывает сообщение (бизнес‑логика), коммитит текущий offset (вручную или автоматически). А offset сохраняется в Kafka (в __consumer_offsets), что позволяет продолжить с нужного места при перезапуске.

Тестирование сообщений в топиках

Зачем их тестировать?

Apache Kafka широко используется для асинхронного обмена данных между микросервисами. Несмотря на положительные стороны данной технологии, перечисленные ранее, она все же не идеальна, и работа с ней требует тщательного тестирования для обеспечения надёжности и корректности работы системы.

При отсутствии тестирования, неправильной настройке Kafka или разработке сервисов смогут возникнуть следующие проблемы:

  • Потеря сообщений (например, продюсер отправил сообщение, но консьюмер его не получил). Такая проблема может возникнуть по множеству причин: 

    • При отправке продюсер получил исключение (TimeoutException, RecordTooLargeException, UnknownTopicOrPartitionException) и, если в конфигурации acks=0, продюсер не ждёт подтверждения от брокера и может «не заметить» потерю сообщения.

    • Консьюмер читает другую партицию или находится в другой группе консюмеров.

    • Консьюмер фильтрует сообщения по ключу/значению, и они отбрасываются в логике приложения.

    • Консьюмер не имеет прав на чтение из топика. В логах брокера будет TOPIC_AUTHORIZATION_FAILED.

    • Сообщение уже удалено из топика. Если сообщение было прочитано другой группой давно, а retention времени истекло (настраиваемые параметры retention.ms,log.retention.bytes, delete.retention.ms), Kafka может уже удалить его.

  • Сообщение было отправлено в несуществующий топик. Если включена автоматическая генерация топиков (auto.create.topics.enable=true) — это может привести к неожиданному росту количества топиков (например, из‑за опечаток в именах), что крайне нежелательно в продакшене.

  • Дублирование сообщений. Консьюмер обработал сообщение, но не закоммитил офсет, и при рестарте прочитал его снова. Или, например, продюсер отправил сообщение дважды (например, из‑за retry‑логики).

  • Некорректная десериализация. Например, продюсер отправил JSON, а консьюмер ожидал Avro — ошибка парсинга. Или же само сообщение некорректно и сервис не может преобразовать его в свой объект.

  • Из‑за скопившихся лагов (необработанных сообщений) по причине сбоя при организации сервисов с синхронной обработкой, поступление дальнейших сообщений может быть заблокировано. 

На проде вышеперечисленные проблемы могут доставить компаниям большие финансовые и временные потери. Поэтому тестирование поможет их избежать.

Тестовые утилиты для тестирования

Существует несколько библиотек и инструментов как для ручного тестирования Kafka, так и для её автоматизации.

Для автоматизированного тестирования обычно используются такие библиотеки, как: 

  1. Embedded Kafka. Позволяет запускать Kafka в тестах без необходимости настройки полноценного кластера. Хорошо подходит для юнит‑ и интеграционных тестов внутри Spring.

  2. Testcontainers — библиотека для запуска контейнеров с Kafka, удобен для изоляции тестов и, чаще всего, подходит для интеграционного тестирования.

  3. Spring Kafka — поддержка тестов Kafka от Spring фреймворка.

  4. Apache Kafka Client — официальная библиотека, которая предоставляет классы KafkaProducer<K, V>, KafkaConsumer<K, V>, AdminClient для отправки и получения сообщений, а также для управления топиками и брокерами.

  5. Kafka‑junit — JUnit‑расширение для Kafka.

Для ручного тестирования применяются:

  1. Kafka Tool (Offset Explorer) — классическое desktop‑приложение, позволяющее просматривать топики, партиции, сообщения (отображать их в разных форматах), поддерживающее ручную отправку сообщений. Подходит для ОС Windows/Linux/Mac.

  2. AKHQ (ранее Kafka HQ) — web‑интерфейс для Kafka (деплоится как Docker или Spring Boot). Возможен просмотр топиков, consumer‑групп, оффсетов, позволяет редактировать и отправлять сообщения.

  3. Kafdrop — легковесный, простой open‑source UI для просмотра сообщений и топиков.

  4. Conduktor — коммерческий и бесплатный клиент для Kafka, имеет расширенный GUI для всех операций: продюсинг, консьюминг, ACL, схемы, мониторинг.

  5. CLI‑инструменты (терминальные): kcat (для чтения, записи, диагностики Kafka, отличен для отладки, скриптов, CI) и kafka‑console‑producer / kafka‑console‑consumer (простой, встроенный консольный скриптер, входит в дистрибутив Apache Kafka).

  6. Grafana + Prometheus + JMX Exporter для мониторинга производительности брокеров и продюсеров/консьюмеров.

Подготовка к тестированию

Прежде чем переходить к техническим аспектам, важно определить, что именно вы хотите протестировать:

  • Производительность Kafka (throughput, latency).

  • Надёжность доставки сообщений.

  • Поведение в случае отказов.

  • Интеграцию с другими компонентами и внешними микросервисами.

  • Корректность сериализации и десериализации сообщений.

От поставленных целей зависит набор инструментов и конфигурация окружения.

Допустим, нам нужно просто проверить, как работает отправка сообщения в топики и приходят ли они корректно. В качестве ОС выберем Linux.

Для начала стоит отметить, что Kafka без установки Java работать не будет, так как она разработана на этом языке и требует JVM для запуска. На новых проектах и продакшене лучше использовать комбинацию Java 17 + Kafka 3.x. Соотношение рекомендуемых версий приведу в таблице:

Версия Kafka

Поддерживаемые версии Java

Kafka 4.х

Java 11+

Kafka 3.х

Java 11+

Kafka 2.0–2.8

Java 8, 11

Kafka 1.х

Только Java 8

После установки Java следует установка Apache Kafka. Откроем терминал и пропишем следующее:

wget https://downloads.apache.org/kafka/3.6.0/kafka_2.13–3.6.0.tgz

mkdir /opt/kafka — создадим каталог, куда установим кафку

tar ‑zxf kafka_2.13–3.6.0.tgz ‑C /opt/kafka ‑strip 1 — распакуем скачанный архив в созданный каталог

Можно открыть конфигурационный файл:

vi /opt/kafka/config/server.properties

и внести для себя нужные изменения, например, могут быть полезны следующие параметры:

  • delete.topic.enable = true— данная директива разрешает ручное удаление топика из кафки;

  • log.retention.hours — время хранения сообщений в часах

  • log.retention.bytes — или по размеру в байтах (чтобы не копить мусор в виде выходных тестовых данных).

Более широкую информацию о конфигах можно посмотреть в документации Kafka.

Давайте запустим Kafka и отправим первое сообщение. 

  1. Первым делом запускаем сам сервис:

/opt/kafka/bin/kafka‑server‑start.sh config/server.properties

Если установили Kafka версии ниже 2.8, то требуется запустить отдельно Zookeeper:

/opt/kafka/bin/zookeeper‑server‑start.sh config/zookeeper.properties

  1. Создадим топик (тему):

/opt/kafka/bin/kafka‑topics.sh ‑create ‑bootstrap‑server localhost:9092 ‑replication‑factor 1 ‑partitions 1 ‑topic Test

  1. Отправляем сообщение брокеру:

echo «Сообщение для Кафки» | /opt/kafka/bin/kafka‑console‑producer.sh ‑broker‑list localhost:9092 ‑topic Test

  1. Достаем сообщение:

/opt/kafka/bin/kafka‑console‑consumer.sh ‑bootstrap‑server localhost:9092 ‑topic Test ‑from‑beginning

(опция from‑beginning позволяет увидеть все сообщения, которые были отправлены в брокер до создания подписчика)

В консоли мы должны увидеть: Сообщение для Кафки.

Для удобства проверки сообщений в топиках можно использовать Kafdrop. Установка Kafdrop на Linux выполняется запуском JAR‑файла через Java (JDK 11+ или 17+), предварительно указав адрес брокера Kafka. Самый быстрый способ — скачать готовый релиз с GitHub и запустить через java ‑jar, либо использовать Docker. Веб‑интерфейс будет доступен по умолчанию на порту 9000.

После установки JDK скачайте JAR‑файл (на момент написания статьи последняя версия — 4.2.0):

curl ‑L ‑o kafdrop.jar https://github.com/obsidiandynamics/kafdrop/releases/download/4.2.0/kafdrop-4.2.0.jar

Далее запустите приложение, указав адрес вашего Kafka‑брокера (<host>:<port>):

java ‑jar kafdrop.jar ‑kafka.brokerConnect=<broker‑host:port>

К примеру, можно указать ‑kafka.brokerConnect=localhost:9092. Далее откройте браузер и перейдите по адресу: http://localhost:9000. Теперь вы можете просматривать список активных узлов, их ID, хосты, порты, списки топиков и партиций, потребителей и их лаги, а также сами сообщения.

Тестирование продюсеров и консьюмеров через автотесты

В данном разделе рассмотрим, как написать простые автотесты для проверки сообщений в топиках, использовав связку инструментов: Java 11+, Maven, Testcontainers, JUnit5.

Чтобы Testcontainers работал, нам необходимо установить Docker:

  1. Обновим пакеты и установим зависимости:

sudo apt update

sudo apt upgrade ‑y

sudo apt install ‑y ca‑certificates curl gnupg lsb‑release

  1. Добавим Docker GPG ключ:

sudo mkdir ‑p /etc/apt/keyrings

curl ‑fsSL https://download.docker.com/linux/ubuntu/gpg | \

sudo gpg ‑dearmor ‑o /etc/apt/keyrings/docker.gpg

  1. Добавим Docker репозиторий:

echo \

“deb [arch=$(dpkg ‑print‑architecture) signed‑by=/etc/apt/keyrings/docker.gpg] \”

https://download.docker.com/linux/ubuntu \

$(lsb_release ‑cs) stable’ | \

sudo tee /etc/apt/sources.list.d/docker.list > /dev/null

  1. Установка Docker Engine и проверка установки:

sudo apt update

sudo apt install ‑y docker‑ce docker‑ce‑cli containerd.io docker‑buildx‑plugin docker‑compose‑plugin

sudo docker run hello‑world

Создаем новый maven‑проект через среду разработки. 

Чтобы подтянуть все нужные библиотеки, необходимо добавить зависимости в файл pom.xml, как показано на скрине:

Наконец, приступим к написанию самого кода. 

Напишем класс продюсера:

Для него мы добавим конструктор, внутри которого будет инициализироваться и конфигурироваться объект продюсера. А также добавим методы send(String message) для отправки сообщения и close()для завершения работы продюсера и освобождения всех его ресурсов.

Очевидно, теперь нужно написать класс и для консьюмера:

Тут практически то же самое, только для консьюмера сообщения будут вытягиваться из топиков.

Обратите внимание на параметр AUTO_OFFSET_RESET_CONFIG. Значение latest означает, что консьюмер будет читать сообщения только после своего подключения, игнорируя все сообщения, которые были отправлены до его запуска. Поэтому в этом случае важно реализовать тесты так, чтобы сообщения в топик отправлялись после инициализации и подписки консьюмера на топик (метод subscribe()). Можно также задать значение earliest, тогда консьюмер будет читать сообщения с начала топика, включая те, что были отправлены до его запуска.

Ниже представлен родительский класс, где поднимается и стартует Кафка‑контейнер, и от которого потом наследуется тестовый класс (где будут прописаны автотесты). Метод kafkaContainer.getBootstrapServers() нам даст адрес сервера, под которым будет поднят кластер Кафки. В @BeforeAll методе пропишем старт контейнера и инициализацию объектов продюсера и консьюмера, чтобы к старту тестов они были уже готовы к работе. Также добавим @AfterAll метод для их закрытия и остановки контейнера, чтобы не занимал память после тестов.

Далее напишем класс с тестом, отправляющим и проверяющим сообщение в топике:

В тесте testSendAndReceiveMessage() уже инициализированный продюсер отправляет сообщение, а консьюмер принимает его. И исходное и полученное сообщения сравниваются между собой. Но такая реализация не обеспечит нам должной проверки, так как сообщение может быть не доставлено по причине таймаута ожидания сообщения. Поэтому лучше добавить явное ожидание для гарантии получения сообщения. Например, можно воспользоваться библиотекой Awaitility. Добавляем зависимость и модифицируем наш тест: 

Теперь мы явно ожидаем появления сообщения в топике, и, если оно не придёт — тест упадёт. Объект AtomicReference также поможет нам при асинхронной отправке сообщения.

В данном примере показана лишь самая простая реализация теста с использованием библиотеки TestContainers, позволяющей запускать тесты изолировано. Этот пример наглядно демонстрирует работу Kafka, но для полноценного интеграционного тестирования он не подойдёт. Для удовлетворения такого тестирования необходимо модифицировать такой код так, чтобы соблюдались следующие правила:

  1. Должна присутствовать проверка бизнес логики. 

    Необходимо использовать сервисы из тестируемого приложения. Интеграционный тест обычно проверяет, что ваш слой бизнес‑логики, сервисы и Kafka взаимодействуют правильно.

  2. Присутствие настроек и проверок сериализации/десериализации объектов. Часто приложения используют сложные форматы (JSON, Avro, Protobuf) и кастомные сериализаторы, поэтому тест, где проверяются просто строки, не покрывает эту часть. 

    Особую популярность набирает механизм сериализации Protobuf в силу легковесности и быстроты. Данные преобразуются в бинарный формат с использованием числовых тегов, а не текстовых имён, что значительно экономит место. Однако, в отличие от JSON, данные Protobuf невозможно прочитать без файла схемы (формата.proto), где описывается структура данных, поэтому нельзя просто сериализовать объект, не описав его заранее.

  3. Покрытие асинхронного взаимодействия. Тесты должны отражать реальный асинхронный поток сообщений, должны быть проверки обработки ошибок, временных задержек, повторных попыток.

    Интеграционное тестирование в контексте Kafka

Роль Kafka в интеграционном тестировании

Можно считать, что Apache Kafka стала де‑факто стандартом для асинхронного обмена сообщениями. Тем самым она в некоторой степени изменила подход к тестированию распределенных систем, микросервисов и event‑driven архитектур. Появилось несколько важных аспектов, которые нужно учитывать при тестировании Kafka‑ориентированных приложений:

  • если сервисы и БД работали в строгой согласованности и данные можно было проверять сразу после операции, то сейчас данные могут обновляться с задержкой. Необходимо учитывать задержки репликации и обработки, и потом проверять, что система (не только сервис) приходит в ожидаемое состояние;

  • из‑за возможной ребалансировки партиций и повторной доставки сообщений появилась необходимость уделять внимание на тестирование идемпотентности обработчиков (когда повторное применение события не ломает систему);

  • интеграционное тестирование стало сложнее и разностороннее, увеличилось число тестируемых элементов (БД, «выходные» топики, дополнительные запросы в сторонние сервисы и тд). 

При тестировании системы, работающей с Kafka, тестирование лишь отдельных сервисов недостаточно — обязательно должны быть проверки их взаимодействия, а иногда они более приоритетны и должны быть первоначальны. 

Например, Сервис-1 и Сервис-2 с установленными моками отдельно работают отлично. Но сообща они работают некорректно или с ошибками (допустим, на одной стороне ошибка парсинга или десериализации сообщения из‑за неправильно реализованной его модели; или первый не получает от второго нужной доп информации и сохраняет в БД неправильные значения). На этом уровне затраты на исправления дефектов могут значительно возрасти, поскольку, может потребоваться пересмотр аналитики, контрактов между сервисами и передел логики обработки сообщений, а не просто «фикс» на одном лишь сервисе.

Пример интеграционных тестов, проверка обработки сообщений в реальном времени

Рассмотрим интеграционные тесты на примере Kafka‑приложения. Допустим, дано приложение, предоставляющее услуги по аренде самокатов по городу, которое работает на Spring, и работающее по схеме ниже:

Цепочка взаимодействия выглядит так:

  1. Пользователь через UI‑клиент отправляет HTTP‑запрос на аренду. 

  2. API Service преобразовывает тело запроса в kafka‑сообщение и отправляет в топик rent.requests.topic. 

  3. Далее сообщение из топика читает Rental Service, который создает в БД запись об аренде, проводит валидацию (скажем, проверяет не занят ли транспорт, доступен ли самокат для клиента по уровню), блокирует некую залоговую сумму (условно 500 рублей) на счете, отправляет результаты обработки в rent.events (успешная бронь или же ошибка валидации). Если все успешно, обновляет статус аренды в БД на «confirmed».

При старте аренды движение сообщения такое же, но статусы аренды и транспорта другие. А при завершении Rental Service производит нужные вычисления по счету, меняет статус транспорта на «free» и статус аренды на «finished». 

Отбросим детализацию организационных и бизнес процессов и перейдём сразу к интеграционному тесту. Так как мы не учитываем все аспекты бизнес‑логики, не будем формировать целый чеклист проверок, а выделим абстрактно один полный позитивный тест аренды транспорта.

При ручном тестировании тест‑кейс выглядел бы так:

Тема

Начало и завершение аренды самоката. Позитивный кейс

Приоритет

Высокий

Окружение

тестовое окружение

Предусловия

Внести или подготовить в БД необходимые сущности (пользователь, счет, транспорт). Цена транспорта 0,08 рублей в секунду. Пользователь авторизован

Шаг-1

Выбрать самокат. Нажать на кнопку «Забронировать»

Ожидаемый результат 1

Проверить топик «rent.requests.topic» — появилась запись с запросом на аренду, ключ сообщения равен «RESERVE».

Проверить топик «rent.events» — появилась запись об успешной обработке запроса бронирования со статусом Success. 

Проверить таблицы в БД — статус самоката «reserved». В таблице поездок создалась новая запись аренды со статусом «confirmed».

В приложении

Шаг-2

Нажать на кнопку «Начать поездку»

Ожидаемый результат 2

Проверить топик «rent.requests.topic» — появилась запись с запросом на старт аренды, ключ сообщения равен «START».

Проверить топик «rent.events» — появилась запись об успешной обработке запроса старта поездки со статусом Success.

Проверить таблицы в БД — статус самоката «in_use», в таблице счета появилась сумма в поле «frozen_sum» на сумму удержки в таблице поездок в записи статус поменялся на «active»

Шаг-3

Подождать 5 секунд и нажать на кнопку «Завершить поездку»

Ожидаемый результат 3

Проверить топик «rent.requests.topic» — появилась запись с запросом на завершение аренды, ключ сообщения равен «FINISH»..

Проверить топик «rent.events» — появилась запись о завершении поездки со статусом Success. 

Проверить таблицы в БД — статус самоката поменялся на «free», в таблице поездок в записи поменялся статус на «finished», и счет пользователя уменьшился на стоимость поездки.

Имитируя действия реального пользователя, данный кейс, по‑сути, является E2E‑тестом и показывает, что все части системы работают вместе как единое целое без критических ошибок. 

Посмотрим, как может выглядеть данный кейс в автотестах. UI‑автотест тут будет сложно показать ввиду абстрактности, но будет наглядно продемонстрировать его через API‑тесты. Также, хотелось подметить, что рекомендуется весь тест‑кейс разделить на несколько автотестов по бронированию, старту и завершению аренды. 

Подготовим окружение так, как было описано в предыдущем разделе, только нужно добавить нужные топики, поднять дополнительно контейнер БД для работы с ним, и привязать это к нашему приложению через Spring Boot, чтобы тест запускался в контексте c поднятыми сервисами. 

Консьюмер нужно переделать так, чтобы он работал в условиях асинхронной работы

public class TestKafkaConsumer {
   private Consumer<String, String> consumer;
   private final AtomicBoolean pollThreadFlag = new AtomicBoolean(false);


   private List<ConsumerRecord<String, String>> allRecords = Collections.synchronizedList(new ArrayList<>());
   private final Logger log = LoggerFactory.getLogger(TestKafkaConsumer.class);
   private ConsumerThread consumerThread;




   public TestKafkaConsumer(String bootstrapServers, List<String> topics, String groupId) {
       Properties consumerProps = new Properties();
       consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
       consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
       consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
       consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
       consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
       consumerProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");


       this.consumer = new KafkaConsumer<>(consumerProps);
       this.consumer.subscribe(topics);


       consumerThread = new ConsumerThread();
       pollThreadFlag.set(true);
       consumerThread.start();
   }


   public void pollMessages() {
       ConsumerRecords<String, String> pulledRecords = consumer.poll(Duration.ofMillis(500));
       for(ConsumerRecord<String, String> record: pulledRecords) {
           allRecords.add(record);
           log.info(
                   "Получено kafka-сообщение:\n\tТопик: {}\n\tКлюч: {}\n\tСообщение: {}\n\tХедеры: {}\n\n",
                   record.topic(),
                   record.key(),
                   record.value(),
                   record.headers()
           );
       }
   }


   public void clearTopicHistory() {
       allRecords.clear();
   }


   public void shutDownPollThread() {
       pollThreadFlag.set(false);
       consumer.wakeup();
   }


   public void close() {
       consumer.close();
   }


   public synchronized  List<ConsumerRecord<String, String>> getMessagesFromTopicByKey(
           String topic,
           String key,
           long timeOutMilliseconds,
           long interval
   ) {
       List<ConsumerRecord<String, String>> topicRecords = new ArrayList<>();
       await()
               .atMost(Duration.ofMillis(timeOutMilliseconds))
               .pollInterval(Duration.ofMillis(interval))
               .pollDelay(Duration.ofMillis(timeOutMilliseconds))
               .until( () -> {
                   synchronized (allRecords) {
                       topicRecords.addAll(allRecords
                               .stream()
                               .filter(record -> record.topic().equals(topic) && record.key().equals(key))
                               .toList());
                       return !topicRecords.isEmpty();
                   }


                   }
               );
       return topicRecords;
   }


   class ConsumerThread extends Thread {


       @Override
       public void run() {
           try {
               while (pollThreadFlag.get()) {
                   pollMessages();
               }
           } catch (WakeupException e) {
               if (!pollThreadFlag.get()) {
                   log.info("Consumer wakeup вызван, завершаем поток");
               } else {
                   log.warn("Неожиданный WakeupException при работающем флаге");
               }
           }
       }


   }
}

Метод pollMessages() считывает все записи, приходящие в топики, и помещает их в отдельный список allRecords для дальнейших операций (фильтрация, сравнение и тд ) с ними, а также логирует полученные данные.

Для получения сообщений по конкретному топику и ключу используется метод getMessagesFromTopicByKey(). В нем ожидается, пока в списке allRecords не появятся нужные сообщения.

В конструкторе самого обёртка консьюмера происходит его настройка, инициализация и запуск чтения сообщений в отдельном потоке ConsumerThread, который управляется флагом pollThreadFlag. Остановка данного потока происходит при вызове метода shutDownPollThread, который ставит значение флага в false, тем самым останавливая чтение.

Метод clearTopicHistory() будет очищать список сообщений после каждого теста в методе, помеченном аннотацией @BeforeEach.

Кроме всего этого добавим инструменты для сравнения результатов в БД и в Kafka

package org.example.tools;


import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.junit.jupiter.api.Assertions;


import java.util.List;


public class KafkaAssertTool {


   public static void assertKafkaMessage(
           String topic,
           String expectedKey,
           String expectedValue,
           long timeOutMilliseconds,
           long interval,
           TestKafkaConsumer consumer
   ) {
       List<ConsumerRecord<String, String>> actualList = consumer
               .getMessagesFromTopicByKey(topic, expectedKey, timeOutMilliseconds, interval);


       //Предполагается, что по определенному ключу и топику будет только одна запись
       Assertions.assertEquals(
               1,
               actualList.size(),
               "Ожидалось 1 сообщение в топике \"" + topic + "\" с ключом " + expectedKey +
                       ". Реальное количество: " +  actualList.size());


       ConsumerRecord<String, String> actualRecord = actualList.getFirst();


       Assertions.assertEquals(
                       expectedValue, actualRecord.value(),
                       "Значение сообщения не соответствует ожидаемому: актуальное значение - "      + actualRecord.value()
               );
   }
}
public class PostgreSQLAssertTool {


  public static void assertAccountEntity(Account expectedAccount, AccountRepository accRepository) {
      Account actualAccount = Assertions.assertDoesNotThrow(
              () -> accRepository.findById(expectedAccount.getId()).orElseThrow(),
              "Не найден счёт пользователя с id: " + expectedAccount.getId()
      );


      Assertions.assertAll(
              () -> Assertions.assertEquals(actualAccount.getUserId(), expectedAccount.getUserId()),
              () -> Assertions.assertEquals(actualAccount.getTotalSum(), expectedAccount.getTotalSum()),
              () -> Assertions.assertEquals(actualAccount.getFrozenSum(), expectedAccount.getFrozenSum())
      );
  }
}

Для PostgreSQLAssertTool показан метод для сравнения актуального и ожидаемых результатов только для сущности счета пользователя. Для других сущностей можно добавить аналогичные методы с соответствующими передаваемыми параметрами. А можно и оставить и один такой метод для всех сущностей, но для этого нужно сначала абстрагировать в общий класс и сущности и репозитории сущностей. 

Итак, как будут выглядеть сами автотесты:

public class IntegrationTest extends BaseTest {


   @Autowired
   AccountRepository accountRepository;
   @Autowired
   UserRepository userRepository;
   @Autowired
   TransportRepository transportRepository;
   @Autowired
   RentalRepository rentalRepository;


   private ZoneOffset zone = ZoneOffset.UTC;


   private User user = new User(1234L, "Евгений", "user@mail.ru");
   private LocalDateTime startDateTime = LocalDateTime.ofInstant(Instant.now(), zone);
   private Transport transport = new Transport(1L, "Самокат", TransportStatus.FREE, 0.08);
   private Account account = new Account(4321L, user.getId(), 1000.0, 0.0);
   private Long testRentalId = 1001L;
   private Rental rentalEntity = new Rental(
           testRentalId,
           user.getId(),
           transport.getId(),
           startDateTime,
           RentalStatus.CONFIRMED
   );


   @BeforeEach
   public void saveUser() {
       userRepository.save(user);
   }


   @Test
   @Order(1)
   @DisplayName("Проверка бронирования транспорта")
   public void reservePositiveIntegrationTest() throws JsonProcessingException {
   //Предусловие: подготовить в БД необходимые сущности
       accountRepository.save(account);
       transportRepository.save(transport);


       RentRequestDTO rentRequest = new RentRequestDTO(
               testRentalId, user.getId(), transport.getId(), startDateTime, "RESERVE"
       );
       String requestBody = objectMapper.writeValueAsString(rentRequest);


   //Отправка запроса на аренду
       given().baseUri("http://localhost:8080")
               .header("Authorization", "Bearer token")
               .body(requestBody)
       .when().post("/api/reserve")
               .then()
               .statusCode(200);


   //Ожидаемый результат
       //Проверка записи аренды, транспорта и счета в БД
       PostgreSQLAssertTool.assertRentalEntity(rentalEntity, rentalRepository);
       PostgreSQLAssertTool.assertTransportEntity(transport.setStatus(TransportStatus.RESERVED), transportRepository);
       PostgreSQLAssertTool.assertAccountEntity(account.setFrozenSum(500.0), accountRepository);
       //Проверка записи в топике rent.requests.topic
       KafkaAssertTool.assertKafkaMessage(
               TestTopics.RENT_REQUESTS.name(),
               "RESERVE",
               requestBody,
               5000L,
               500L,
               consumer
       );
       //Проверка записи в топике rent.events.topic
       RentEventDTO rentEvent = new RentEventDTO(
           testRentalId, user.getId(), transport.getId(), startDateTime, RentalStage.RESERVE, true, List.of()
       );
       String rentEventString = objectMapper.writeValueAsString(rentEvent);
       KafkaAssertTool.assertKafkaMessage(
               TestTopics.RENT_EVENTS.name(),
               "RESERVE",
               rentEventString,
               5000L,
               500L,
               consumer
       );
   }

Данный тест условно проверяет правильность работы бронирования транспорта.

Используя RestAssured, посылается запрос на бронирование и далее происходит проверка записей в топиках rent.request.topic и rent.events.topic, а также правильность заполнения сущностей в базе данных. Kafka‑сообщения формируются в результате сериализации ДТО‑объектов запросов аренды в строку с помощью библиотеки Jackson.

Конечно же, рекомендуется использовать свои тестовые инструменты для сохранения сущностей в БД, а не использовать репозитории разработчиков, созданные в контексте Spring, поскольку мы можем перенять возможные баги, скрывающиеся в них

@Test
@Order(2)
@DisplayName("Проверка старта аренды транспорта")
public void startRentPositiveIntegrationTest() throws JsonProcessingException {
   LocalDateTime requestDateTime = LocalDateTime.ofInstant(Instant.now(), zone);
//Предусловие: подготовить в БД необходимые сущности
   accountRepository.save(account.setFrozenSum(500.0));
   transportRepository.save(transport.setStatus(TransportStatus.RESERVED));
   rentalRepository.save(rentalEntity.setStatus(RentalStatus.CONFIRMED).setStartTime(startDateTime));


   String stringStartRequest = objectMapper.writeValueAsString(
       new RentRequestDTO(
          testRentalId, user.getId(), transport.getId(), requestDateTime, "START"
       )
   );


//Отправка запроса на старт аренды
   given().baseUri("http://localhost:8080")
           .header("Authorization", "Bearer token")
           .body(stringStartRequest)
           .when().post("/api/start")
           .then()
           .statusCode(200);


//Ожидаемый результат
   //Проверка записи аренды, транспорта и счета в БД
   PostgreSQLAssertTool.assertRentalEntity(
           rentalEntity.setStatus(RentalStatus.ACTIVE),
           rentalRepository
   );
   PostgreSQLAssertTool.assertTransportEntity(transport.setStatus(TransportStatus.IN_USE), transportRepository);
   PostgreSQLAssertTool.assertAccountEntity(account.setFrozenSum(500.0), accountRepository);
   //Проверка записи в топике rent.requests.topic
   KafkaAssertTool.assertKafkaMessage(
           TestTopics.RENT_REQUESTS.name(),
           "START",
           stringStartRequest,
           5000L,
           500L,
           consumer
   );
   //Проверка записи в топике rent.events.topic
   RentEventDTO rentEvent = new RentEventDTO(
           testRentalId, user.getId(), transport.getId(), requestDateTime, RentalStage.RENT, true, List.of()
   );


   KafkaAssertTool.assertKafkaMessage(
           TestTopics.RENT_EVENTS.name(),
           "START",
           objectMapper.writeValueAsString(rentEvent),
           5000L,
           500L,
           consumer
   );
}

В данном тесте проверяется старт аренды. В отличие от предыдущего теста, тут лишь подготавливаются сущности и ДТО объектами (RentRequestDTO, Rental, Transport, Account) с уже другими статусами, получаемыми как после бронирования транспорта, и сравниваются сообщения в тех же топиках, но с другими ключами.

Можно заметить, что тесты помечаются аннотацией @Order для установки порядка запуска тестов. Это сделано с целью использовать в этих тестах одни и те же объекты и сущности, чтобы не создавать их заново в каждом тесте. Просто их параметры будут меняться сеттерами, возвращающими тот же объект:

@Test
@Order(3)
@DisplayName("Проверка завершение аренды транспорта")
public void finishRentPositiveIntegrationTest() throws JsonProcessingException {
   LocalDateTime requestDateTime = LocalDateTime.ofInstant(Instant.now(), zone);
//Предусловие: подготовить в БД сущности
   accountRepository.save(account.setFrozenSum(500.0));
   transportRepository.save(transport.setStatus(TransportStatus.IN_USE));
   rentalRepository.save(rentalEntity.setStatus(RentalStatus.ACTIVE).setStartTime(startDateTime));


   //Отправка запроса на завершение аренды
   String stringStartRequest = objectMapper.writeValueAsString(
           new RentRequestDTO(
                   testRentalId, user.getId(), transport.getId(), requestDateTime, "FINISH"
           )
   );
   given().baseUri("http://localhost:8080")
           .header("Authorization", "Bearer token")
           .body(stringStartRequest)
           .when().post("/api/finish")
           .then()
           .statusCode(200);


//Ожидаемый результат
   //Проверка записи аренды, транспорта и счета в БД
   PostgreSQLAssertTool.assertRentalEntity(
           rentalEntity.setStatus(RentalStatus.FINISHED),
           rentalRepository
   );
   PostgreSQLAssertTool.assertTransportEntity(transport.setStatus(TransportStatus.FREE), transportRepository);
   double rentalSum = ( requestDateTime.toEpochSecond(zone) - rentalEntity.getStartTime().toEpochSecond(zone) )
           * transport.getPricePerSecond();
   PostgreSQLAssertTool.assertAccountEntity(account.setFrozenSum(0.0).setTotalSum(rentalSum), accountRepository);
   //Проверка записи в топике rent.requests.topic
   KafkaAssertTool.assertKafkaMessage(
           TestTopics.RENT_REQUESTS.name(),
           "FINISH",
           stringStartRequest,
           5000L,
           500L,
           consumer
   );
   //Проверка записи в топике rent.events.topic
   RentEventDTO rentEvent = new RentEventDTO(
           testRentalId, user.getId(), transport.getId(), requestDateTime, RentalStage.FINISHING, true, List.of()
   );


   KafkaAssertTool.assertKafkaMessage(
           TestTopics.RENT_EVENTS.name(),
           "FINISH",
           objectMapper.writeValueAsString(rentEvent),
           5000L,
           500L,
           consumer
   );
}

И наконец тест с проверкой завершения бронирования. Тут также различие в свойствах сущностей и ДТО, и кроме того, рассчитывается стоимость аренды путём разницы времени старта аренды, которое было получено при инициализации тестового класса, и временем запроса на финиш в текущем тесте.

Вышеизложенные тесты показывают нам, как работают в связке между собой сервисы аренды, общаясь посредством Кафки, во взаимодействии с БД.

Как избежать распространенных ошибок

Тестирование систем, работающих на основе Kafka, не такая простая задача. 

Часто, на начальном этапе, когда только формируется тестовый фреймворк и строятся процессы интеграционного тестирования, все это сопровождается ошибочными ситуациями. 

Давайте выделим самые распространённые из них, а также их решения.

  1. Неправильная настройка изоляции тестов.

    В условиях асинхронной работы системы есть огромный риск, что выходные данные и состояния системы после тестов будут перемешиваться, например, когда тесты влияют друг на друга через общие топики Kafka. И тогда тесты будут падать не пой той причине, которой требуется. В такой ситуации тестировщик может ошибочно сделать выводы о багах и натворить делов похуже. Решением данной проблемы послужит очистка всех топиков после каждого теста, использование разных групп консьюмеров. Кроме того, если вы настраиваете работу Kafka вручную, следует сдвигать позицию чтения топика в конец после каждого теста, иначе консьюмер будет заново читать записи из предыдущих тестов. Использование инструмента TestContainers также позволит изолировать тесты.

  2. Тесты нестабильны из‑за игнорирования временных задержек.

    Сообщения в топике могут приходить с задержкой, из‑за этого может упасть ассерт с отсутствием записи в топике. Иногда не помогают даже неявные ожидания. Для решения этой проблемы следует явно ожидать появления сообщений в топике. В этом поможет инструмент Awaitility.

  3. Не учтены «негативные» проверки в позитивных кейсах. Например, когда мы не проверяем отсутствие записей там, где они не должны быть, а они могут присутствовать. Такая ситуация даёт ложное представление о правильности работы системы. Важно проверять все моменты бизнес‑логики, проверять отсутствие сообщений в топике, если это предусмотрено требованием.

  4. Тесты проходят локально, но падают в CI/CD.

    Из‑за неправильно настроенной retry политики и таймаутов в удаленной системе сборки, интеграции и доставки по причине неких сбоев могут падать тесты. В таком случае можно добавить ConsumerConfig.RECONNECT_BACKOFF_MS_CONFIG или RECONNECT_BACKOFF_MAX_MS_CONFIG — это параметр конфигурации Kafka Consumer, который определяет базовую задержку перед повторной попыткой подключения к брокеру Kafka при разрыве соединения или неудачной попытке подключения. 

  5. Тесты падают на стадии сериализации/десериализации данных.

    Для таких сложных тестов нужны дополнительное логирование и расстановка правильных аннотаций для определения, например, nullable свойств ДТО.

Подведем итоги

Kafka выступает главной шиной обмена информации для современных распределенных приложений, и её корректная работа критически важна для бизнеса. Интеграционное тестирование систем на основе Kafka — это крайняя необходимость, если мы хотим минимизировать затраты на разработку систем на её основе.

Тесты Kafka дают достоверную информацию о работоспособности системы, так как интеграционные и компонентные тесты выявляют проблемы, которые unit‑тесты пропускают.

Вот несколько рекомендаций, которые помогут писать эффективные тесты:

  1. Используйте Testcontainers для реалистичного тестирования. Данный инструмент позволит запускать тесты изолированно, а также поможет решить проблемы с параллельным запуском тестов за счёт отдельных контейнеров со «своими» необходимыми для каждого теста и группы тестов данных. Однако следует учесть, что параллельный запуск в таких условиях требует больше технологических ресурсов.

  2. Реализуйте явное, более корректное ожидание с помощью Awaitility вместо Thread.sleep().

  3. Тестируйте edge cases: обработку ошибок, ретри, дубликаты сообщений

  4. Очищайте состояние между тестами, связанными с одним контейнером для обеспечения максимальной изоляции.

Помните: хорошо написанные интеграционные тесты для Kafka — это инвестиция в стабильность и надежность вашей событийно‑ориентированной архитектуры. Они окупаются многократно, предотвращая инциденты в продакшен и уменьшая время на отладку сложных проблем.

Подписывайся на наши соцсети и блог, где мы публикуем другие полезные материалы, в том числе и для SDET‑специалистов:

Комментарии (0)