Привет, Хабр! С вами Александр Бобряков, техлид в команде МТС Аналитики. Я к вам с новой статьёй из цикла про фреймворк Apache Flink.

В предыдущей части я рассказал, как создать Unit-тест на полноценную джобу Flink и отдельные stateful-операторы с использованием Flink MiniCluster. Ещё мы научились запускать мини-кластер один раз перед всеми тестовыми классами, которые нуждаются в нём. В дополнение создали вспомогательные абстракции и аннотации, значительно разделяя ответственность в тестах и упрощая логику написания новых тестов.

В предыдущих тестах на джобу мы не затрагивали интеграцию с Kafka, ведь нам были не важны реальные source и sink. В этой статье продолжим разбираться в тестировании и напишем полноценный E2E-тест, который охватит Kafka и Flink вместе с использованием Testcontainers. Также рассмотрим неочевидные проблемы в тестировании и новые универсальные абстракции.

Список моих постов про Flink

Весь разбираемый исходный код можно найти в репозитории AlexanderBobryakov/flink-spring. В master-ветке — итоговый проект по всей серии статей. Эта статья соответствует релизной ветке под названием release/6_e2e_deduplicator_test.

Оглавление статьи

E2E-тестирование

E2E-тестирование охватывает поведение всей системы от начала до конца. По специфике нашей джобы, которую мы рассматривали в предыдущих частях, в начале есть Kafka-топик с данными ClickMessage, а на выходе — много разных product-топиков. Значит, в тесте всё это должно учитываться.

Цель такого теста — проверить все используемые интеграции между собой. Иначе блоки системы могут работать по отдельности, а вместе всё сломается. В нашем случае должен подниматься весь Spring-контекст, стартовать Flink и Kafka, как будто мы запускаем приложение на проде.

Поднимаем Kafka с помощью Testcontainers

Testcontainers — ​​это библиотека Java, которая поддерживает тесты JUnit. Она даёт возможность запускать в них всё, что может запускаться в Docker. Значит, вы можете проверить любую интеграцию вашего приложения: с БД, брокерами сообщений, другими сервисами и так далее.

Сценарий написания теста в итоге выглядит так:

  1. Определить в тесте Testcontainers контейнер — например, для Kafka.

  2. Запустить Kafka-контейнер.

  3. Пробросить свойства для подключения к Kafka-контейнеру в конфиг приложения.

  4. Запустить тест, в котором приложение подключается согласно конфигу к Kafka-контейнеру.

В предыдущей статье мы затронули создание кастомных аннотаций под тесты. Это было достаточно удобно, поэтому предлагаю придерживаться аналогичного подхода и в этот раз. Для начала нам нужно подключение к Kafka. TestContainers предоставляет Kafka-контейнер «из коробки», который не нуждается в отдельной инициализации Zookeeper. Ещё TestContainers можно использовать, чтобы создать все необходимые контейнеры на основе любых докер-образов.

Для использования готового Kafka-контейнера можем воспользоваться зависимостью:

testImplementation "org.testcontainers:kafka"

Аналогично тому, как мы создали JUnit Extension для старта Flink MiniCluster, создадим новый Extension для старта Kafka-контейнера:

@Slf4j
@SuppressWarnings({"PMD.AvoidUsingVolatile"})
public class KafkaContainerExtension implements BeforeAllCallback, ExtensionContext.Store.CloseableResource {
   private static final KafkaContainer KAFKA =
       new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:7.3.2"))
           .withEnv("KAFKA_AUTO_CREATE_TOPICS_ENABLE", "false");

   private static final Lock LOCK = new ReentrantLock();
   private static volatile boolean started;

   @Override
   public void beforeAll(ExtensionContext context) {
       LOCK.lock();
       try {
           if (!started) {
               log.info("Start Kafka Container");
               started = true;
               Startables.deepStart(KAFKA).join();
               System.setProperty("spring.kafka.bootstrap-servers", KAFKA.getBootstrapServers());
               System.setProperty("kafka.bootstrap-servers", KAFKA.getBootstrapServers());
               System.setProperty("spring.kafka.consumer.group-id", "group-id-spring");
               context.getRoot().getStore(GLOBAL).put("Kafka Container", this);
           }
       } finally {
           LOCK.unlock();
       }
   }

   @Override
   public void close() {
       log.info("Close Kafka Container");
       KAFKA.close();
       started = false;
   }
}

Код очень похож на наш существующий FlinkClusterExtension, который я описал в прошлой статье: мы инициализируем Kafka-контейнер по указанному докер-образу, потом в beforeAll() синхронно запускаем его через вызов Startables.deepStart(KAFKA).join(), обвязывая блокировками. В конце выполнения всех зависимых тестов закрываем контейнер в колбэк-методе жизненного цикла JUnit тестов close().

Возникает вопрос: как наше Spring-приложение при старте будет подключаться к Kafka? Ведь контейнер запускается на случайном свободном порту. Для этого мы передаём настройки в переменные окружения через System.setProperty() в статическом контексте непосредственно перед стартом приложения. Настройки передаём согласно структуре application.yml, потому что они будут «перезатираться» из указанных переменных окружения — документация:

kafka:
 group-id: group_id
 bootstrap-servers: localhost:29092

Для Spring передаём свойства, чтобы автоматически создались основные Spring-бины для интеграции с Kafka. Например, KafkaTemplate — абстракция над Kafka Producer, которая умеет отправлять сообщения в топик. Он понадобится нам в рамках тестов, поэтому добавим зависимость в тестах:

testImplementation "org.springframework.kafka:spring-kafka"

Этого можно было бы добиться и альтернативным способом: через Spring-инициализаторы. Например, можно было бы создать свою реализацию интерфейса ApplicationContextInitializer. Но мы не будем разбирать этот способ в статье, потому что реализация через Extension выглядит красивее.

В итоге, чтобы применять текущий Extension в тестах, нам была бы полезна своя аннотация по аналогии с аннотацией @WithFlinkCluster:

@Retention(RUNTIME)
@ExtendWith({KafkaContainerExtension.class})
@Inherited
public @interface WithKafkaContainer {
}

Эту аннотацию можно вешать на любой тестовый класс, которому нужна интеграция с Kafka.

Абстракции тестирования для Kafka

Во всех тестах было бы удобно пользоваться своими абстракциями, фасадами или dto для взаимодействия с Kafka.

KafkaTestConsumer

Во-первых, нужно определить тестовый Consumer, который будем создавать в каждом тесте отдельно, чтобы он подключался к топикам в рамках новой консьюмерной группы. Ещё он должен использовать де-/сериализацию Jackson, ведь мы определили для сообщений формат JSON. Важно помнить: Kafka-контейнер поднимается в единственном экземпляре. Поэтому, если написать тесты неправильно, они могут косвенно влиять друг на друга.

Класс для создания тестового Kafka consumer можно представить в таком виде:

public class KafkaTestConsumer implements AutoCloseable {
   private final Consumer<String, String> consumer;
   private final List<KafkaMessage> receivedMessages = new CopyOnWriteArrayList<>();
   private final ObjectMapper objectMapper = createObjectMapper();

   public KafkaTestConsumer(String bootstrapServers, Set<String> topics) {
       this.consumer = new KafkaConsumer<>(
           Map.of(
               ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers,
               ConsumerConfig.GROUP_ID_CONFIG, UUID.randomUUID().toString(),
               ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName(),
               ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName(),
               ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 1,
               ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"
           )
       );
       consumer.subscribe(topics);
   }

   public <T> List<T> receiveAndGetAll(String topic, Class<T> clazz) {
       return receiveAndGetAll()
                  .stream()
                  .filter(kafkaMessage -> topic.equals(kafkaMessage.getTopic()))
                  .map(kafkaMessage -> readValue(kafkaMessage, clazz))
                  .collect(toList());
   }

   private List<KafkaMessage> receiveAndGetAll() {
       final var records = consumer.poll(Duration.ofSeconds(5));
       for (ConsumerRecord<String, String> record : records) {
           receivedMessages.add(new KafkaMessage(record.key(), record.topic(), record.value()));
       }
       consumer.commitSync();
       return receivedMessages;
   }

   @SneakyThrows
   private <T> T readValue(KafkaMessage kafkaMessage, Class<T> clazz) {
       return objectMapper.readValue(kafkaMessage.getValue(), clazz);
   }

   @Override
   public void close() {
       receivedMessages.clear();
       consumer.close();
   }
}

Код достаточно прост. В конструкторе определяем основные параметры подключения к Kafka, создаётся базовый consumer, который подписывается на переданные топики. Потом предоставляем основной метод receiveAndGetAll(), который отдаёт все десериализованные сообщения в конкретный тип в разрезе какого-то топика. Внутри этого метода происходит вызов базового consumer.poll() для обращения к Kafka-контейнеру. Внутри используем свою dto KafkaMessage, чтобы не работать с ConsumerRecord напрямую, ведь большинство информации в нём нам не требуется:

@Value
public class KafkaMessage {
   String key;
   String topic;
   String value;
}

TestKafkaFacade

Нам понадобится абстракция-фасад, которая позволит отправлять сообщения в Kafka, создавать новые топики в каждом тесте, а ещё создавать KafkaTestConsumer. Это будет происходить в контексте Spring-приложения, поэтому удобно создать общий Spring-компонент TestKafkaFacade:

@TestComponent
@SuppressWarnings("PMD.TestClassWithoutTestCases")
public class TestKafkaFacade {
   @Autowired
   private KafkaTemplate<String, String> kafkaTemplate;
   @Autowired
   private KafkaAdmin kafkaAdmin;

   private final ObjectMapper objectMapper = createObjectMapper();

   public void createTopicsIfNeeded(String... names) {
       final var topics = kafkaAdmin.describeTopics();
       if (!topics.keySet().containsAll(Stream.of(names).collect(toSet()))) {
           kafkaAdmin.createOrModifyTopics(
               Stream.of(names)
                   .map(n -> new NewTopic(n, 1, (short) 1))
                   .toArray(NewTopic[]::new)
           );
       }
   }

   @SneakyThrows
   public void sendMessage(String topic, Object message) {
       kafkaTemplate
           .send(topic, objectMapper.writeValueAsString(message))
           .get(5, TimeUnit.SECONDS);
   }

   public KafkaTestConsumer createKafkaConsumer(Set<String> topics) {
       return new KafkaTestConsumer(System.getProperty("kafka.bootstrap-servers"), topics);
   }
}Во-первых, этот фасад инжектит Spring-реализации для KafkaTemplate (абстракция над базовым Kafka Producer), а также KafkaAdmin (абстракция над базовым AdminClient). Эти бины будут существовать в контексте благодаря установленному свойству spring.kafka.bootstrap-servers в нашем KafkaContainerExtension.
У нас есть методы для создания новых топиков, отправки в топик сообщения с учётом сериализации и создания нового KafkaTestConsumer. Bootstrap-servers получаем прямо из env, в который мы записываем bootstrap-servers в рамках KafkaContainerExtension.


KafkaTopicCreatorConfig
Прежде чем переходить к тесту, удобно воспользоваться ещё одним тестовым компонентом для создания всех топиков, указанных в application.yml:

Во-первых, этот фасад инжектит Spring-реализации для KafkaTemplate (абстракция над базовым Kafka Producer), а также KafkaAdmin (абстракция над базовым AdminClient). Эти бины будут существовать в контексте благодаря установленному свойству spring.kafka.bootstrap-servers в нашем KafkaContainerExtension.

У нас есть методы для создания новых топиков, отправки в топик сообщения с учётом сериализации и создания нового KafkaTestConsumer. Bootstrap-servers получаем прямо из env, в который мы записываем bootstrap-servers в рамках KafkaContainerExtension.

KafkaTopicCreatorConfig

Прежде чем переходить к тесту, удобно воспользоваться ещё одним тестовым компонентом для создания всех топиков, указанных в application.yml:

@TestConfiguration
public class KafkaTopicCreatorConfig {
   @Autowired
   private KafkaProperties kafkaProperties;

   @Bean
   public KafkaAdmin.NewTopics newTopics() {
       return new KafkaAdmin.NewTopics(
           new NewTopic(kafkaProperties.getTopics().getClickTopic(), 1, (short) 1)
       );
   }
}

Этот компонент создаёт бин KafkaAdmin.NewTopics, а Spring автоматически создаёт все указанные в нём топики — то есть топики из KafkaProperties (application.yml). В нашем случае сейчас это только один входной топик click-topic. Выходные мы будем создавать в каждом тесте отдельно, потому что они могут определяться динамически в нашем бизнес-кейсе.

E2E-тест на Flink Job

Теперь всё готово для написания первого E2E-теста на нашу джобу. Для этого создадим общую для E2E-тестов аннотацию:

@Retention(RUNTIME)
@SpringBootTest(webEnvironment = NONE)
@ActiveProfiles({"test"})
@Import({
        KafkaTopicCreatorConfig.class,
        TestKafkaFacade.class
})
@WithFlinkCluster
@WithKafkaContainer
public @interface E2ETest {
}

Она совмещает в себе все интеграции проекта @WithFlinkCluster и @WithKafkaContainer. А ещё поднимает весь Spring-контекст, захватывая тестовые абстракции, которые мы описали выше: KafkaTopicCreatorConfig и TestKafkaFacade.

Напомню, что в нашем приложении точка входа — это AppListener:

@Component
@RequiredArgsConstructor
@ConditionalOnProperty("flink.submit-jobs-on-app-start")
public class AppListener {
   private final JobStarter jobStarter;

   @EventListener(ApplicationStartedEvent.class)
   @SneakyThrows
   public void onApplicationStart() {
       jobStarter.startJobs();
   }
}

Он запускается по условию flink.submit-jobs-on-app-start = true. Но в тестах мы выставим его в false (в application-test.yml), чтобы создавать необходимые предусловия перед непосредственным запуском теста.

Напомню, что наша джоба занимается фильтрацией входного потока ClickMessage, пропуская события только с типом платформы WEB и APP. Дальше события APP проходят дедупликацию. Потом события APP и WEB записываются в выходной топик Kafka, определяющийся динамически из поля ClickMessage.productTopic.

Пайплайн выглядит так:

В итоге E2E-тест на обработку ClickMessage-сообщения может выглядеть таким образом:

@E2ETest
@SuppressWarnings("PMD.DataflowAnomalyAnalysis")
public class JobE2ETest {
   @Autowired
   private JobStarter jobStarter;
   @Autowired
   private TestKafkaFacade kafka;
   @Autowired
   private KafkaProperties kafkaProperties;

   @Test
   @SneakyThrows
   void shouldProcessClickMessageSourceToProductSink() {
       final var productTopic = "product_topic_1";
       kafka.createTopicsIfNeeded(productTopic);
       final var clickMessage = aClickMessage().withProductTopic(productTopic).withPlatform(APP).build();
       kafka.sendMessage(kafkaProperties.getTopics().getClickTopic(), clickMessage);
       kafka.sendMessage(kafkaProperties.getTopics().getClickTopic(), clickMessage);

       final var jobClient = jobStarter.startJobs();

       @Cleanup final var kafkaConsumer =
           kafka.createKafkaConsumer(Set.of(productTopic));
       await().atMost(ofSeconds(5))
           .until(() -> kafkaConsumer.receiveAndGetAll(productTopic, ProductMessage.class),
               productMessages -> productMessages.size() == 1
                                      && productMessages.get(0).getUserId().equals(clickMessage.getUserId())
           );
       jobClient.cancel().get(5, TimeUnit.SECONDS);
   }
}

Мы повесили единственную аннотацию @E2ETest, благодаря которой произойдёт запуск Flink-мини-кластера, Kafka-контейнера и всего Spring-контекста нашего приложения. Точку входа JobStarter будем запускать сами в обход AppListener уже после настройки теста, благодаря настройке в application-test.yml: flink.submit-jobs-on-app-start = false.

А вот что происходит в самом тесте:

  1. Создаём новый productTopic. Мы ожидаем, что именно в него попадёт входное ClickMessage-сообщение после обработки.

  2. Создаём само ClickMessage-сообщение. Обязательно передаём в него productTopic.

  3. Отправляем сообщение во входной топик click-topic дважды (ожидаем дедупликацию).

На этом подготовка теста завершена. У нас есть:

  • все топики

  • сообщение во входном топике

  • поднятое Spring-приложение

Пора стартовать задачу! Наша реализация jobStarter.startJobs(); запускает все найденные в контексте Spring задачи (наследующиеся от нашей абстракции FlinkJob) асинхронно через environment.executeAsync(). Дальше мы можем дожидаться сообщения в выходном топике.

Важный момент — именно асинхронный запуск задания, который возвращает объект управления JobClient. Например, раньше в тестах мы использовали только метод execute(), который был синхронным. В случае асинхронного запуска через JobClient мы можем получать статус задачи, завершать её руками и так далее. Это важно, ведь наш Kafka-источник потенциально бесконечен.

Поэтому дальше мы создаём наш AutoCloseable Kafka Consumer и подписываем его на выходной productTopic. Потом периодически проверяем, появилось ли там сообщение ProductMessage с userId, идентичным входному ClickMessage, в течение пяти секунд. Если за пять секунд не дождались сообщения, тест завершится с ошибкой. Для программной реализации такой проверки в тесте используются библиотеки awaitility. После проверки завершаем асинхронную джобу синхронным вызовом: jobClient.cancel().

В этом тесте есть большая проблема: а что, если assert не выполнится или джоба во время запуска упадёт? Будут ли какие-то проблемы с самими тестами?

На самом деле проблемы будут. Например, метод jobClient.cancel() вообще не выполнится, а джоба ещё долго может висеть в мини-кластере. В это время мини-кластер может начать использоваться для следующей джобы следующего теста, а его ресурсов для этого не хватит. Плюс возможны разные сайд-эффекты между выполнением таких тестов. Как решить эту проблему? Об этом — дальше.

Безопасное завершение E2E-тестов

Чтобы решить проблему, которую я описал выше, перед неудачным завершением теста лучше всегда подчищать ресурсы. Для этого можно в лоб оборачивать всё в блок try и finally. Вот как это сделать:

try {
    final var jobClient = jobStarter.startJobs();
    // ...
    await().atMost(ofSeconds(5)).until(...);
} finally {
    jobClient.cancel().get(5, TimeUnit.SECONDS);
}

Выглядит не очень красиво. А ещё такой блок придётся писать в каждом тесте, который использует асинхронный запуск задач. Напрашивается декоратор над абстракцией FlinkClient, который умеет завершать задания:

@RequiredArgsConstructor
public class AutoCloseableJobClient implements JobClient, AutoCloseable {
   private final JobClient original;

   @Override
   public JobID getJobID() {
       return original.getJobID();
   }

   @Override
   public CompletableFuture<JobStatus> getJobStatus() {
       return original.getJobStatus();
   }

   @Override
   public CompletableFuture<Void> cancel() {
       return original.cancel();
   }

   @Override
   public CompletableFuture<String> stopWithSavepoint(boolean advanceToEndOfEventTime, @Nullable String savepointDirectory, SavepointFormatType formatType) {
       return original.stopWithSavepoint(advanceToEndOfEventTime, savepointDirectory, formatType);
   }

   @Override
   public CompletableFuture<String> triggerSavepoint(@Nullable String savepointDirectory, SavepointFormatType formatType) {
       return original.triggerSavepoint(savepointDirectory, formatType);
   }

   @Override
   public CompletableFuture<Map<String, Object>> getAccumulators() {
       return original.getAccumulators();
   }

   @Override
   public CompletableFuture<JobExecutionResult> getJobExecutionResult() {
       return original.getJobExecutionResult();
   }

   @Override
   public void close() throws Exception {
       original.cancel().get(5, TimeUnit.SECONDS);
   }
}

Декоратор делегирует выполнение внутреннему оригинальному JobClient. Но дополнительно мы имплементируем интерфейс AutoCloseable, чтобы перенести логику завершения джобы в метод close(). Теперь осталось вернуть этот декоратор в нашем JobStarter:

@SneakyThrows
public AutoCloseableJobClient startJobs() {
   if (jobs.isEmpty()) {
       log.info("No Jobs found for start");
       return null;
   }
   for (FlinkJob job : jobs) {
       log.info("Register job '{}'", job.getClass().getSimpleName());
       job.registerJob(environment);
   }
   return new AutoCloseableJobClient(environment.executeAsync());
}

Тесты при этом значительно упрощаются:

@Test
@SneakyThrows
void shouldProcessClickMessageSourceToProductSink() {
   final var productTopic = "product_topic_1";
   kafka.createTopicsIfNeeded(productTopic);
   final var clickMessage = aClickMessage().withProductTopic(productTopic).withPlatform(APP).build();
   kafka.sendMessage(kafkaProperties.getTopics().getClickTopic(), clickMessage);
   kafka.sendMessage(kafkaProperties.getTopics().getClickTopic(), clickMessage);

   @Cleanup final var jobClient = jobStarter.startJobs();

   @Cleanup final var kafkaConsumer =
       kafka.createKafkaConsumer(Set.of(productTopic));
   await().atMost(ofSeconds(5))
       .until(() -> kafkaConsumer.receiveAndGetAll(productTopic, ProductMessage.class),
           productMessages -> productMessages.size() == 1
                                  && productMessages.get(0).getUserId().equals(clickMessage.getUserId())
       );
}

Получается, мы добавили аннотацию @Cleanup для объекта JobClient и убрали лишнюю завершающую строку jobClient.cancel().get(5, TimeUnit.SECONDS).

RocksDB в E2E-тестах

Чтобы окончательно убедиться, что наши E2E-тесты покрывают необходимую функциональность, нужно воссоздать окружение, аналогичное production. В нём для больших состояний рекомендуется использовать RocksDB в качестве StateBackend, который применяется напрямую в дедупликаторе. RocksDB в качестве бэкенда состояний я описывал в первой статье этого цикла.

Итак, можно заставить E2E-тест использовать локально поднятый RocksDB. Сделать это достаточно просто, так как Flink предоставляет RocksDB «из коробки». В тестах достаточно указать его в качестве состояния:

@TestConfiguration
public class FlinkProductionConfig {

   @Autowired
   public void changeFlinkEnvironment(StreamExecutionEnvironment environment) {
       final var backend = new EmbeddedRocksDBStateBackend(false);
       environment.setStateBackend(backend);
   }
}

То есть наш компонент перехватывает настройку StreamExecutionEnvironment и указывает напрямую состояние EmbeddedRocksDBStateBackend. Чтобы его использовать, нужно добавить зависимость:

testImplementation "org.apache.flink:flink-statebackend-rocksdb:${flinkVersion}"

Теперь осталось только использовать этот конфиг в E2E-тестах, добавив его в аннотацию @E2ETest. Вот что в итоге вы сможете наблюдать в логах:

INFO 35116 --- [ger-io-thread-1] o.a.flink.runtime.jobmaster.JobMaster    : Using job/cluster config to configure application-defined state backend: EmbeddedRocksDBStateBackend{, localRocksDbDirectories=null, enableIncrementalCheckpointing=FALSE, numberOfTransferThreads=-1, writeBatchSize=-1}

Важное замечание. В процессе использования RocksDb он создаёт свои временные директории с достаточно длинными путями. Например, для Mac:

INFO 35116 --- [essages (1/2)#0] .f.c.s.s.RocksDBKeyedStateBackendBuilder : Finished building RocksDB keyed state-backend at /var/folders/_y/gd8sxnq91z9glrxjlkrj98tnbsxn37/T/junit7312928894650170068/junit1795779582692257025/minicluster_aabeebf82ae121d1fa503365b6aa7eb7/tm_0/tmp/job_9a5c9344174adde1453cf361f9a0f43f_op_StreamFlatMap_371a51a50a977e59af86fcb074c97b9f__1_2__uuid_270162db-c22d-49ff-bb62-9ed6d9ef2ac5.

Поэтому при запуске кода на Windows вы можете получить неожиданную ошибку:

Caused by: java.io.IOException: The directory path length (275) is longer than the directory path length limit for Windows (247): C:\Users\User\AppData\Local\Temp\junit15245919089769733580\junit7040907885113472950\minicluster_7d9295c691c345c6209f2bb0db16b593\tm_0\tmp\job_7e441850886077eb74921aa5bb0e41f4_op_StreamFlatMap_371a51a50a977e59af86fcb074c97b9f__1_2__uuid_bcc33f69-17b2-4424-9620-25922af3be34\db
	at org.apache.flink.contrib.streaming.state.RocksDBOperationUtils.throwExceptionIfPathLengthExceededOnWindows(RocksDBOperationUtils.java:285) ~[flink-statebackend-rocksdb-1.17.0.jar:1.17.0]
	at org.apache.flink.contrib.streaming.state.RocksDBOperationUtils.openDB(RocksDBOperationUtils.java:85) ~[flink-statebackend-rocksdb-1.17.0.jar:1.17.0]
	at org.apache.flink.contrib.streaming.state.restore.RocksDBHandle.loadDb(RocksDBHandle.java:134) ~[flink-statebackend-rocksdb-1.17.0.jar:1.17.0]
	at org.apache.flink.contrib.streaming.state.restore.RocksDBHandle.openDB(RocksDBHandle.java:113) ~[flink-statebackend-rocksdb-1.17.0.jar:1.17.0]
	at org.apache.flink.contrib.streaming.state.restore.RocksDBNoneRestoreOperation.restore(RocksDBNoneRestoreOperation.java:62) ~[flink-statebackend-rocksdb-1.17.0.jar:1.17.0]
	at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder.build(RocksDBKeyedStateBackendBuilder.java:325) ~[flink-statebackend-rocksdb-1.17.0.jar:1.17.0]
	... 18 common frames omitted
Caused by: org.rocksdb.RocksDBException: Failed to create a directory: C:\Users\User\AppData\Local\Temp\junit15245919089769733580\junit7040907885113472950\minicluster_7d9295c691c345c6209f2bb0db16b593\tm_0\tmp\job_7e441850886077eb74921aa5bb0e41f4_op_StreamFlatMap_371a51a50a977e59af86fcb074c97b9f__1_2__uuid_bcc33f69-17b2-4424-9620-25922af3be34\db: The directory path length (275) is longer than the directory path length limit for Windows (247).
	at org.rocksdb.RocksDB.open(Native Method) ~[frocksdbjni-6.20.3-ververica-2.0.jar:na]
	at org.rocksdb.RocksDB.open(RocksDB.java:306) ~[frocksdbjni-6.20.3-ververica-2.0.jar:na]
	at org.apache.flink.contrib.streaming.state.RocksDBOperationUtils.openDB(RocksDBOperationUtils.java:75) ~[flink-statebackend-rocksdb-1.17.0.jar:1.17.0]
	... 22 common frames omitted

Есть статьи о том, как решить эту проблему. Но у некоторых разработчиков описанные решения не сработали. Поэтому при тестировании Flink-задач с использованием embedded RocksDB рекомендуется не использовать Windows.

Вывод

Мы разобрали, как написать E2E-тест на Flink и Spring-джобу с использованием Kafka и Testcontainers. Мы создали удобные абстракции и поговорили об основных проблемах, которые нужно учитывать при таком виде тестирования.

На этом серию статей про тестирование я закончу. Дальше перейду к разбору таймеров в Flink с более сложным хранением состояния. Мы посмотрим, как отложить какое-то действие и отправить событие по установленному таймеру с помощью состояния. Разберём, какие проблемы могут возникнуть при более сложном взаимодействии с состояниями. Конечно, продолжим покрывать код тестами — а значит, встретимся с новыми механизмами и практиками при тестировании.

Комментарии (0)