Привет, Хабр! На связи Александр Бобряков, техлид в команде МТС Аналитики. В предыдущей части я рассказал про создание Flink-джобы Kafka-to-Kafka с оператором на основе встроенных таймеров. Такой пайплайн позволяет создавать вызов через определенное время после обработки события.

В этом посте я расскажу, как можно протестировать операторы с таймерами и какие подводные камни могут возникнуть.

Весь разбираемый исходный код есть в репозитории AlexanderBobryakov/flink-spring. В master-ветке представлен итоговый проект по всей серии. Эта статья соответствует релизной ветке с названием release/8_Test_for_Trigger_Flink_Job.

Это мой девятый материал про Apache Flink. По мере выхода новых ссылки на них будут появляться ниже.

Список моих статей про Flink:

Оглавление:

Оператор с таймерами

В предыдущей части мы создали оператор TriggerAlertProcessor, встроенный в процесс обработки событий Kafka-to-Kafka. Он сохранял некую информацию в текущем состоянии, чтобы по прошествии таймера в 10 минут отправлять события дальше на основе сохраненного состояния. Этот код выглядит так:

public class TriggerAlertProcessor extends KeyedProcessFunction<String, TriggerMessage, AlertMessage> {
   private static final long serialVersionUID = 1L;

   private final Duration stateWaiting;
   private ValueState<AlertState> alertState;
   private ValueState<Long> timestampState;
   private transient MessageTransformer<TriggerMessage, AlertState> messageToStateTransformer;
   private transient MessageTransformer<AlertState, AlertMessage> stateToMessageTransformer;

   public TriggerAlertProcessor(@NotNull Duration stateWaiting) {
       this.stateWaiting = stateWaiting;
   }

   @Override
   public void open(Configuration parameters) {
       messageToStateTransformer = new TriggerMessageToAlertStateTransformer();
       stateToMessageTransformer = new AlertStateToAlertMessageTransformer();
       final var defaultTtlConfig = StateTtlConfig
               .newBuilder(Time.minutes(stateWaiting.toMillis() + 1000))
               .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
               .setStateVisibility(NeverReturnExpired)
               .cleanupFullSnapshot()
               .build();

       // alert state
       final var alertDescriptor = new ValueStateDescriptor<>("alertState", AlertState.class);
       alertDescriptor.enableTimeToLive(defaultTtlConfig);
       alertState = getRuntimeContext().getState(alertDescriptor);

       // timestamp state
       final var timestampDescriptor = new ValueStateDescriptor<>("timestampState", Types.LONG);
       timestampDescriptor.enableTimeToLive(defaultTtlConfig);
       timestampState = getRuntimeContext().getState(timestampDescriptor);
   }

   @Override
   public void processElement(TriggerMessage message,
                              KeyedProcessFunction<String, TriggerMessage, AlertMessage>.Context ctx,
                              Collector<AlertMessage> out) throws Exception {
       final var status = message.getStatus();
       if (START.equals(status)) {
           // create timer
           final var state = messageToStateTransformer.transform(message);
           alertState.update(state);
           final var invokeTimerMillis = ctx.timerService().currentProcessingTime() + stateWaiting.toMillis();
           final var previousTimestamp = timestampState.value();
           if (previousTimestamp != null) {
               ctx.timerService().deleteProcessingTimeTimer(previousTimestamp);
           }
           ctx.timerService().registerProcessingTimeTimer(invokeTimerMillis);
           timestampState.update(invokeTimerMillis);
       } else if (STOP.equals(status)) {
           // remove timer
           final var invokeTimerMillis = timestampState.value();
           if (invokeTimerMillis != null) {
               ctx.timerService().deleteProcessingTimeTimer(invokeTimerMillis);
               timestampState.clear();
           }
           alertState.clear();
       } else {
           log.debug("Unknown trigger status {} for key {}", status, ctx.getCurrentKey());
       }
   }

   @Override
   public void onTimer(long timestamp,
                       KeyedProcessFunction<String, TriggerMessage, AlertMessage>.OnTimerContext ctx,
                       Collector<AlertMessage> out) throws Exception {
       final var alertStateValue = alertState.value();
       if (alertStateValue != null) {
           final var alertMessage = stateToMessageTransformer.transform(alertStateValue);
           out.collect(alertMessage);
       }
       timestampState.clear();
       alertState.clear();
   }

   public static class TriggerAlertProcessorKeySelector implements KeySelector<TriggerMessage, String> {
       private static final long serialVersionUID = 1L;

       @Override
       public String getKey(TriggerMessage message) {
           return message.getUserId() + message.getTriggerName();
       }
   }
}

Я подробно объяснил его в прошлой статье. Напомню сам алгоритм, который будем тестировать:

  1. Получаем входное событие TriggerMessage.

  2. В разрезе ключа (trigger_name + user_id) выполняем действия:

    a) если статус TriggerMessage равен START, то создаем таймер на 10 минут, по его завершении генерируем AlertMessage на основе прошедшего события START;

    b) если status TriggerMessage равен STOP, то удаляем созданный в пункте выше таймер. Если такого сообщения еще не было (в разрезе ключа), то ничего страшного.

Тестировать этот оператор можно либо с помощью TestHarness от разработчиков Flink, либо поднятием Flink MiniCluster. Попробуем написать одинаковые тесты на оба варианта.

Тестирование оператора с таймерами с помощью TestHarness-классов

С классами TestHarness мы уже познакомились в статье Как провести unit-тестирование Flink-операторов: TestHarness. Они позволяют тестировать операторы без каких-то дополнительных Flink-зависимостей или окружения. Нужно только сделать настройку соответствующего TestHarness-класса, а также проэмулировать процесс обработки событий.

Начнем с инициализации TestHarness-класса. В его качестве выбираем KeyedOneInputStreamOperatorTestHarness, так как наш оператор является обработчиком стрима KeyedStream. Для этого мы предварительно использовали .keyBy-метод в потоке, а наш оператор имеет внутреннее KeyedState-состояние. Инициализация выглядит следующим образом:

class TriggerAlertProcessorUnitTest_byTestHarness {
   private final Duration stateWaiting = Duration.ofMillis(10);
   private KeyedOneInputStreamOperatorTestHarness<String, TriggerMessage, AlertMessage> testHarness;

   @BeforeEach
   @SneakyThrows
   void init() {
       final var processor = new TriggerAlertProcessor(stateWaiting);
       testHarness = new KeyedOneInputStreamOperatorTestHarness<>(
               new KeyedProcessOperator<>(processor),
               new TriggerAlertProcessor.TriggerAlertProcessorKeySelector(),
               Types.STRING
       );
       testHarness.open();
   }

   @AfterEach
   @SneakyThrows
   void clear() {
       testHarness.close();
   }
// ...
}

В самом начале задаем stateWaiting — время ожидания таймера перед срабатыванием. Оно может быть любым. В блоке BeforeEach создаем объект тестируемого оператора TriggerAlertProcessor c инициализацией KeyedOneInputStreamOperatorTestHarness. В конструктор передаем:

  • наш оператор, обернутый в специальный Flink-класс KeyedProcessOperator;

  • объект, на основании которого происходит выбор ключа события в нашем KeyedStream-стриме;

  • тип этого ключа.

Дополнительно можно передать и другие настройки, например параллелизм. Обязательно вызываем метод open для инициализации внутренних состояний и подготовки к использованию в тестах. Внутри AfterEach закрываем все ресурсы.

Теперь мы перейдем к самим тестам. Для начала стоит проверить, что таймеры вообще срабатывают и метод onTimer генерирует событие в выходной поток. Для этого закинем на вход нашего оператора три START-события с разными userId и повторяющимся triggerName:

private static Stream<Arguments> provideStartEvents() {
   return Stream.of(
           arguments(List.of(
                   new StreamRecord<>(aTriggerMessage().withUserId(UUID.randomUUID()).withStatus(START).withTriggerName("trigger_1").build()),
                   new StreamRecord<>(aTriggerMessage().withUserId(UUID.randomUUID()).withStatus(START).withTriggerName("trigger_1").build()),
                   new StreamRecord<>(aTriggerMessage().withUserId(UUID.randomUUID()).withStatus(START).withTriggerName("trigger_2").build())
           ))
   );
}

В этом примере используется тестовый статический метод aTriggerMessage(), который позволяет сделать валидный TriggerMessage с заполненными полями. Мы ждем, что на каждое из них таймер создастся и успешно отработает, сгенерировав событие в выходной поток. Давайте это проверим тестом:

@SneakyThrows
@ParameterizedTest
@MethodSource("provideStartEvents")
void shouldProcessStartEventsWhenTimerInvoked(List<StreamRecord<TriggerMessage>> events) {
   testHarness.processElements(events);
   testHarness.setProcessingTime(stateWaiting.toMillis() + 1);

   final var outputEvents = testHarness.extractOutputStreamRecords();
   assertEquals(events.size(), outputEvents.size(), "Unexpected size of output result list");
}

Первым шагом забрасываем в testHarness все три события на обработку. Внутреннее время processTime-машины не изменяется, потому что для testHarness мы должны вызвать специальные методы управления временем. В нашем случае через setProcessTime указываем интервал больше, чем длительность заданного таймера. Затем оператор должен автоматически внутри среагировать на таймеры и вызвать методы onTimer на каждый из них. Для этого в тесте проверяем, что в результате testHarness.extractOutputStreamRecord() нам сгенерировалось столько же событий, сколько поступило на вход.

По такой логике получается, что событий в результате не будет. Если мы не укажем тесту о прошествии времени с помощью setProcessTime, то таймаут не пройдет и метод onTimer оператора не вызовется. Давайте это проверим:

@SneakyThrows
@ParameterizedTest
@MethodSource("provideStartEvents")
void shouldNotProcessStartEventsWhenTimerNotInvoked(List<StreamRecord<TriggerMessage>> events) {
   testHarness.processElements(events);

   final var outputEvents = testHarness.extractOutputStreamRecords();
   assertEquals(0, outputEvents.size(), "Unexpected size of output result list");
}

Действительно, этот тест успешно отрабатывает.

В конце воспроизведем ситуацию, когда приходит сначала START-событие, а затем STOP с таким же ключом (trigger_name + user_id). В этом случае генерируем входные события следующим образом:

private static Stream<Arguments> provideStartStopEvents() {
   final var userId = UUID.randomUUID();
   return Stream.of(
           arguments(List.of(
                   new StreamRecord<>(aTriggerMessage().withUserId(userId).withStatus(START).withTriggerName("trigger_3").build()),
                   new StreamRecord<>(aTriggerMessage().withUserId(userId).withStatus(STOP).withTriggerName("trigger_3").build())
           ))
   );
}

Тест оставим точно таким же, как первый. Добавим только ожидание отсутствия событий на выходе за время таймера:

@SneakyThrows
@ParameterizedTest
@MethodSource("provideStartStopEvents")
void shouldNotProcessEventsWhenStartWithStop(List<StreamRecord<TriggerMessage>> events) {
   testHarness.processElements(events);
   testHarness.setProcessingTime(stateWaiting.toMillis() + 1);

   final var outputEvents = testHarness.extractOutputStreamRecords();
   assertEquals(0, outputEvents.size(), "Unexpected size of output result list");
}

В целом, можно написать еще много разных тестов, но мы пока остановимся. Этого уже достаточно для демонстрации возможности тестирования таймеров с помощью TestHarness. Теперь посмотрим, как выглядят точно такие же тесты через поднятие Flink MiniCluster.

Тестирование оператора с таймерами с помощью Flink MiniCluster

В статье Unit-тестирование Flink-операторов, Job: Flink MiniCluster я рассказывал, как сделать удобную аннотацию над тестом, чтобы автоматически поднялся Flink MiniCluster:

@Retention(RUNTIME)
@Inherited
@ExtendWith({FlinkClusterExtension.class})
public @interface WithFlinkCluster {
}

Такая аннотация использует механизм Junit Extension и полностью подходит для наших тестов. Их структура должна выглядеть так:

  1. Создаем Source из входных элементов.

  2. Создаем тестовый Sink, в котором будем проверять наличие выходных событий.

  3. Создаем минимальный пайплайн с нашим тестируемым оператором.

  4. Запускаем джобу и ждем установленное таймером время.

  5. Проверяем Sink на наличие выходных событий.

На первый взгляд тест на срабатывание таймеров будет таким:

@WithFlinkCluster
class TriggerAlertProcessorUnitTest_byFlinkCluster {
   @SneakyThrows
   @ParameterizedTest
   @MethodSource("provideStartEvents")
   void shouldProcessStartEventsWhenTimerInvoked_v1(List<TriggerMessage> events) {
       final var env = StreamExecutionEnvironment.getExecutionEnvironment();
       final var sink = new TestListSink<AlertMessage>();

       env.fromCollection(events)
           .keyBy(new TriggerAlertProcessorKeySelector())
           .process(new TriggerAlertProcessor(ofMillis(15)))
           .sinkTo(sink);
       env.execute();
       await().atMost(ofSeconds(2))
           .until(() -> sink.getHandledEvents().size() == events.size());
   }

   // ...
}

В начале теста получаем env, а затем создаем тестовый TestListSink. Я подробно рассказывал о нем в предыдущей статье про Unit-тестирование Flink-операторов. Нам будет очень важна его структура. Основная цель этого Sink — отловить все события на выходе с учетом любой заданной степени параллельности Sink-оператора. Далее создаем пайплайн, указав через .fromCollection входные события, потом используем наш оператор и таймер в 15 мс. В конце проверяем, что на выходе в блоке await получили все события.

В этом тесте много ошибок. Давайте найдем их. По аналогии с использованием TestHarness запускаем джобу и ожидаем на выходе три события для каждого входного сообщения. По факту на моей машине тест падает:

Assert не выполняется за две секунды: мы не получаем на выходе столько же событий, сколько было на входе. Что-то не так. Попробуем встать дебагом внутри нашего оператора в местах, где создается таймер и вызывается метод onTimer(). Предварительно увеличиваем время ожидания выполнения assert в Awaitility с 2 с до 60 с:

Дебаг трижды останавливается в методе onTimer и без ошибок записывает события в выходной поток. А тест на удивление проходит успешно:

Рассмотрим проблемы более подробно.

Источники BOUNDED и CONTINUOUS_UNBOUNDED 

Первая проблема заключается в том, что при обычном запуске после обработки всех входных событий каждым оператором джоба завершается сразу, без учета наличия внутренних таймеров.

Это происходит из-за наличия двух типов источников: BOUNDED — источник с конечным числом событий и CONTINUOUS_UNBOUNDED — источник с бесконечным числом записей. Создание потока через метод fromCollection как раз создает BOUNDED-поток, а таймеры просто не успевают сработать. Поэтому нам нужно как-то подождать, прежде чем поток завершится. Это можно сделать несколькими способами, но я предлагаю создать свой собственный тестовый универсальный UNBOUNDED-источник:

@SuppressWarnings("PMD.TestClassWithoutTestCases")
public final class TestUnboundedSource<T> implements SourceFunction<T> {
   private static final long serialVersionUID = 1L;
   private final int elementsListId;

   private volatile boolean running = true;

   @SuppressWarnings("unchecked")
   public static <E> TestUnboundedSource<E> fromElements(List<E> elements) {
       final var instanceListId = TestListWrapper.getInstance().createList();
       final var list = (List<E>) TestListWrapper.getInstance().getList(instanceListId);
       list.addAll(elements);
       return new TestUnboundedSource<>(instanceListId);
   }

   private TestUnboundedSource(int elementsListId) {
       this.elementsListId = elementsListId;
   }

   @Override
   @SuppressWarnings("unchecked")
   public void run(SourceContext<T> ctx) throws Exception {
       final var elements = (List<T>) TestListWrapper.getInstance().getList(elementsListId);
       for (T element : elements) {
           ctx.collect(element);
       }
       while (running) {
           // CONTINUOUS_UNBOUNDED
           Thread.sleep(500L);
       }
   }

   @Override
   public void cancel() {
       running = false;
   }
}

Такой источник можно создать с помощью статического метода TestUnboundedSource.fromElements, передав коллекцию любых элементов. Внутри него благодаря внутреннему Collections.synchronizedList создается потокобезопасное хранилище событий через TestListWrapper. Я уже рассказывал про достоинства этого класса в этой статье. Он нужен для защиты от проблем при увеличении параллелизма источника. Это очень важно, так как мы могли бы в тестах использовать дефолтный параллелизм больше 1. Тогда, в отличие от нашего TestUnboundedSource, источник дублировал бы сообщения. Можно использовать статические коллекции, как предложено в документации, но это не очень удобно.

TestUnboundedSource должен обязательно имплементировать метод run интерфейса SourceFunction, который в нашем случае перебирает все элементы входной коллекции и отправляет их в поток данных. Сама суть «бесконечного» потока достигается благодаря циклу внутри метода run. Реализация метода close как раз останавливает источник, потому что мы обязаны следовать соглашению из javadoc. Так для метода cancel упоминается:

Таким образом в тесте можно заменить env.fromCollection(events) на создание источника TestUnboundedSource.fromElements(events):

@SneakyThrows
@ParameterizedTest
@MethodSource("provideStartEvents")
void shouldProcessStartEventsWhenTimerInvoked_v2(List<TriggerMessage> events) {
   final var env = StreamExecutionEnvironment.getExecutionEnvironment();
   final var sink = new TestListSink<AlertMessage>();
   final var source = TestUnboundedSource.fromElements(events);

   env.addSource(source, TypeInformation.of(TriggerMessage.class))
           .keyBy(new TriggerAlertProcessor.TriggerAlertProcessorKeySelector())
           .process(new TriggerAlertProcessor(ofMillis(15)))
           .sinkTo(sink);
   env.execute();

   await().atMost(ofSeconds(2))
           .until(() -> sink.getHandledEvents().size() == events.size());
}

Запускаем исправленный тест и видим, что он завис:

Это было ожидаемо. Мы создали бесконечный источник, а метод env.execute() является синхронным, из-за чего метод close мы вызвать из текущего потока не можем.

Асинхронный запуск Flink-задания

Для решения проблемы воспользуемся методом env.executeAsync(), который запускает Flink Job асинхронно. Не забываем при этом добавить закрытие ресурсов последней строкой: для синхронного запуска метод execute возвращал JobExecutionResult, но из метода executeAsync возвращается объект JobClient:

@SneakyThrows
@ParameterizedTest
@MethodSource("provideStartEvents")
void shouldProcessStartEventsWhenTimerInvoked_v3(List<TriggerMessage> events) {
   final var env = StreamExecutionEnvironment.getExecutionEnvironment();
   final var sink = new TestListSink<AlertMessage>();
   final var source = TestUnboundedSource.fromElements(events);

   env.addSource(source, TypeInformation.of(TriggerMessage.class))
           .keyBy(new TriggerAlertProcessor.TriggerAlertProcessorKeySelector())
           .process(new TriggerAlertProcessor(ofMillis(15)))
           .sinkTo(sink);
   final var jobClient = env.executeAsync();

   await().atMost(ofSeconds(2))
           .until(() -> sink.getHandledEvents().size() == events.size());
   jobClient.cancel().get(2, TimeUnit.SECONDS);
}

Запускаем тест, он проходит:

Радоваться не спешим. Здесь есть еще одна скрытая проблема. Давайте представим, что assert не выполнился. В этом случае ресурсы не закроются и метод jobClient.cancel() никто не вызовет. Поэтому во время запуска последующих тестов при использовании однократно поднятого Flink MiniCluster текущая джоба все еще будет выполняться, а на новый тест просто не хватит ресурсов поднятого кластера.

Безопасное завершение асинхронной Flink-задачи

Об этой проблеме я уже писал в статье E2E-тестирование Flink Job с Kafka. Ее можно решить собственным декоратором AutoCloseableJobClient над JobClient, который можно обернуть в try/finally. Правда, с помощью Lombok тест выглядит элегантнее:

@SneakyThrows
@ParameterizedTest
@MethodSource("provideStartEvents")
void shouldProcessStartEventsWhenTimerInvoked(List<TriggerMessage> events) {
   final var env = StreamExecutionEnvironment.getExecutionEnvironment();
   final var sink = new TestListSink<AlertMessage>();
   final var source = TestUnboundedSource.fromElements(events);

   env.addSource(source, TypeInformation.of(TriggerMessage.class))
           .keyBy(new TriggerAlertProcessor.TriggerAlertProcessorKeySelector())
           .process(new TriggerAlertProcessor(ofMillis(15)))
           .sinkTo(sink);
   @Cleanup final var jobClient = new AutoCloseableJobClient(env.executeAsync());

   await().atMost(ofSeconds(2))
           .until(() -> sink.getHandledEvents().size() == events.size());
}

В этом случае Lombok сам обернет jobClient-объект в try/finally-блок и всегда вызывает метод close после теста, независимо от выполнения или падения assert.

Используя такой прием, добавим новый тест на отсутствие срабатывания таймера, просто указав длительность таймаута, например, в 1 день:

@SneakyThrows
@ParameterizedTest
@MethodSource("provideStartEvents")
void shouldNotProcessStartEventsWhenTimerNotInvoked(List<TriggerMessage> events) {
   final var env = StreamExecutionEnvironment.getExecutionEnvironment();
   final var sink = new TestListSink<AlertMessage>();
   final var source = TestUnboundedSource.fromElements(events);

   env.addSource(source, TypeInformation.of(TriggerMessage.class))
           .keyBy(new TriggerAlertProcessor.TriggerAlertProcessorKeySelector())
           .process(new TriggerAlertProcessor(ofDays(1)))
           .sinkTo(sink);
   @Cleanup final var jobClient = new AutoCloseableJobClient(env.executeAsync());

   await().during(ofSeconds(2))
           .until(() -> sink.getHandledEvents().isEmpty());
}

Аналогичным образом пишется тест на отсутствие срабатывания таймера при приходе событий START и STOP в рамках того же ключа.

Тестирование Flink Job, содержащего оператор с таймерами

Следующим этапом тестирования нужно добавить полноценные тесты на всю джобу Flink, которая реализована в предыдущей статье согласно ТЗ. Напомню, как она выглядит:

@Component
@AllArgsConstructor
@ConditionalOnProperty("jobs.trigger-to-alert-job.enabled")
public class TriggerToAlertJob extends FlinkJob {
   private final TriggerToAlertJobProperties properties;
   private final SourceBinder<TriggerMessage> sourceBinder;
   private final SinkProvider<AlertMessage> sinkProvider;

   @Override
   public void registerJob(StreamExecutionEnvironment env) {
       sourceBinder.bindSource(env)
               .filter(new TriggerMessageByStatusAndUserFilter())
               .uid("filter_trigger_message_by_status_id").name("filter_trigger_message_by_status")
               .keyBy(new TriggerAlertProcessor.TriggerAlertProcessorKeySelector())
               .process(new TriggerAlertProcessor(properties.getStateWaiting()))
               .uid("trigger_alert_processor_id").name("trigger_alert_processor")
               .sinkTo(sinkProvider.createSink()).uid("sink_alert_message_id").name("sink_alert_message");
   }
}

Для теста воспользуемся аннотацией @FlinkJobTest, которую мы создавали для тестирования Flink-заданий в предыдущих статьях. Она позволяет создать или использовать существующий Flink MiniCluster, а также загружает необходимый Spring-контекст:

@Retention(RUNTIME)
@SpringBootTest(
   webEnvironment = NONE,
   classes = {
       PropertiesConfig.class,
       FlinkConfig.class,
   })
@ActiveProfiles({"test"})
@WithFlinkCluster
public @interface FlinkJobTest {
}

В новом TriggerToAlertJobUnitTest нам нужно проверить два основных кейса:

  1. Генерация выходного события таймером при получении START-события.

  2. Отсутствие выходного события, если на вход поступили события START и STOP.

Аналогично тестам выше нужно оперировать изменением длительности нашего таймера. В первом случае можно выставить 1 мс, чтобы не увеличивать время ожидания выходного события в тесте. Во втором — 2 с, чтобы гарантировать успешную обработку оператором обоих сообщений.

Так как длительность таймера задается через конфигурацию в application.yml, то эти два теста удобно разделить внутри одного тестового класса с помощью Junit-аннотации @Nested, передав дополнительно соответствующие значения таймеров в Spring-аннотацию @TestPropertySource. В итоге тесты будут выглядеть следующим образом:

@FlinkJobTest
@SuppressWarnings("PMD.DataflowAnomalyAnalysis")
class TriggerToAlertJobUnitTest {
   @Autowired
   private StreamExecutionEnvironment environment;

   @Nested
   @TestPropertySource(properties = "jobs.trigger-to-alert-job.state-waiting=1ms")
   public class TriggerToAlertJobUnitTest_minWaitingTime {
       @Autowired
       private TriggerToAlertJobProperties properties;

       @Test
       @SneakyThrows
       void shouldCreateAlertMessageByStartTriggerMessage() {
           // ...
       }
   }

   @Nested
   @TestPropertySource(properties = "jobs.trigger-to-alert-job.state-waiting=1s")
   public class TriggerToAlertJobUnitTest_longWaitingTime {
       @Autowired
       private TriggerToAlertJobProperties properties;

       @Test
       @SneakyThrows
       void shouldNotCreateAlertMessageByStartWithStopTriggerMessage() {
           // ...
       }
   }
}

Первый тест для генерации выходного события таймером при получении START-события выглядит следующим образом:

@Test
@SneakyThrows
void shouldCreateAlertMessageByStartTriggerMessage() {
   final var triggerMessage = aTriggerMessage().withStatus(START).build();
   final var sink = new TestListSink<AlertMessage>();
   final var source = TestUnboundedSource.fromElements(List.of(triggerMessage));
   final var job = new TriggerToAlertJob(
           properties,
           env -> env.addSource(source, TypeInformation.of(TriggerMessage.class)).uid("source"),
           () -> sink
   );

   job.registerJob(environment);
   @Cleanup final var jobClient = new AutoCloseableJobClient(environment.executeAsync());

   await().atMost(ofSeconds(2))
           .until(() -> !sink.getHandledEvents().isEmpty());
   final var out = sink.getHandledEvents();
   assertEquals(1, out.size(), format("Unexpected message count in sink: %s", out));
   final var alertMessage = out.get(0);
   assertEquals(triggerMessage.getTriggerName(), alertMessage.getTriggerName(), "Unexpected trigger name");
   assertEquals(triggerMessage.getUserId(), alertMessage.getUserId(), "Unexpected user id");
   assertEquals(triggerMessage.getTimestamp(), alertMessage.getTimestamp(), "Unexpected timestamp");
}

Содержимое достаточно простое: создаем тестовые Source и Sink аналогично примерам выше, но уже используем реальную джобу TriggerToAlertJob с передачей в конструктор в Spring-конфигурации задания. После асинхронного запуска проверяем, что на выходе получили AlertMessage с ожидаемыми значениями полей.

Второй тест для отсутствия выходного события при поступлении на вход двух событий START и STOP выглядит так:

@Test
@SneakyThrows
void shouldNotCreateAlertMessageByStartWithStopTriggerMessage() {
   final var userId = UUID.randomUUID();
   final var startTriggerMessage = aTriggerMessage().withStatus(START).withUserId(userId).build();
   final var stopTriggerMessage = aTriggerMessage().withStatus(STOP).withUserId(userId).build();
   final var sink = new TestListSink<AlertMessage>();
   final var source = TestUnboundedSource.fromElements(List.of(startTriggerMessage, stopTriggerMessage));
   final var job = new TriggerToAlertJob(
           properties,
           env -> {
               env.setParallelism(1);
               return env.addSource(source, TypeInformation.of(TriggerMessage.class)).uid("source");
           },
           () -> sink
   );

   job.registerJob(environment);
   @Cleanup final var jobClient = new AutoCloseableJobClient(environment.executeAsync());

   await().during(ofSeconds(2))
           .until(() -> {
               final var userId1 = userId;
               return sink.getHandledEvents().isEmpty();
           });
}

Тест похож на предыдущие, но у него есть очень важное отличие в строке с env.setParallelism(1). Это нужно, чтобы сообщения START и STOP обработались в правильном порядке. Иначе STOP может обработаться раньше. Вы даже можете поймать такой кейс при нескольких перезапусках данного теста с установкой параллелизма >1. Тут можно возразить, что у нас же есть KeySelector, который в любом случае отправит оба события по одинаковому ключу на один и тот же оператор. Но не забывайте, что в джобе на первом месте стоит оператор-фильтр, у которого параллельность >1 по умолчанию. Поэтому два события могут разделиться на разные параллельные задания (Task) фильтра и изменить порядок уже перед оператором с состоянием. Такой сценарий также вполне возможен на проде — и это нужно иметь в виду.

Чтобы убедиться в работоспособности всей джобы, нам потребуется добавить финальный E2E-тест.

E2E-тестирование Flink Job, содержащего оператор с таймерами

E2E-тест должен быть похож на предыдущий Unit-тест всей джобы, но в качестве источника и приемника нужно использовать реальные компоненты — топики Kafka. В статье E2E-тестирование Flink Job с Kafka был пример E2E-теста для другого Flink-задания:

@E2ETest
@SuppressWarnings("PMD.DataflowAnomalyAnalysis")
public class JobE2ETest {
   @Autowired
   private JobStarter jobStarter;
   @Autowired
   private TestKafkaFacade kafka;
   @Autowired
   private KafkaProperties kafkaProperties;

   @Test
   @SneakyThrows
   void shouldProcessClickMessageSourceToProductSink() {
       // ...
   }
}

Здесь аннотация @E2ETest заставляла подняться весь контекст Spring, а также Kafka и Flink MiniCluster. Также при этом использовалась конфигурация Flink, эмулирующая prod-окружение (использовался Embedded RockDB). Топики и взаимодействие с Kafka были реализованы с помощью нашей вспомогательной абстракции TestKafkaFacade, которую подробно рассмотрели ранее.

Для текущей задачи можно добавить такой тест:

@Test
@SneakyThrows
void shouldProcessTriggerMessageSourceToAlertSink() {
   final var triggerMessage = aTriggerMessage().withStatus(START).build();
   kafka.sendMessage(kafkaProperties.getTopics().getTriggerTopic(), triggerMessage);

   @Cleanup final var jobClient = jobStarter.startJobs();

   final var alertTopic = kafkaProperties.getTopics().getAlertTopic();
   @Cleanup final var kafkaConsumer =
           kafka.createKafkaConsumer(Set.of(alertTopic));
   await().atMost(ofSeconds(5))
           .until(() -> kafkaConsumer.receiveAndGetAll(alertTopic, AlertMessage.class),
                   alertMessages -> alertMessages.size() == 1
                           && alertMessages.get(0).getUserId().equals(triggerMessage.getUserId())
           );
}

В нем START-сообщение отправляется в trigger-топик, а на выходе проверяется, что получили AlertMessage-сообщение в ожидаемом топике alert-topic, заданном в конфигурации application-test.yml. Это происходит, потому что поднимается контекст весь, вместе с Source и Sink, которые подсоединяются к Kafka. Также в конфигурации необходимо выставить настройку длительности таймера, например, в 10 мс в отличие от стандартного 10 мин.

Вывод

В данной статье я показал, как можно тестировать операторы с таймерами с помощью TestHarness-классов и поднятием базового Flink MiniCluster. Мы рассмотрели несколько проблем и неочевидных подводных камней при тестировании. В заключении написали полноценный Unit-тест на всю Flink-джобу, а также финальный E2E-тест, проверяющий интеграцию с Kafka в окружении, близком к проду.

В следующей статье рассмотрим проблему, когда схема нашего состояния KeyedState может измениться, а потерять данные при обновлении приложения и восстановлении состояния при старте новой версии нельзя. Подниму тему сериализации данных и напишу свой сериализатор состояния, поддерживающий эволюцию схемы. И конечно, буду тестировать случаи, когда в однократно запущенном распределенном Flink-кластере есть несколько разных версий одного и того же класса.

Комментарии (1)


  1. Marsezi
    25.10.2024 12:24

    Возможно вопрос более подходит к вашей предыдущий стать. Вот на примере лайков. Flink как вы пишете работает без batch в реальном времени и т д.

    Считаем лайки (суммируем по статьям). Внутри flink они инкрементируются обрабатывая сообщение за сообщением. И есть sink в базу например mongo. И как они в базу пишутся ? Также на каждый лайк происходит update в базу данных? Это же создаст туже нагрузку что я просто буду писать напрямую в базу update like++. Посмотрел параметры в настройках sink для mongo , там есть параметры таймаутов, размеров буферов и т д. Но ничего связанного с решением описанной проблемы.

    Так вот мне flink не подходит и мне нужно переходить как раз на batch по типу spark?

    Или это какие то сложные ручные Processorы надо писать чтобы они делали flush в бд с условиями 'или изменения за 1 минуту или накопили 1000 записей на update'

    0