Привет всем, на связи снова Александр Бобряков, техлид в команде МТС Аналитики. Продолжаем цикл статей про фреймворк Apache Flink.
Напомню, в предыдущих частях я рассказывал про построение пайплайна Kafka-to-Kafka с промежуточным разделением потока и дедупликацией событий. Также в предыдущей статье я рассказал, как можно динамически определить выходной Kafka-топик для каждого отправляемого события.
Начиная с этой статьи начнём разбирать, как тестировать всё наше приложение Flink + Spring. Многие описанные подходы вполне применимы и в любом другом обычном Spring-приложении, поэтому, надеюсь, вы найдёте для себя что-то новое.
В данной статье мы рассмотрим, как протестировать stateless- и stateful-операторы Flink с помощью абстракций TestHarness.
Список моих постов про Flink
Весь разбираемый исходный код можно найти в репозитории AlexanderBobryakov/flink-spring. В master-ветке представлен итоговый проект по всей серии статей. Данная статья соответствует релизной ветке с названием release/4_testharness_deduplicator_test.
Оглавление статьи
Структура тестов
Небольшой спойлер: к концу статьи мы получим вот такую структуру тестов:
Мы добавим первые тесты на stateless-фильтр, stateful-дедупликатор и на stateless-сплитератор (учитывающий внутренние Flink-абстракции).
Абстракция EntityTestBuilder
В нашем приложении уже есть несколько бизнес-DTO, например ClickMessage и ProductMessage, и будут добавляться новые. В самих тестах мы должны их создавать для воспроизведения отдельных сценариев. Но подумаем об удобстве: определим единое место, в котором будем создавать каждое DTO. Ведь в сценариях много полей, а определённые значения могут иметь бизнес-ценность.
Для этого создадим в тестовой директории следующий интерфейс:
public interface EntityTestBuilder<T> {
T build();
}
Для тестов у него также будут реализации под каждую нашу DTO. Давайте рассмотрим пример для ClickMessage:
@With
@AllArgsConstructor(access = PUBLIC)
@NoArgsConstructor(access = PRIVATE)
public class ClickMessageTestBuilder implements EntityTestBuilder<ClickMessage> {
private UUID userId = UUID.randomUUID();
private String object = "test_object";
private Platform platform = Platform.Enum.APP;
private String productName = "test_productName";
private String productTopic = "test_productTopic";
private Long timestamp = 123L;
private Map<String, Object> data = new HashMap<>() {{
put("field_1", "value_1");
put("field_2", "value_2");
}};
public static ClickMessageTestBuilder aClickMessage() {
return new ClickMessageTestBuilder();
}
@Override
public ClickMessage build() {
return ClickMessage.builder()
.userId(userId)
.object(object)
.platform(platform)
.productName(productName)
.productTopic(productTopic)
.timestamp(timestamp)
.data(data)
.build();
}
}
В этой реализации мы в одном месте определяем все дефолтные значения полей, а также статический метод-билдер, который будем вызывать в тестах для построения объекта ClickMessage. При этом аннотации lombok позволяют переопределять дефолтные значения с помощью методов withXXX(...), например:
aClickMessage().withPlatform(APP).build()
Тестирование stateless Flink-операторов
Начнём с самых простых тестов на stateless-операторы. Напомню, что они не используют внутренние состояния. Им неважно, какие события они обрабатывали ранее — каждое событие полностью независимо от других. Такой оператор выглядит как обычный Java-класс, и в его тестировании нет ничего сложного.
Давайте рассмотрим наш фильтр, пропускающий события с определённым значением поля platform:
public class ClickMessageWithPlatformFilter implements FilterFunction<ClickMessage> {
private static final long serialVersionUID = 1L;
@Override
public boolean filter(ClickMessage message) {
return Optional.ofNullable(message.getPlatform())
.map(platform -> WEB.equals(platform) || APP.equals(platform))
.orElse(false);
}
}
Протестировать его можно с помощью JUnit 5 с использованием аннотации @ParameterizedTest. Эта аннотация позволяет указывать тесту аргументы, которые поступают посредством второй Source-аннотации (@MethodSource, @ValueSource, @CsvSource...):
class ClickMessageWithPlatformFilterUnitTest {
@ParameterizedTest
@MethodSource("clickMessagesByPlatform")
void shouldFilterMessageByPlatform(boolean expectedFilter, ClickMessage message) {
final var filter = new ClickMessageWithPlatformFilter();
final var filterResult = filter.filter(message);
assertEquals(expectedFilter, filterResult, format("Unexpected filter result for message: %s", message));
}
private static Stream<Arguments> clickMessagesByPlatform() {
return Stream.of(
arguments(true, aClickMessage().withPlatform(APP).build()),
arguments(true, aClickMessage().withPlatform(WEB).build()),
arguments(false, aClickMessage().withPlatform(Platform.of("unknown")).build()),
arguments(false, aClickMessage().withPlatform(Platform.of("")).build()),
arguments(false, aClickMessage().withPlatform(null).build())
);
}
}
в статическом методе clickMessagesByPlatform определяем перечень всех комбинаций входных параметров для теста .boolean expectedFilter — ожидаемый результат вызова фильтра true/false
ClickMessage message — само сообщение для фильтрации (обратите внимание на удобство создания объектов этого класса)
Тест будет запускаться 5 раз. В самом тесте создаём объект фильтра и вызываем метод filter для дальнейшего сравнения с ожидаемым результатом.
Так мы можем протестировать классы для методов map, flatMap, process и т. д. Конечно, иногда может понадобиться замокать какой-нибудь объект с помощью Mockito, но это ничем не отличается от стандартного тестирования.
Тестирование stateful Flink-операторов
Теперь перейдём к тестированию stateful-операторов. Они тесно связаны со средой выполнения Flink, потому что могут содержать внутреннее состояние, управляемое Flink, а также какие-нибудь таймеры и т. д. В этом случае подход, который я описал выше, не подойдёт. На этот раз нам нужны более умные абстракции, которые могут выполнить логику управления Flink. Такие абстракции есть в документации к тестированию.
Также в документации предлагается использовать так называемые TestHarness-классы. Для этого мы настроим дополнительные зависимости:
testImplementation "org.apache.flink:flink-streaming-java:${flinkVersion}:tests"
testImplementation "org.apache.flink:flink-test-utils:${flinkVersion}"
Первая зависимость даёт нам доступ ко всем описанным в документации TestHarness-классам, а вторая — ко многим удобным абстракциям в виде Flink MiniCluster, TestEnvironment и т. д.
Тестирование с помощью TestHarness-классов
TestHarness — это классы для тестирования операторов, основанные на использовании внутренней логики Flink: watermark, состояния, таймеры и т. д.
Сейчас объясню на примере обычного stateless-оператора. Рассмотрим тест над нашим написанным ранее сплитератором входных событий по нескольким побочным выходным потокам (по OutputTag) на основе предикатов:
@RequiredArgsConstructor
public class StreamSpliterator<I> extends ProcessFunction<I, Object> {
private static final long serialVersionUID = 1L;
private final Map<OutputTag<I>, SerializablePredicate<I>> predicatesByTags;
private final OutputTag<I> defaultTag;
@Override
public void processElement(I value, ProcessFunction<I, Object>.Context ctx, Collector<Object> out) {
for (Map.Entry<OutputTag<I>, SerializablePredicate<I>> entry : predicatesByTags.entrySet()) {
final var predicate = entry.getValue();
if (predicate.test(value)) {
final var tag = entry.getKey();
ctx.output(tag, value);
return;
}
}
ctx.output(defaultTag, value);
}
}
Подробное объяснение этого кода можно глянуть тут. Основная сложность тестирования — запись в объект класса ProcessFunction.<I, Object>Context по определённому тегу в строке ctx.output(tag, value). Можно, конечно, замокать этот класс и провести обычный unit-тест, но это выглядит не очень хорошо. Получается, что тест должен знать о внутренней структуре/логике тестируемого оператора. Давайте напишем более красивый тест с помощью вспомогательных TestHarness-абстракций:
class StreamSpliteratorUnitTest {
@Test
void shouldSplitStreamByPredicates() throws Exception {
final var webTag = new OutputTag<>("web_tag", TypeInformation.of(String.class));
final var mobileTag = new OutputTag<>("mobile_tag", TypeInformation.of(String.class));
final var unknownTag = new OutputTag<>("unknown_tag", TypeInformation.of(String.class));
final var events = List.of(
new StreamRecord<>("web"),
new StreamRecord<>("mobile"),
new StreamRecord<>("test"),
new StreamRecord<>("web")
);
final var spliterator = new StreamSpliterator<>(
Map.of(
webTag, "web"::equals,
mobileTag, "mobile"::equals
),
unknownTag
);
@Cleanup final var testHarness = ProcessFunctionTestHarnesses.forProcessFunction(spliterator);
testHarness.processElements(events);
assertTrue(testHarness.getRecordOutput().isEmpty(), "Output stream has unexpected events");
assertEquals(2, testHarness.getSideOutput(webTag).size(),
format("Unexpected events in 'web' output stream: %s", new ArrayList<>(testHarness.getSideOutput(webTag))));
assertEquals(1, testHarness.getSideOutput(mobileTag).size(),
format("Unexpected events in 'mobile' output stream: %s", new ArrayList<>(testHarness.getSideOutput(mobileTag))));
assertEquals(1, testHarness.getSideOutput(unknownTag).size(),
format("Unexpected events in 'default' output stream: %s", new ArrayList<>(testHarness.getSideOutput(unknownTag))));
}
}
Для начала мы создаём три тега: web, mobile, unknown. По ним мы ожидаем получить разделение входного потока. Далее нам нужны тестовые события — events, которые будем передавать на вход сплитератора StreamSpliterator. Мы используем класс StreamRecord — это базовая абстракция в Flink. Она передаётся внутри стрима данных и содержит именно бизнес-объект. Этот класс — одна из реализаций абстрактного StreamElement. Например, другая реализация — класс Watermark. Он может также передаваться внутри потока данных отдельным событием для оповещения операторов. По сути в тесте мы создали четыре события с типом String. После этого идёт инициализация сплитератора new StreamSpliterator: события с определённым значением мы записываем в соответствующий побочный выход по тегу согласно указанному предикату (для простоты используем equals для входного события).
После этого вступают в действие TestHarness-абстракции. Можно использовать утилитный класс ProcessFunctionTestHarnesses, у которого множество статических методов для создания класса AbstractStreamOperatorTestHarness. Он, в свою очередь, позволяет выполнять множество действий над стримом, будто это делает сам Flink. Так как наш сплитератор реализует ProcessFunction, то пользуемся соответствующим методом forProcessFunction(...) утилитного класса ProcessFunctionTestHarnesses. Теперь у нас есть объект testHarnsess, который содержит множество полезных методов. Заметьте, он реализует AutoCloseable, поэтому мы ставим аннотацию ломбока @Cleanup для автоматического вызова close()-метода.
И ещё пара методов:
processElements() — для обработки всех входных событий
getRecordOutput — для получения выходного потока данных и проверки, что выходных элементов нет. Так как наш сплитератор должен закинуть все лишние сообщения, не попавшие под условия всех предикатов, в побочный выходной поток по умолчанию с тегом unknown. И вот как раз по этому тегу мы и проверяем выходные потоки через метод getSideOutput()
Подобная TestHarness-абстракция очень удобна, и вызов её методов может эмулировать обработку, будто это делает сам Flink-кластер.
Тестирование дедупликатора с помощью TestHarness-классов
Предыдущий тест был достаточно простым, а теперь давайте взглянем на тест полноценного stateful-оператора — нашего дедупликатора из предыдущих статей. Напомню его код:
public class Deduplicator<T> extends RichFlatMapFunction<T, T> {
private static final long serialVersionUID = 1L;
private final Time ttl;
private ValueState<Boolean> keyHasBeenSeen;
public Deduplicator(Time ttl) {
this.ttl = notNull(ttl, "Deduplicator TTL is null");
}
@Override
public void open(Configuration parameters) {
final var descriptor = new ValueStateDescriptor<>("keyHasBeenSeen", Types.BOOLEAN);
final var ttlConfig = StateTtlConfig
.newBuilder(ttl)
.setUpdateType(UpdateType.OnCreateAndWrite)
.setStateVisibility(NeverReturnExpired)
.cleanupFullSnapshot()
.build();
descriptor.enableTimeToLive(ttlConfig);
keyHasBeenSeen = getRuntimeContext().getState(descriptor);
}
@Override
public void flatMap(T event, Collector<T> out) throws Exception {
if (keyHasBeenSeen.value() == null) {
out.collect(event);
keyHasBeenSeen.update(true);
}
}
}
Более подробное объяснение содержимого было вот в этой статье. А сейчас мы сосредоточимся на самом тесте. Если вкратце, то у нас существует внутреннее состояние ValueState<Boolean>, которое содержит флаг true в случаях, когда сообщение встречается впервые. При повторном поступлении события с таким же ключом событие не пропускается дальше (дедуплицируется), так как в рамках установленного TTL мы уже встречали его ранее.
Теперь перейдём непосредственно к тесту. Рассмотрим его по частям:
class DeduplicatorUnitTest_byTestHarness {
private final Time TTL = Time.milliseconds(10);
private KeyedOneInputStreamOperatorTestHarness<String, String, String> testHarness;
@BeforeEach
@SneakyThrows
void init() {
final var deduplicator = new Deduplicator<String>(TTL);
testHarness = new KeyedOneInputStreamOperatorTestHarness<>(
new StreamFlatMap<>(deduplicator),
value -> value,
Types.STRING
);
testHarness.open();
}
@AfterEach
@SneakyThrows
void clear() {
testHarness.close();
}
...
}
В самом начале мы задаём TTL = 10 мс. Также пользуемся аннотациями BeforeEach/AfterEach для инициализации TestHarness и закрытия ресурсов. На этот раз мы используем абстракцию KeyedOneInputStreamOperatorTestHarness, потому что дедупликатор работает в разрезе ключа события и требует перед собой вызов keyBy-метода в пайплайне. В конструктор передаём:
Наш объект дедупликатора.
Лямбду для определения ключа события (аналог keyBy-вызова в пайплайне).
Тип события для помощи Flink.
Важный момент: вызов метода open, который инициализирует объект TestHarness с учётом его внутреннего состояния ValueState.
Дальнейший тест проверит, происходит ли дедупликация: отправляем на вход одни и те же события дважды, но на выходе событие должно быть в единственном экземпляре. Наш метод provideUniqueEvents (его код ниже) как раз будет генерировать два уникальных String-события с разными значениями для использования в ParametrizedTest:
private static Stream<Arguments> provideUniqueEvents() {
return Stream.of(
arguments(List.of(new StreamRecord<>("key_1"), new StreamRecord<>("key_2")))
);
}
Тест при этом выглядит следующим образом:
@SneakyThrows
@ParameterizedTest
@MethodSource("provideUniqueEvents")
void shouldDeduplicateMessagesByTtl(List<StreamRecord<String>> events) {
testHarness.processElements(events);
testHarness.setStateTtlProcessingTime(TTL.toMilliseconds() - 1);
testHarness.processElements(events);
final var outputEvents = testHarness.getOutput();
assertEquals(events.size(), outputEvents.size(),
format("Unexpected number of events after deduplication. Output events: %s", outputEvents));
assertEquals(events, new ArrayList<>(outputEvents), "Unexpected events after deduplication");
}
В тесте необходимо обработать события первый раз через processElements, а далее использовать вспомогательный метод для эмулирования TTL. Вызов setStateTtlProcessingTime указывает оператору думать, будто прошло уже (TTL.toMilliseconds() – 1) времени после обработки предыдущих событий. Так как сам TTL больше на 1 мс, то сообщения должны дедуплицироваться. Для этого мы отправляем их на вход ещё раз и проверяем, что на выходе testHarness.getOutput() их не стало в два раза больше.
Теперь нам нужен тест, который проверит, что спустя TTL сообщения могут обрабатываться вновь:
@SneakyThrows
@ParameterizedTest
@MethodSource("provideUniqueEvents")
void shouldNotDeduplicateBecauseOfTtlExpired(List<StreamRecord<String>> events) {
testHarness.processElements(events);
testHarness.setStateTtlProcessingTime(TTL.toMilliseconds() + 1);
testHarness.processElements(events);
final var outputEvents = testHarness.getOutput();
assertEquals(events.size() * 2, outputEvents.size(),
format("Unexpected number of events after deduplication. Output events: %s", outputEvents));
final var expectedEvents = new ArrayList<>();
expectedEvents.addAll(events);
expectedEvents.addAll(events);
assertEquals(expectedEvents, new ArrayList<>(outputEvents), "Unexpected events after expired ttl deduplication");
}
Тут всё то же самое, что и в предыдущем тесте, но мы устанавливаем setStateTtlProcessingTime в значение на 1 мс больше, чем TTL. Тогда информация, что сообщения events встречались ранее, должна удалиться из внутреннего состояния ValueState<Boolean>, поэтому дедупликация не сработает. Как раз это и проверяем в блоке с assert — сообщений на выходе в два раза больше, т. к. отправили их дважды.
Вывод
Мы рассмотрели, как можно протестировать stateless-операторы и более сложные stateful-операторы с внутренним состоянием с использованием вспомогательных TestHarness-классов. Так можно и нужно провести unit-тестирование каждого нашего оператора отдельно, независимо от всех джоб. Но нам ещё нужна гарантия, что операторы будут корректно работать в реальном Flink-окружении и что весь пайплайн настроен и работает правильно. Поэтому я покажу, как можно написать тест на полноценную джобу с использованием Flink MiniCluster, в следующей статье. А ещё — как использовать подход мини-кластеров для тестирования отдельных stateful-операторов.