Данная статья предлагает подход к написанию интеграционных тестов для приложений с Kafka, позволяющий сосредоточиться на спецификации взаимодействия, делая тесты более читаемыми и легкими для поддержки. Представленный подход не только повышает эффективность тестирования, но и способствует лучшему пониманию интеграционных процессов в приложении.
Статья опирается на три идеи, изложенные в соответствующих статьях: написание тестов с четким разделением на отдельные этапы Arrange-Act-Assert, изоляция в тестах с Kafka и использование инструментов повышения наглядности тестов. Рекомендую ознакомиться с ними перед погружением в материал данной статьи.
Демонстрационный сценарий
Возьмем в качестве примера телеграм-бот, который перенаправляет запросы к OpenAI API и возвращает результат пользователю. Если запрос в OpenAI нарушает правила безопасности системы, клиент получит об этом сообщение. Дополнительно будет отправлено сообщение в Kafka для системы поведенческого контроля, чтобы товарищ менеджер мог связаться с пользователем и объяснить, что его запрос оказался слишком пикантным даже для нашего робота, и попросить его пересмотреть свои предпочтения.
Контракты взаимодействия с сервисами описаны в упрощенном виде, чтобы подчеркнуть основную логику работы. Ниже приведена диаграмма последовательностей, демонстрирующая архитектуру приложения. Понимаю, что дизайн может вызвать вопросы с точки зрения системной архитектуры, но прошу отнестись к этому с пониманием — главная цель здесь продемонстрировать подход к написанию тестов.
Объект захвата сообщений
В качестве основного инструмента тестирования будет использоваться объект захвата сообщений — RecordCaptor. Он по своей сути работы очень похож на объект захвата исходящих запросов — RequestCaptor, о котором можно почитать в статье Разносим по полочкам этапы тестирования http запросов в Spring.
Захват сообщений будет выполняться через стандартный консумер Kafka. Список топиков нужно указывать явно через параметр конфигурации.
@KafkaListener(id = "recordCaptor", topics = "#{'${test.record-captor.topics}'.split(',')}", groupId = "test")
public void eventCaptorListener(ConsumerRecord<Object, Object> record,
@Headers Map<String, Object> boundedHeaders) {
RecordSnapshot recordSnapshot = mapper.recordToSnapshot(record, boundedHeaders);
recordCaptor.capture(recordSnapshot);
}
Объект RecordCaptor
накапливает информацию из захваченных сообщений. Использование подобного подхода требует соблюдения изоляции в тестах с Kafka. Ожидание подтверждения смещения перед проверкой результатов теста необходимо осуществить с помощью метода KafkaSupport#waitForPartitionOffsetCommit.
Пример теста
Ниже представлен код теста для описанного сценария.
def "User Message Processing with OpenAI"() {
setup:
KafkaSupport.waitForPartitionAssignment(applicationContext) // 1
and: // 2
def openaiRequestCaptor = restExpectation.openai.completions(withBadRequest().contentType(APPLICATION_JSON)
.body("""{
"error": {
"code": "content_policy_violation",
"message": "Your request was rejected as a result of our safety system."
}
}"""))
def telegramRequestCaptor = restExpectation.telegram.sendMessage(withSuccess('{}', APPLICATION_JSON))
when:
mockMvc.perform(post("/telegram/webhook") // 3
.contentType(APPLICATION_JSON_VALUE)
.content("""{
"message": {
"from": {
"id": 10000000
},
"chat": {
"id": 20000000
},
"text": "Hello!"
}
}""".toString())
.accept(APPLICATION_JSON_VALUE))
.andExpect(status().isOk())
KafkaSupport.waitForPartitionOffsetCommit(applicationContext) // 4
then:
openaiRequestCaptor.times == 1 // 5
JSONAssert.assertEquals("""{
"content": "Hello!"
}""", openaiRequestCaptor.bodyString, false)
and:
telegramRequestCaptor.times == 1
JSONAssert.assertEquals("""{
"chatId": "20000000",
"text": "Your request was rejected as a result of our safety system."
}""", telegramRequestCaptor.bodyString, false)
when: // 6
def message = recordCaptor.getRecords("topicC", "20000000").last
then:
message != null
JSONAssert.assertEquals("""{
"webhookMessage": {
"message": {
"chat": {
"id": "20000000"
},
"text": "Hello!"
}
},
"error": {
"code": "content_policy_violation",
"message": "Your request was rejected as a result of our safety system."
}
}""", message.value as String, false)
}
Ключевые шаги:
Ожидание назначения партиций до начала тестового сценария.
Мокирование запросов к OpenAI и Telegram.
Выполнение тестового сценария.
Ожидание подтверждения смещения.
Проверка запросов к OpenAI и Telegram.
Проверка сообщения в Kafka.
Использование JSONAssert.assertEquals
позволяет обеспечить консистентность представления данных на разных уровнях — в Kafka сообщениях, логах и тестах. Это упрощает процесс тестирования, обеспечивая гибкость сравнения и точность диагностики ошибок.
В статье представлен пример с JSON форматом сообщений; другие форматы не рассматриваются, но описанный подход не накладывает ограничений на формат.
Как найти свое сообщение в RecordCaptor
Распределение сообщений в RecordCaptor
осуществляется по названию топика и ключу. В предложенном тесте в качестве ключа используется ключ сообщения в Kafka. При отправке мы явно указываем его:
sendMessage("topicC", chatId, ...);
...
private void sendMessage(String topic, String key, Object payload) {
Message message = MessageBuilder
.withPayload(objectMapper.writeValueAsString(payload))
.setHeader(KafkaHeaders.TOPIC, topic)
.setHeader(KafkaHeaders.KEY, key) <-- указываем ключ
.build();
kafkaTemplate.send(message).get();
}
Поиск по ключу сообщения в топике:
when:
def message = recordCaptor.getRecords("topicC", "20000000").last <-- используем
Если такой вариант не подходит, необходимо описать собственные индексы по параметрам сообщения, на основе которых нужно строить поиск. Пример можно посмотреть в тестах PolicyViolationTestsCustomIndex.groovy.
Подключение RecordCaptor
Код для подключения RecordCaptor
выглядит следующим образом:
@TestConfiguration(proxyBeanMethods = false)
public class RecordCaptorConfiguration {
@Bean
RecordCaptor recordCaptor() {
return new RecordCaptor();
}
@Bean
RecordCaptorConsumer recordCaptorConsumer(RecordCaptor recordCaptor) {
return new RecordCaptorConsumer(recordCaptor, new RecordSnapshotMapper());
}
}
OffsetSnapshotFrame
Опыт показал, что работа с приложениями, использующими Kafka, требует инструментов, облегчающих понимание состояния консумеров и статуса потребления сообщений. Для данной задачи можно в операции ожидания подтверждения смещения сверять офсеты топиков и консюмер-групп и выводить в лог расхождения, например, как показано на рисунке:
Код OffsetComparisonFrame доступен для ознакомления.
Заключение
Тестирование сообщений в Kafka с использованием предложенного подхода не только упрощает процесс написания тестов, но и делает его более структурированным и понятным. Применение инструментов, таких как RecordCaptor
, а также соблюдение принципов изоляции и четкого разделения этапов тестирования, позволяют достигать высокой точности и эффективности.
Ссылка на репозиторий проекта с демонстрацией тестов — kafka-test-support.
Спасибо за внимание к статье, и удачи в вашем стремлении к написанию эффективных и наглядных тестов!