Привет всем, на связи снова Александр Бобряков, техлид в команде МТС Аналитики. Продолжаем цикл статей про фреймворк Apache Flink.

Напомню, в предыдущих частях я рассказывал про построение пайплайна Kafka-to-Kafka с промежуточным разделением потока и дедупликацией событий. Также в предыдущей статье я рассказал, как можно динамически определить выходной Kafka-топик для каждого отправляемого события.

Начиная с этой статьи начнём разбирать, как тестировать всё наше приложение Flink + Spring. Многие описанные подходы вполне применимы и в любом другом обычном Spring-приложении, поэтому, надеюсь, вы найдёте для себя что-то новое.

В данной статье мы рассмотрим, как протестировать stateless- и stateful-операторы Flink с помощью абстракций TestHarness.

Список моих постов про Flink

Весь разбираемый исходный код можно найти в репозитории AlexanderBobryakov/flink-spring. В master-ветке представлен итоговый проект по всей серии статей. Данная статья соответствует релизной ветке с названием release/4_testharness_deduplicator_test.

Оглавление статьи

Структура тестов

Небольшой спойлер: к концу статьи мы получим вот такую структуру тестов:

Мы добавим первые тесты на stateless-фильтр, stateful-дедупликатор и на stateless-сплитератор (учитывающий внутренние Flink-абстракции).

Абстракция EntityTestBuilder

В нашем приложении уже есть несколько бизнес-DTO, например ClickMessage и ProductMessage, и будут добавляться новые. В самих тестах мы должны их создавать для воспроизведения отдельных сценариев. Но подумаем об удобстве: определим единое место, в котором будем создавать каждое DTO. Ведь в сценариях много полей, а определённые значения могут иметь бизнес-ценность.

Для этого создадим в тестовой директории следующий интерфейс:

public interface EntityTestBuilder<T> {
    T build();
}

Для тестов у него также будут реализации под каждую нашу DTO. Давайте рассмотрим пример для ClickMessage:

@With
@AllArgsConstructor(access = PUBLIC)
@NoArgsConstructor(access = PRIVATE)
public class ClickMessageTestBuilder implements EntityTestBuilder<ClickMessage> {
   private UUID userId = UUID.randomUUID();
   private String object = "test_object";
   private Platform platform = Platform.Enum.APP;
   private String productName = "test_productName";
   private String productTopic = "test_productTopic";
   private Long timestamp = 123L;
   private Map<String, Object> data = new HashMap<>() {{
       put("field_1", "value_1");
       put("field_2", "value_2");
   }};

   public static ClickMessageTestBuilder aClickMessage() {
       return new ClickMessageTestBuilder();
   }

   @Override
   public ClickMessage build() {
       return ClickMessage.builder()
                  .userId(userId)
                  .object(object)
                  .platform(platform)
                  .productName(productName)
                  .productTopic(productTopic)
                  .timestamp(timestamp)
                  .data(data)
                  .build();
   }
}

В этой реализации мы в одном месте определяем все дефолтные значения полей, а также статический метод-билдер, который будем вызывать в тестах для построения объекта ClickMessage. При этом аннотации lombok позволяют переопределять дефолтные значения с помощью методов withXXX(...), например:

aClickMessage().withPlatform(APP).build()

Тестирование stateless Flink-операторов

Начнём с самых простых тестов на stateless-операторы. Напомню, что они не используют внутренние состояния. Им неважно, какие события они обрабатывали ранее — каждое событие полностью независимо от других. Такой оператор выглядит как обычный Java-класс, и в его тестировании нет ничего сложного.

Давайте рассмотрим наш фильтр, пропускающий события с определённым значением поля platform:

public class ClickMessageWithPlatformFilter implements FilterFunction<ClickMessage> {
   private static final long serialVersionUID = 1L;

   @Override
   public boolean filter(ClickMessage message) {
       return Optional.ofNullable(message.getPlatform())
                  .map(platform -> WEB.equals(platform) || APP.equals(platform))
                  .orElse(false);
   }
}

Протестировать его можно с помощью JUnit 5 с использованием аннотации @ParameterizedTest. Эта аннотация позволяет указывать тесту аргументы, которые поступают посредством второй Source-аннотации (@MethodSource, @ValueSource, @CsvSource...):

class ClickMessageWithPlatformFilterUnitTest {

   @ParameterizedTest
   @MethodSource("clickMessagesByPlatform")
   void shouldFilterMessageByPlatform(boolean expectedFilter, ClickMessage message) {
       final var filter = new ClickMessageWithPlatformFilter();
       final var filterResult = filter.filter(message);
       assertEquals(expectedFilter, filterResult, format("Unexpected filter result for message: %s", message));
   }

   private static Stream<Arguments> clickMessagesByPlatform() {
       return Stream.of(
           arguments(true, aClickMessage().withPlatform(APP).build()),
           arguments(true, aClickMessage().withPlatform(WEB).build()),
           arguments(false, aClickMessage().withPlatform(Platform.of("unknown")).build()),
           arguments(false, aClickMessage().withPlatform(Platform.of("")).build()),
           arguments(false, aClickMessage().withPlatform(null).build())
       );
   }
}
  • в статическом методе clickMessagesByPlatform определяем перечень всех комбинаций входных параметров для теста .boolean expectedFilter — ожидаемый результат вызова фильтра true/false

  • ClickMessage message — само сообщение для фильтрации (обратите внимание на удобство создания объектов этого класса)

Тест будет запускаться 5 раз. В самом тесте создаём объект фильтра и вызываем метод filter для дальнейшего сравнения с ожидаемым результатом.

Так мы можем протестировать классы для методов map, flatMap, process и т. д. Конечно, иногда может понадобиться замокать какой-нибудь объект с помощью Mockito, но это ничем не отличается от стандартного тестирования.

Тестирование stateful Flink-операторов

Теперь перейдём к тестированию stateful-операторов. Они тесно связаны со средой выполнения Flink, потому что могут содержать внутреннее состояние, управляемое Flink, а также какие-нибудь таймеры и т. д. В этом случае подход, который я описал выше, не подойдёт. На этот раз нам нужны более умные абстракции, которые могут выполнить логику управления Flink. Такие абстракции есть в документации к тестированию.

Также в документации предлагается использовать так называемые TestHarness-классы. Для этого мы настроим дополнительные зависимости:

testImplementation "org.apache.flink:flink-streaming-java:${flinkVersion}:tests"
testImplementation "org.apache.flink:flink-test-utils:${flinkVersion}"

Первая зависимость даёт нам доступ ко всем описанным в документации TestHarness-классам, а вторая — ко многим удобным абстракциям в виде Flink MiniCluster, TestEnvironment и т. д.

Тестирование с помощью TestHarness-классов

TestHarness — это классы для тестирования операторов, основанные на использовании внутренней логики Flink: watermark, состояния, таймеры и т. д.

Сейчас объясню на примере обычного stateless-оператора. Рассмотрим тест над нашим написанным ранее сплитератором входных событий по нескольким побочным выходным потокам (по OutputTag) на основе предикатов:

@RequiredArgsConstructor
public class StreamSpliterator<I> extends ProcessFunction<I, Object> {
   private static final long serialVersionUID = 1L;

   private final Map<OutputTag<I>, SerializablePredicate<I>> predicatesByTags;
   private final OutputTag<I> defaultTag;

   @Override
   public void processElement(I value, ProcessFunction<I, Object>.Context ctx, Collector<Object> out) {
       for (Map.Entry<OutputTag<I>, SerializablePredicate<I>> entry : predicatesByTags.entrySet()) {
           final var predicate = entry.getValue();
           if (predicate.test(value)) {
               final var tag = entry.getKey();
               ctx.output(tag, value);
               return;
           }
       }
       ctx.output(defaultTag, value);
   }
}

Подробное объяснение этого кода можно глянуть тут. Основная сложность тестирования — запись в объект класса ProcessFunction.<I, Object>Context по определённому тегу в строке ctx.output(tag, value). Можно, конечно, замокать этот класс и провести обычный unit-тест, но это выглядит не очень хорошо. Получается, что тест должен знать о внутренней структуре/логике тестируемого оператора. Давайте напишем более красивый тест с помощью вспомогательных TestHarness-абстракций:

class StreamSpliteratorUnitTest {

   @Test
   void shouldSplitStreamByPredicates() throws Exception {
       final var webTag = new OutputTag<>("web_tag", TypeInformation.of(String.class));
       final var mobileTag = new OutputTag<>("mobile_tag", TypeInformation.of(String.class));
       final var unknownTag = new OutputTag<>("unknown_tag", TypeInformation.of(String.class));
       final var events = List.of(
           new StreamRecord<>("web"),
           new StreamRecord<>("mobile"),
           new StreamRecord<>("test"),
           new StreamRecord<>("web")
       );
       final var spliterator = new StreamSpliterator<>(
           Map.of(
               webTag, "web"::equals,
               mobileTag, "mobile"::equals
           ),
           unknownTag
       );
       @Cleanup final var testHarness = ProcessFunctionTestHarnesses.forProcessFunction(spliterator);

       testHarness.processElements(events);

       assertTrue(testHarness.getRecordOutput().isEmpty(), "Output stream has unexpected events");
       assertEquals(2, testHarness.getSideOutput(webTag).size(),
           format("Unexpected events in 'web' output stream: %s", new ArrayList<>(testHarness.getSideOutput(webTag))));
       assertEquals(1, testHarness.getSideOutput(mobileTag).size(),
           format("Unexpected events in 'mobile' output stream: %s", new ArrayList<>(testHarness.getSideOutput(mobileTag))));
       assertEquals(1, testHarness.getSideOutput(unknownTag).size(),
           format("Unexpected events in 'default' output stream: %s", new ArrayList<>(testHarness.getSideOutput(unknownTag))));
   }
}

Для начала мы создаём три тега: web, mobile, unknown. По ним мы ожидаем получить разделение входного потока. Далее нам нужны тестовые события — events, которые будем передавать на вход сплитератора StreamSpliterator. Мы используем класс StreamRecord — это базовая абстракция в Flink. Она передаётся внутри стрима данных и содержит именно бизнес-объект. Этот класс — одна из реализаций абстрактного StreamElement. Например, другая реализация — класс Watermark. Он может также передаваться внутри потока данных отдельным событием для оповещения операторов. По сути в тесте мы создали четыре события с типом String. После этого идёт инициализация сплитератора new StreamSpliterator: события с определённым значением мы записываем в соответствующий побочный выход по тегу согласно указанному предикату (для простоты используем equals для входного события).

После этого вступают в действие TestHarness-абстракции. Можно использовать утилитный класс ProcessFunctionTestHarnesses, у которого множество статических методов для создания класса AbstractStreamOperatorTestHarness. Он, в свою очередь, позволяет выполнять множество действий над стримом, будто это делает сам Flink. Так как наш сплитератор реализует ProcessFunction, то пользуемся соответствующим методом forProcessFunction(...) утилитного класса ProcessFunctionTestHarnesses. Теперь у нас есть объект testHarnsess, который содержит множество полезных методов. Заметьте, он реализует AutoCloseable, поэтому мы ставим аннотацию ломбока @Cleanup для автоматического вызова close()-метода.

И ещё пара методов:

  • processElements() — для обработки всех входных событий

  • getRecordOutput — для получения выходного потока данных и проверки, что выходных элементов нет. Так как наш сплитератор должен закинуть все лишние сообщения, не попавшие под условия всех предикатов, в побочный выходной поток по умолчанию с тегом unknown. И вот как раз по этому тегу мы и проверяем выходные потоки через метод getSideOutput()

Подобная TestHarness-абстракция очень удобна, и вызов её методов может эмулировать обработку, будто это делает сам Flink-кластер.

Тестирование дедупликатора с помощью TestHarness-классов

Предыдущий тест был достаточно простым, а теперь давайте взглянем на тест полноценного stateful-оператора — нашего дедупликатора из предыдущих статей. Напомню его код:

public class Deduplicator<T> extends RichFlatMapFunction<T, T> {
   private static final long serialVersionUID = 1L;

   private final Time ttl;
   private ValueState<Boolean> keyHasBeenSeen;

   public Deduplicator(Time ttl) {
       this.ttl = notNull(ttl, "Deduplicator TTL is null");
   }

   @Override
   public void open(Configuration parameters) {
       final var descriptor = new ValueStateDescriptor<>("keyHasBeenSeen", Types.BOOLEAN);
       final var ttlConfig = StateTtlConfig
                                 .newBuilder(ttl)
                                 .setUpdateType(UpdateType.OnCreateAndWrite)
                                 .setStateVisibility(NeverReturnExpired)
                                 .cleanupFullSnapshot()
                                 .build();
       descriptor.enableTimeToLive(ttlConfig);
       keyHasBeenSeen = getRuntimeContext().getState(descriptor);
   }

   @Override
   public void flatMap(T event, Collector<T> out) throws Exception {
       if (keyHasBeenSeen.value() == null) {
           out.collect(event);
           keyHasBeenSeen.update(true);
       }
   }
}

Более подробное объяснение содержимого было вот в этой статье. А сейчас мы сосредоточимся на самом тесте. Если вкратце, то у нас существует внутреннее состояние ValueState<Boolean>, которое содержит флаг true в случаях, когда сообщение встречается впервые. При повторном поступлении события с таким же ключом событие не пропускается дальше (дедуплицируется), так как в рамках установленного TTL мы уже встречали его ранее.

Теперь перейдём непосредственно к тесту. Рассмотрим его по частям:

class DeduplicatorUnitTest_byTestHarness {
   private final Time TTL = Time.milliseconds(10);
   private KeyedOneInputStreamOperatorTestHarness<String, String, String> testHarness;

   @BeforeEach
   @SneakyThrows
   void init() {
       final var deduplicator = new Deduplicator<String>(TTL);
       testHarness = new KeyedOneInputStreamOperatorTestHarness<>(
           new StreamFlatMap<>(deduplicator),
           value -> value,
           Types.STRING
       );
       testHarness.open();
   }

   @AfterEach
   @SneakyThrows
   void clear() {
       testHarness.close();
   }
...
}

В самом начале мы задаём TTL = 10 мс. Также пользуемся аннотациями BeforeEach/AfterEach для инициализации TestHarness и закрытия ресурсов. На этот раз мы используем абстракцию KeyedOneInputStreamOperatorTestHarness, потому что дедупликатор работает в разрезе ключа события и требует перед собой вызов keyBy-метода в пайплайне. В конструктор передаём:

  1. Наш объект дедупликатора.

  2. Лямбду для определения ключа события (аналог keyBy-вызова в пайплайне).

  3. Тип события для помощи Flink.

Важный момент: вызов метода open, который инициализирует объект TestHarness с учётом его внутреннего состояния ValueState.

Дальнейший тест проверит, происходит ли дедупликация: отправляем на вход одни и те же события дважды, но на выходе событие должно быть в единственном экземпляре. Наш метод provideUniqueEvents (его код ниже) как раз будет генерировать два уникальных String-события с разными значениями для использования в ParametrizedTest:

private static Stream<Arguments> provideUniqueEvents() {
   return Stream.of(
       arguments(List.of(new StreamRecord<>("key_1"), new StreamRecord<>("key_2")))
   );
}

Тест при этом выглядит следующим образом:

@SneakyThrows
@ParameterizedTest
@MethodSource("provideUniqueEvents")
void shouldDeduplicateMessagesByTtl(List<StreamRecord<String>> events) {
   testHarness.processElements(events);
   testHarness.setStateTtlProcessingTime(TTL.toMilliseconds() - 1);
   testHarness.processElements(events);

   final var outputEvents = testHarness.getOutput();
   assertEquals(events.size(), outputEvents.size(),
       format("Unexpected number of events after deduplication. Output events: %s", outputEvents));
   assertEquals(events, new ArrayList<>(outputEvents), "Unexpected events after deduplication");
}

В тесте необходимо обработать события первый раз через processElements, а далее использовать вспомогательный метод для эмулирования TTL. Вызов setStateTtlProcessingTime указывает оператору думать, будто прошло уже (TTL.toMilliseconds() – 1) времени после обработки предыдущих событий. Так как сам TTL больше на 1 мс, то сообщения должны дедуплицироваться. Для этого мы отправляем их на вход ещё раз и проверяем, что на выходе testHarness.getOutput() их не стало в два раза больше.

Теперь нам нужен тест, который проверит, что спустя TTL сообщения могут обрабатываться вновь:

@SneakyThrows
@ParameterizedTest
@MethodSource("provideUniqueEvents")
void shouldNotDeduplicateBecauseOfTtlExpired(List<StreamRecord<String>> events) {
   testHarness.processElements(events);
   testHarness.setStateTtlProcessingTime(TTL.toMilliseconds() + 1);
   testHarness.processElements(events);

   final var outputEvents = testHarness.getOutput();
   assertEquals(events.size() * 2, outputEvents.size(),
       format("Unexpected number of events after deduplication. Output events: %s", outputEvents));
   final var expectedEvents = new ArrayList<>();
   expectedEvents.addAll(events);
   expectedEvents.addAll(events);
   assertEquals(expectedEvents, new ArrayList<>(outputEvents), "Unexpected events after expired ttl deduplication");
}

Тут всё то же самое, что и в предыдущем тесте, но мы устанавливаем setStateTtlProcessingTime в значение на 1 мс больше, чем TTL. Тогда информация, что  сообщения events встречались ранее, должна удалиться из внутреннего состояния ValueState<Boolean>, поэтому дедупликация не сработает. Как раз это и проверяем в блоке с assert — сообщений на выходе в два раза больше, т. к. отправили их дважды.

Вывод

Мы рассмотрели, как можно протестировать stateless-операторы и более сложные stateful-операторы с внутренним состоянием с использованием вспомогательных TestHarness-классов. Так можно и нужно провести unit-тестирование каждого нашего оператора отдельно, независимо от всех джоб. Но нам ещё нужна гарантия, что операторы будут корректно работать в реальном Flink-окружении и что весь пайплайн настроен и работает правильно. Поэтому я покажу, как можно написать тест на полноценную джобу с использованием Flink MiniCluster, в следующей статье. А ещё — как использовать подход мини-кластеров для тестирования отдельных stateful-операторов.

Комментарии (0)