Привет, Хабр! С вами Александр Бобряков, техлид в команде МТС Аналитики. Я к вам с новой статьёй из цикла про фреймворк Apache Flink.
В предыдущей части я рассказал, как создать Unit-тест на полноценную джобу Flink и отдельные stateful-операторы с использованием Flink MiniCluster. Ещё мы научились запускать мини-кластер один раз перед всеми тестовыми классами, которые нуждаются в нём. В дополнение создали вспомогательные абстракции и аннотации, значительно разделяя ответственность в тестах и упрощая логику написания новых тестов.
В предыдущих тестах на джобу мы не затрагивали интеграцию с Kafka, ведь нам были не важны реальные source и sink. В этой статье продолжим разбираться в тестировании и напишем полноценный E2E-тест, который охватит Kafka и Flink вместе с использованием Testcontainers. Также рассмотрим неочевидные проблемы в тестировании и новые универсальные абстракции.
Список моих постов про Flink
Весь разбираемый исходный код можно найти в репозитории AlexanderBobryakov/flink-spring. В master-ветке — итоговый проект по всей серии статей. Эта статья соответствует релизной ветке под названием release/6_e2e_deduplicator_test.
Оглавление статьи
E2E-тестирование
E2E-тестирование охватывает поведение всей системы от начала до конца. По специфике нашей джобы, которую мы рассматривали в предыдущих частях, в начале есть Kafka-топик с данными ClickMessage, а на выходе — много разных product-топиков. Значит, в тесте всё это должно учитываться.
Цель такого теста — проверить все используемые интеграции между собой. Иначе блоки системы могут работать по отдельности, а вместе всё сломается. В нашем случае должен подниматься весь Spring-контекст, стартовать Flink и Kafka, как будто мы запускаем приложение на проде.
Поднимаем Kafka с помощью Testcontainers
Testcontainers — это библиотека Java, которая поддерживает тесты JUnit. Она даёт возможность запускать в них всё, что может запускаться в Docker. Значит, вы можете проверить любую интеграцию вашего приложения: с БД, брокерами сообщений, другими сервисами и так далее.
Сценарий написания теста в итоге выглядит так:
Определить в тесте Testcontainers контейнер — например, для Kafka.
Запустить Kafka-контейнер.
Пробросить свойства для подключения к Kafka-контейнеру в конфиг приложения.
Запустить тест, в котором приложение подключается согласно конфигу к Kafka-контейнеру.
В предыдущей статье мы затронули создание кастомных аннотаций под тесты. Это было достаточно удобно, поэтому предлагаю придерживаться аналогичного подхода и в этот раз. Для начала нам нужно подключение к Kafka. TestContainers предоставляет Kafka-контейнер «из коробки», который не нуждается в отдельной инициализации Zookeeper. Ещё TestContainers можно использовать, чтобы создать все необходимые контейнеры на основе любых докер-образов.
Для использования готового Kafka-контейнера можем воспользоваться зависимостью:
testImplementation "org.testcontainers:kafka"
Аналогично тому, как мы создали JUnit Extension для старта Flink MiniCluster, создадим новый Extension для старта Kafka-контейнера:
@Slf4j
@SuppressWarnings({"PMD.AvoidUsingVolatile"})
public class KafkaContainerExtension implements BeforeAllCallback, ExtensionContext.Store.CloseableResource {
private static final KafkaContainer KAFKA =
new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:7.3.2"))
.withEnv("KAFKA_AUTO_CREATE_TOPICS_ENABLE", "false");
private static final Lock LOCK = new ReentrantLock();
private static volatile boolean started;
@Override
public void beforeAll(ExtensionContext context) {
LOCK.lock();
try {
if (!started) {
log.info("Start Kafka Container");
started = true;
Startables.deepStart(KAFKA).join();
System.setProperty("spring.kafka.bootstrap-servers", KAFKA.getBootstrapServers());
System.setProperty("kafka.bootstrap-servers", KAFKA.getBootstrapServers());
System.setProperty("spring.kafka.consumer.group-id", "group-id-spring");
context.getRoot().getStore(GLOBAL).put("Kafka Container", this);
}
} finally {
LOCK.unlock();
}
}
@Override
public void close() {
log.info("Close Kafka Container");
KAFKA.close();
started = false;
}
}
Код очень похож на наш существующий FlinkClusterExtension, который я описал в прошлой статье: мы инициализируем Kafka-контейнер по указанному докер-образу, потом в beforeAll() синхронно запускаем его через вызов Startables.deepStart(KAFKA).join(), обвязывая блокировками. В конце выполнения всех зависимых тестов закрываем контейнер в колбэк-методе жизненного цикла JUnit тестов close().
Возникает вопрос: как наше Spring-приложение при старте будет подключаться к Kafka? Ведь контейнер запускается на случайном свободном порту. Для этого мы передаём настройки в переменные окружения через System.setProperty() в статическом контексте непосредственно перед стартом приложения. Настройки передаём согласно структуре application.yml, потому что они будут «перезатираться» из указанных переменных окружения — документация:
kafka:
group-id: group_id
bootstrap-servers: localhost:29092
Для Spring передаём свойства, чтобы автоматически создались основные Spring-бины для интеграции с Kafka. Например, KafkaTemplate — абстракция над Kafka Producer, которая умеет отправлять сообщения в топик. Он понадобится нам в рамках тестов, поэтому добавим зависимость в тестах:
testImplementation "org.springframework.kafka:spring-kafka"
Этого можно было бы добиться и альтернативным способом: через Spring-инициализаторы. Например, можно было бы создать свою реализацию интерфейса ApplicationContextInitializer. Но мы не будем разбирать этот способ в статье, потому что реализация через Extension выглядит красивее.
В итоге, чтобы применять текущий Extension в тестах, нам была бы полезна своя аннотация по аналогии с аннотацией @WithFlinkCluster:
@Retention(RUNTIME)
@ExtendWith({KafkaContainerExtension.class})
@Inherited
public @interface WithKafkaContainer {
}
Эту аннотацию можно вешать на любой тестовый класс, которому нужна интеграция с Kafka.
Абстракции тестирования для Kafka
Во всех тестах было бы удобно пользоваться своими абстракциями, фасадами или dto для взаимодействия с Kafka.
KafkaTestConsumer
Во-первых, нужно определить тестовый Consumer, который будем создавать в каждом тесте отдельно, чтобы он подключался к топикам в рамках новой консьюмерной группы. Ещё он должен использовать де-/сериализацию Jackson, ведь мы определили для сообщений формат JSON. Важно помнить: Kafka-контейнер поднимается в единственном экземпляре. Поэтому, если написать тесты неправильно, они могут косвенно влиять друг на друга.
Класс для создания тестового Kafka consumer можно представить в таком виде:
public class KafkaTestConsumer implements AutoCloseable {
private final Consumer<String, String> consumer;
private final List<KafkaMessage> receivedMessages = new CopyOnWriteArrayList<>();
private final ObjectMapper objectMapper = createObjectMapper();
public KafkaTestConsumer(String bootstrapServers, Set<String> topics) {
this.consumer = new KafkaConsumer<>(
Map.of(
ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers,
ConsumerConfig.GROUP_ID_CONFIG, UUID.randomUUID().toString(),
ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName(),
ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName(),
ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 1,
ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"
)
);
consumer.subscribe(topics);
}
public <T> List<T> receiveAndGetAll(String topic, Class<T> clazz) {
return receiveAndGetAll()
.stream()
.filter(kafkaMessage -> topic.equals(kafkaMessage.getTopic()))
.map(kafkaMessage -> readValue(kafkaMessage, clazz))
.collect(toList());
}
private List<KafkaMessage> receiveAndGetAll() {
final var records = consumer.poll(Duration.ofSeconds(5));
for (ConsumerRecord<String, String> record : records) {
receivedMessages.add(new KafkaMessage(record.key(), record.topic(), record.value()));
}
consumer.commitSync();
return receivedMessages;
}
@SneakyThrows
private <T> T readValue(KafkaMessage kafkaMessage, Class<T> clazz) {
return objectMapper.readValue(kafkaMessage.getValue(), clazz);
}
@Override
public void close() {
receivedMessages.clear();
consumer.close();
}
}
Код достаточно прост. В конструкторе определяем основные параметры подключения к Kafka, создаётся базовый consumer, который подписывается на переданные топики. Потом предоставляем основной метод receiveAndGetAll(), который отдаёт все десериализованные сообщения в конкретный тип в разрезе какого-то топика. Внутри этого метода происходит вызов базового consumer.poll() для обращения к Kafka-контейнеру. Внутри используем свою dto KafkaMessage, чтобы не работать с ConsumerRecord напрямую, ведь большинство информации в нём нам не требуется:
@Value
public class KafkaMessage {
String key;
String topic;
String value;
}
TestKafkaFacade
Нам понадобится абстракция-фасад, которая позволит отправлять сообщения в Kafka, создавать новые топики в каждом тесте, а ещё создавать KafkaTestConsumer. Это будет происходить в контексте Spring-приложения, поэтому удобно создать общий Spring-компонент TestKafkaFacade:
@TestComponent
@SuppressWarnings("PMD.TestClassWithoutTestCases")
public class TestKafkaFacade {
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
@Autowired
private KafkaAdmin kafkaAdmin;
private final ObjectMapper objectMapper = createObjectMapper();
public void createTopicsIfNeeded(String... names) {
final var topics = kafkaAdmin.describeTopics();
if (!topics.keySet().containsAll(Stream.of(names).collect(toSet()))) {
kafkaAdmin.createOrModifyTopics(
Stream.of(names)
.map(n -> new NewTopic(n, 1, (short) 1))
.toArray(NewTopic[]::new)
);
}
}
@SneakyThrows
public void sendMessage(String topic, Object message) {
kafkaTemplate
.send(topic, objectMapper.writeValueAsString(message))
.get(5, TimeUnit.SECONDS);
}
public KafkaTestConsumer createKafkaConsumer(Set<String> topics) {
return new KafkaTestConsumer(System.getProperty("kafka.bootstrap-servers"), topics);
}
}Во-первых, этот фасад инжектит Spring-реализации для KafkaTemplate (абстракция над базовым Kafka Producer), а также KafkaAdmin (абстракция над базовым AdminClient). Эти бины будут существовать в контексте благодаря установленному свойству spring.kafka.bootstrap-servers в нашем KafkaContainerExtension.
У нас есть методы для создания новых топиков, отправки в топик сообщения с учётом сериализации и создания нового KafkaTestConsumer. Bootstrap-servers получаем прямо из env, в который мы записываем bootstrap-servers в рамках KafkaContainerExtension.
KafkaTopicCreatorConfig
Прежде чем переходить к тесту, удобно воспользоваться ещё одним тестовым компонентом для создания всех топиков, указанных в application.yml:
Во-первых, этот фасад инжектит Spring-реализации для KafkaTemplate (абстракция над базовым Kafka Producer), а также KafkaAdmin (абстракция над базовым AdminClient). Эти бины будут существовать в контексте благодаря установленному свойству spring.kafka.bootstrap-servers в нашем KafkaContainerExtension.
У нас есть методы для создания новых топиков, отправки в топик сообщения с учётом сериализации и создания нового KafkaTestConsumer. Bootstrap-servers получаем прямо из env, в который мы записываем bootstrap-servers в рамках KafkaContainerExtension.
KafkaTopicCreatorConfig
Прежде чем переходить к тесту, удобно воспользоваться ещё одним тестовым компонентом для создания всех топиков, указанных в application.yml:
@TestConfiguration
public class KafkaTopicCreatorConfig {
@Autowired
private KafkaProperties kafkaProperties;
@Bean
public KafkaAdmin.NewTopics newTopics() {
return new KafkaAdmin.NewTopics(
new NewTopic(kafkaProperties.getTopics().getClickTopic(), 1, (short) 1)
);
}
}
Этот компонент создаёт бин KafkaAdmin.NewTopics, а Spring автоматически создаёт все указанные в нём топики — то есть топики из KafkaProperties (application.yml). В нашем случае сейчас это только один входной топик click-topic. Выходные мы будем создавать в каждом тесте отдельно, потому что они могут определяться динамически в нашем бизнес-кейсе.
E2E-тест на Flink Job
Теперь всё готово для написания первого E2E-теста на нашу джобу. Для этого создадим общую для E2E-тестов аннотацию:
@Retention(RUNTIME)
@SpringBootTest(webEnvironment = NONE)
@ActiveProfiles({"test"})
@Import({
KafkaTopicCreatorConfig.class,
TestKafkaFacade.class
})
@WithFlinkCluster
@WithKafkaContainer
public @interface E2ETest {
}
Она совмещает в себе все интеграции проекта @WithFlinkCluster и @WithKafkaContainer. А ещё поднимает весь Spring-контекст, захватывая тестовые абстракции, которые мы описали выше: KafkaTopicCreatorConfig и TestKafkaFacade.
Напомню, что в нашем приложении точка входа — это AppListener:
@Component
@RequiredArgsConstructor
@ConditionalOnProperty("flink.submit-jobs-on-app-start")
public class AppListener {
private final JobStarter jobStarter;
@EventListener(ApplicationStartedEvent.class)
@SneakyThrows
public void onApplicationStart() {
jobStarter.startJobs();
}
}
Он запускается по условию flink.submit-jobs-on-app-start = true. Но в тестах мы выставим его в false (в application-test.yml), чтобы создавать необходимые предусловия перед непосредственным запуском теста.
Напомню, что наша джоба занимается фильтрацией входного потока ClickMessage, пропуская события только с типом платформы WEB и APP. Дальше события APP проходят дедупликацию. Потом события APP и WEB записываются в выходной топик Kafka, определяющийся динамически из поля ClickMessage.productTopic.
Пайплайн выглядит так:
В итоге E2E-тест на обработку ClickMessage-сообщения может выглядеть таким образом:
@E2ETest
@SuppressWarnings("PMD.DataflowAnomalyAnalysis")
public class JobE2ETest {
@Autowired
private JobStarter jobStarter;
@Autowired
private TestKafkaFacade kafka;
@Autowired
private KafkaProperties kafkaProperties;
@Test
@SneakyThrows
void shouldProcessClickMessageSourceToProductSink() {
final var productTopic = "product_topic_1";
kafka.createTopicsIfNeeded(productTopic);
final var clickMessage = aClickMessage().withProductTopic(productTopic).withPlatform(APP).build();
kafka.sendMessage(kafkaProperties.getTopics().getClickTopic(), clickMessage);
kafka.sendMessage(kafkaProperties.getTopics().getClickTopic(), clickMessage);
final var jobClient = jobStarter.startJobs();
@Cleanup final var kafkaConsumer =
kafka.createKafkaConsumer(Set.of(productTopic));
await().atMost(ofSeconds(5))
.until(() -> kafkaConsumer.receiveAndGetAll(productTopic, ProductMessage.class),
productMessages -> productMessages.size() == 1
&& productMessages.get(0).getUserId().equals(clickMessage.getUserId())
);
jobClient.cancel().get(5, TimeUnit.SECONDS);
}
}
Мы повесили единственную аннотацию @E2ETest, благодаря которой произойдёт запуск Flink-мини-кластера, Kafka-контейнера и всего Spring-контекста нашего приложения. Точку входа JobStarter будем запускать сами в обход AppListener уже после настройки теста, благодаря настройке в application-test.yml: flink.submit-jobs-on-app-start = false.
А вот что происходит в самом тесте:
Создаём новый productTopic. Мы ожидаем, что именно в него попадёт входное ClickMessage-сообщение после обработки.
Создаём само ClickMessage-сообщение. Обязательно передаём в него productTopic.
Отправляем сообщение во входной топик click-topic дважды (ожидаем дедупликацию).
На этом подготовка теста завершена. У нас есть:
все топики
сообщение во входном топике
поднятое Spring-приложение
Пора стартовать задачу! Наша реализация jobStarter.startJobs(); запускает все найденные в контексте Spring задачи (наследующиеся от нашей абстракции FlinkJob) асинхронно через environment.executeAsync(). Дальше мы можем дожидаться сообщения в выходном топике.
Важный момент — именно асинхронный запуск задания, который возвращает объект управления JobClient. Например, раньше в тестах мы использовали только метод execute(), который был синхронным. В случае асинхронного запуска через JobClient мы можем получать статус задачи, завершать её руками и так далее. Это важно, ведь наш Kafka-источник потенциально бесконечен.
Поэтому дальше мы создаём наш AutoCloseable Kafka Consumer и подписываем его на выходной productTopic. Потом периодически проверяем, появилось ли там сообщение ProductMessage с userId, идентичным входному ClickMessage, в течение пяти секунд. Если за пять секунд не дождались сообщения, тест завершится с ошибкой. Для программной реализации такой проверки в тесте используются библиотеки awaitility. После проверки завершаем асинхронную джобу синхронным вызовом: jobClient.cancel().
В этом тесте есть большая проблема: а что, если assert не выполнится или джоба во время запуска упадёт? Будут ли какие-то проблемы с самими тестами?
На самом деле проблемы будут. Например, метод jobClient.cancel() вообще не выполнится, а джоба ещё долго может висеть в мини-кластере. В это время мини-кластер может начать использоваться для следующей джобы следующего теста, а его ресурсов для этого не хватит. Плюс возможны разные сайд-эффекты между выполнением таких тестов. Как решить эту проблему? Об этом — дальше.
Безопасное завершение E2E-тестов
Чтобы решить проблему, которую я описал выше, перед неудачным завершением теста лучше всегда подчищать ресурсы. Для этого можно в лоб оборачивать всё в блок try и finally. Вот как это сделать:
try {
final var jobClient = jobStarter.startJobs();
// ...
await().atMost(ofSeconds(5)).until(...);
} finally {
jobClient.cancel().get(5, TimeUnit.SECONDS);
}
Выглядит не очень красиво. А ещё такой блок придётся писать в каждом тесте, который использует асинхронный запуск задач. Напрашивается декоратор над абстракцией FlinkClient, который умеет завершать задания:
@RequiredArgsConstructor
public class AutoCloseableJobClient implements JobClient, AutoCloseable {
private final JobClient original;
@Override
public JobID getJobID() {
return original.getJobID();
}
@Override
public CompletableFuture<JobStatus> getJobStatus() {
return original.getJobStatus();
}
@Override
public CompletableFuture<Void> cancel() {
return original.cancel();
}
@Override
public CompletableFuture<String> stopWithSavepoint(boolean advanceToEndOfEventTime, @Nullable String savepointDirectory, SavepointFormatType formatType) {
return original.stopWithSavepoint(advanceToEndOfEventTime, savepointDirectory, formatType);
}
@Override
public CompletableFuture<String> triggerSavepoint(@Nullable String savepointDirectory, SavepointFormatType formatType) {
return original.triggerSavepoint(savepointDirectory, formatType);
}
@Override
public CompletableFuture<Map<String, Object>> getAccumulators() {
return original.getAccumulators();
}
@Override
public CompletableFuture<JobExecutionResult> getJobExecutionResult() {
return original.getJobExecutionResult();
}
@Override
public void close() throws Exception {
original.cancel().get(5, TimeUnit.SECONDS);
}
}
Декоратор делегирует выполнение внутреннему оригинальному JobClient. Но дополнительно мы имплементируем интерфейс AutoCloseable, чтобы перенести логику завершения джобы в метод close(). Теперь осталось вернуть этот декоратор в нашем JobStarter:
@SneakyThrows
public AutoCloseableJobClient startJobs() {
if (jobs.isEmpty()) {
log.info("No Jobs found for start");
return null;
}
for (FlinkJob job : jobs) {
log.info("Register job '{}'", job.getClass().getSimpleName());
job.registerJob(environment);
}
return new AutoCloseableJobClient(environment.executeAsync());
}
Тесты при этом значительно упрощаются:
@Test
@SneakyThrows
void shouldProcessClickMessageSourceToProductSink() {
final var productTopic = "product_topic_1";
kafka.createTopicsIfNeeded(productTopic);
final var clickMessage = aClickMessage().withProductTopic(productTopic).withPlatform(APP).build();
kafka.sendMessage(kafkaProperties.getTopics().getClickTopic(), clickMessage);
kafka.sendMessage(kafkaProperties.getTopics().getClickTopic(), clickMessage);
@Cleanup final var jobClient = jobStarter.startJobs();
@Cleanup final var kafkaConsumer =
kafka.createKafkaConsumer(Set.of(productTopic));
await().atMost(ofSeconds(5))
.until(() -> kafkaConsumer.receiveAndGetAll(productTopic, ProductMessage.class),
productMessages -> productMessages.size() == 1
&& productMessages.get(0).getUserId().equals(clickMessage.getUserId())
);
}
Получается, мы добавили аннотацию @Cleanup для объекта JobClient и убрали лишнюю завершающую строку jobClient.cancel().get(5, TimeUnit.SECONDS).
RocksDB в E2E-тестах
Чтобы окончательно убедиться, что наши E2E-тесты покрывают необходимую функциональность, нужно воссоздать окружение, аналогичное production. В нём для больших состояний рекомендуется использовать RocksDB в качестве StateBackend, который применяется напрямую в дедупликаторе. RocksDB в качестве бэкенда состояний я описывал в первой статье этого цикла.
Итак, можно заставить E2E-тест использовать локально поднятый RocksDB. Сделать это достаточно просто, так как Flink предоставляет RocksDB «из коробки». В тестах достаточно указать его в качестве состояния:
@TestConfiguration
public class FlinkProductionConfig {
@Autowired
public void changeFlinkEnvironment(StreamExecutionEnvironment environment) {
final var backend = new EmbeddedRocksDBStateBackend(false);
environment.setStateBackend(backend);
}
}
То есть наш компонент перехватывает настройку StreamExecutionEnvironment и указывает напрямую состояние EmbeddedRocksDBStateBackend. Чтобы его использовать, нужно добавить зависимость:
testImplementation "org.apache.flink:flink-statebackend-rocksdb:${flinkVersion}"
Теперь осталось только использовать этот конфиг в E2E-тестах, добавив его в аннотацию @E2ETest. Вот что в итоге вы сможете наблюдать в логах:
INFO 35116 --- [ger-io-thread-1] o.a.flink.runtime.jobmaster.JobMaster : Using job/cluster config to configure application-defined state backend: EmbeddedRocksDBStateBackend{, localRocksDbDirectories=null, enableIncrementalCheckpointing=FALSE, numberOfTransferThreads=-1, writeBatchSize=-1}
Важное замечание. В процессе использования RocksDb он создаёт свои временные директории с достаточно длинными путями. Например, для Mac:
INFO 35116 --- [essages (1/2)#0] .f.c.s.s.RocksDBKeyedStateBackendBuilder : Finished building RocksDB keyed state-backend at /var/folders/_y/gd8sxnq91z9glrxjlkrj98tnbsxn37/T/junit7312928894650170068/junit1795779582692257025/minicluster_aabeebf82ae121d1fa503365b6aa7eb7/tm_0/tmp/job_9a5c9344174adde1453cf361f9a0f43f_op_StreamFlatMap_371a51a50a977e59af86fcb074c97b9f__1_2__uuid_270162db-c22d-49ff-bb62-9ed6d9ef2ac5.
Поэтому при запуске кода на Windows вы можете получить неожиданную ошибку:
Caused by: java.io.IOException: The directory path length (275) is longer than the directory path length limit for Windows (247): C:\Users\User\AppData\Local\Temp\junit15245919089769733580\junit7040907885113472950\minicluster_7d9295c691c345c6209f2bb0db16b593\tm_0\tmp\job_7e441850886077eb74921aa5bb0e41f4_op_StreamFlatMap_371a51a50a977e59af86fcb074c97b9f__1_2__uuid_bcc33f69-17b2-4424-9620-25922af3be34\db
at org.apache.flink.contrib.streaming.state.RocksDBOperationUtils.throwExceptionIfPathLengthExceededOnWindows(RocksDBOperationUtils.java:285) ~[flink-statebackend-rocksdb-1.17.0.jar:1.17.0]
at org.apache.flink.contrib.streaming.state.RocksDBOperationUtils.openDB(RocksDBOperationUtils.java:85) ~[flink-statebackend-rocksdb-1.17.0.jar:1.17.0]
at org.apache.flink.contrib.streaming.state.restore.RocksDBHandle.loadDb(RocksDBHandle.java:134) ~[flink-statebackend-rocksdb-1.17.0.jar:1.17.0]
at org.apache.flink.contrib.streaming.state.restore.RocksDBHandle.openDB(RocksDBHandle.java:113) ~[flink-statebackend-rocksdb-1.17.0.jar:1.17.0]
at org.apache.flink.contrib.streaming.state.restore.RocksDBNoneRestoreOperation.restore(RocksDBNoneRestoreOperation.java:62) ~[flink-statebackend-rocksdb-1.17.0.jar:1.17.0]
at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder.build(RocksDBKeyedStateBackendBuilder.java:325) ~[flink-statebackend-rocksdb-1.17.0.jar:1.17.0]
... 18 common frames omitted
Caused by: org.rocksdb.RocksDBException: Failed to create a directory: C:\Users\User\AppData\Local\Temp\junit15245919089769733580\junit7040907885113472950\minicluster_7d9295c691c345c6209f2bb0db16b593\tm_0\tmp\job_7e441850886077eb74921aa5bb0e41f4_op_StreamFlatMap_371a51a50a977e59af86fcb074c97b9f__1_2__uuid_bcc33f69-17b2-4424-9620-25922af3be34\db: The directory path length (275) is longer than the directory path length limit for Windows (247).
at org.rocksdb.RocksDB.open(Native Method) ~[frocksdbjni-6.20.3-ververica-2.0.jar:na]
at org.rocksdb.RocksDB.open(RocksDB.java:306) ~[frocksdbjni-6.20.3-ververica-2.0.jar:na]
at org.apache.flink.contrib.streaming.state.RocksDBOperationUtils.openDB(RocksDBOperationUtils.java:75) ~[flink-statebackend-rocksdb-1.17.0.jar:1.17.0]
... 22 common frames omitted
Есть статьи о том, как решить эту проблему. Но у некоторых разработчиков описанные решения не сработали. Поэтому при тестировании Flink-задач с использованием embedded RocksDB рекомендуется не использовать Windows.
Вывод
Мы разобрали, как написать E2E-тест на Flink и Spring-джобу с использованием Kafka и Testcontainers. Мы создали удобные абстракции и поговорили об основных проблемах, которые нужно учитывать при таком виде тестирования.
На этом серию статей про тестирование я закончу. Дальше перейду к разбору таймеров в Flink с более сложным хранением состояния. Мы посмотрим, как отложить какое-то действие и отправить событие по установленному таймеру с помощью состояния. Разберём, какие проблемы могут возникнуть при более сложном взаимодействии с состояниями. Конечно, продолжим покрывать код тестами — а значит, встретимся с новыми механизмами и практиками при тестировании.