Привет, Хабр! С вами вновь Александр Бобряков, техлид в команде МТС Аналитики. И я с очередной статьёй из цикла про фреймворк Apache Flink.
В предыдущей части я рассказал, как тестировать stateless- и stateful-операторы Flink с использованием вспомогательных TestHarness-абстракций, предоставляемых Flink.
В этой статье напишем тесты на всю джобу с использованием мини-кластера Flink и при помощи JUnit Extension. Ещё мы начнём выделять удобные вспомогательные абстракции для тестов, которые понадобятся позже.
Список моих постов про Flink
Весь разбираемый исходный код можно найти в репозитории AlexanderBobryakov/flink-spring. В master-ветке представлен итоговый проект по всей серии статей. Эта статья соответствует релизной ветке с названием release/5_flinkcluster_job_deduplicator_test.
Оглавление статьи
Flink MiniCluster
В документации по написанию тестов предлагается использовать абстракцию Flink мини-кластера MiniClusterWithClientResource для локального тестирования полноценных заданий. Это обусловлено тем, что мы не сможем полностью воспроизвести работу Flink на обычных Unit-тестах (даже с помощью TestHarness), учитывая параллельность и другие внутренние процессы. Но мини-кластер такую возможность даёт.
После старта мини-кластера универсальный метод для определения окружения StreamExecutionEnvironment.getExecutionEnvironment() автоматически подключится к мини-кластеру, и все наши задания будут выполняться на нём. Документация предлагает такой сценарий использования:
public class ExampleIntegrationTest {
@ClassRule
public static MiniClusterWithClientResource flinkCluster =
new MiniClusterWithClientResource(
new MiniClusterResourceConfiguration.Builder()
.setNumberSlotsPerTaskManager(2)
.setNumberTaskManagers(1)
.build());
@Test
public void someТest() throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// ...
}
}
Но в этом случае нужно создавать @ClassRule на каждый тестовый класс либо использовать наследование в тестах. Я предлагаю немного другой, более удобный способ.
Итак, какие у нас условия задачи?
Во-первых, хотелось бы не тянуть лишних зависимостей в тесты с Flink.
Во-вторых, у нас должна быть возможность включать мини-кластер в любом тесте максимально просто и, главное, единожды, чтобы кластер поднимался перед всеми тестовыми классами и убивался после всех тестов.
Какое решение? На ум приходит использование своей аннотации @WithFlinkCluster, которая будет предоставлять Flink мини-кластер для класса-теста, над которым она висит. Давайте посмотрим на саму реализацию:
@Retention(RUNTIME)
@Inherited
@ExtendWith({FlinkClusterExtension.class})
public @interface WithFlinkCluster {
}
Ничего особенного в ней нет. Основная фишка внутри FlinkClusterExtension. Это JUnit Extension. Если кратко, то они нужны для изменения поведения тестов с помощью событий их жизненного цикла. Реализация моего FlinkClusterExtension выглядит вот так:
@Slf4j
@SuppressWarnings({"PMD.AvoidUsingVolatile"})
public class FlinkClusterExtension implements BeforeAllCallback, ExtensionContext.Store.CloseableResource {
private static final MiniClusterWithClientResource FLINK_CLUSTER;
private static final Lock LOCK = new ReentrantLock();
private static volatile boolean started;
static {
final var configuration = new Configuration();
configuration.set(CoreOptions.DEFAULT_PARALLELISM, 2);
FLINK_CLUSTER = new MiniClusterWithClientResource(
new MiniClusterResourceConfiguration.Builder()
.setConfiguration(configuration)
.setNumberSlotsPerTaskManager(2)
.setNumberTaskManagers(1)
.build());
}
@Override
public void beforeAll(ExtensionContext context) throws Exception {
LOCK.lock();
try {
if (!started) {
log.info("Start Flink MiniCluster");
started = true;
FLINK_CLUSTER.before();
context.getRoot().getStore(GLOBAL).put("Flink Cluster", this);
}
} finally {
LOCK.unlock();
}
}
@Override
public void close() {
log.info("Close Flink MiniCluster");
FLINK_CLUSTER.after();
started = false;
}
}
Обратите внимание на имплементацию двух интерфейсов: BeforeAllCallback, ExtensionContext.Store.CloseableResource. Первый предоставляет метод beforeAll перед стартом всех тестов внутри каждого тестового класса, у которых висит аннотация @ExtendWith({FlinkClusterExtension.class}). А второй — коллбэк на закрытие ресурсов уже после отработки всех тестовых классов. В статическом блоке инициализируем Flink мини-кластер, передав ему различные настройки:
количество слотов
TaskManager
стандартный параллелизм — его лучше по умолчанию выставить >1, чтобы отловить неочевидные баги в ваших сценариях
В методе beforeAll выполняется непосредственный запуск кластера через его метод before(). В реализации FlinkClusterExtension присутствуют синхронизации в виде блокировки через объект ReentrantLock, чтобы избежать повторного запуска в случае параллельно запускаемых тестов. Метод close завершает работу мини-кластера единожды после выполнения всех тестов, которые используют текущий JUnit Extension.
В итоге каждый тест, где нужен будет кластер Flink, можно писать так:
@WithFlinkCluster
class DeduplicatorUnitTest_byFlinkCluster {
@Test
void test() {
final var env = StreamExecutionEnvironment.getExecutionEnvironment();
// ...
}
}
Абстракция для Sink в тестировании через Flink MiniCluster
Прежде чем переходить непосредственно к написанию теста, нужно подумать: а как мы будем проверять наши задания? Ожидаемый результат выполнения задания или оператора — наличие событий на его выходе.
Проблема в том, что каждый оператор имеет свою параллельность. Благодаря ей операторы сериализуются на каждый TaskManager в количестве своей параллельности (если в каждом TM существует единственный слот). Каждый из таких параллельных операторов может писать в свой параллельный экземпляр Sink-оператора. А нам было бы удобно собрать их воедино.
Об этом также упоминается в конце документации в блоке замечаний, где авторы предлагают создать свой CollectSink. Они используют статическую коллекцию, нам такой вариант не очень подходит — тесты могут выполняться параллельно и независимо, а создавать отдельный класс со статической коллекцией неудобно, ведь доступ к единственной статической коллекции будет из всех классов одновременно. Это может привести к тому, что тесты будут влиять друг на друга.
В качестве решения в исходниках Flink можно обнаружить синглтон-класс org.apache.flink.test.streaming.runtime.util.TestListWrapper, который предлагает более подходящий вариант:
private List<List<? extends Comparable>> lists;
Далее в каждом отдельном тесте при инициализации TestListWrapper создаётся внутренний List в объекте lists выше. ID этого листа возвращается пользователю, а дальше можно написать свой Writer, который будет писать именно в этот List, запрашивая его по полученному id у самого TestListWrapper. Звучит непонятно, поэтому предоставлю код идеи:
@SuppressWarnings("PMD.TestClassWithoutTestCases")
public class TestListSink<T> implements Sink<T> {
private static final long serialVersionUID = 1L;
private final ListWriter writer = new ListWriter();
private final int resultListId;
public TestListSink() {
this.resultListId = TestListWrapper.getInstance().createList();
}
@Override
public SinkWriter<T> createWriter(InitContext context) {
return writer;
}
public List<T> getHandledEvents() {
return new ArrayList<>(resultList());
}
@SuppressWarnings("unchecked")
private List<T> resultList() {
synchronized (TestListWrapper.getInstance()) {
return (List<T>) TestListWrapper.getInstance().getList(resultListId);
}
}
private class ListWriter implements SinkWriter<T>, Serializable {
private static final long serialVersionUID = 1L;
@Override
public void write(T element, Context context) {
resultList().add(element);
}
@Override
public void flush(boolean endOfInput) {
// no op
}
@Override
public void close() {
// no op
}
}
}
При создании моего TestListSink (в каждом отдельном тесте) инициализируется новый List и запоминается его id: TestListWrapper.getInstance().createList(). Также у нас есть своя реализация SinkWriter, которая при получении события в методе write записывает его в один и тот же List по id листа. Так в случае большой параллельности выходных операторов мы получим единый List, в который будет писать каждый параллельный экземпляр выходного оператора. Также нам полезно определить вспомогательный метод getHandledEvents, который вернёт все записанные события всех параллельных экземпляров sink-оператора после выполнения теста.
Тестирование дедупликатора с помощью Flink MiniCluster
В прошлой статье мы написали тест на дедупликатор с помощью абстракций TestHarness. В качестве дополнительного примера можно переписать тот же тест с использованием мини-кластера. Для этого мы:
На новый тестовый класс повесим нашу новую аннотацию.
Немного перепишем сам тест, определив непосредственно минимальный пайплайн обработки с использованием дедупликатора.
Сделаем это в новом тестовом классе:
@WithFlinkCluster
public class DeduplicatorUnitTest_byFlinkCluster {
private final Time TTL = Time.milliseconds(100);
@SneakyThrows
@ParameterizedTest
@MethodSource("provideUniqueEvents")
void shouldDeduplicateMessagesByTtl(List<String> events) {
final var sourceEvents = new ArrayList<String>();
sourceEvents.addAll(events);
sourceEvents.addAll(events);
final var env = StreamExecutionEnvironment.getExecutionEnvironment();
final var sink = new TestListSink<String>();
env.fromCollection(sourceEvents)
.keyBy(value -> value)
.flatMap(new Deduplicator<>(TTL))
.sinkTo(sink);
env.execute();
final var outputEvents = sink.getHandledEvents();
assertEquals(events.size(), outputEvents.size(),
format("Unexpected number of events after deduplication. Output events: %s", outputEvents));
assertEquals(events, new ArrayList<>(outputEvents), "Unexpected events after deduplication");
}
private static Stream<Arguments> provideUniqueEvents() {
return Stream.of(arguments(List.of("key_1", "key_2")));
}
}
По своей сути тест очень похож на предыдущие: на вход получаем несколько String-событий, определяем источник данных fromCollection(), передав в него дважды входные данные. Потом определяем сам дедупликатор и выходной Sink с использованием нашего универсального TestListSink. После запуска пайплайна проверяем, что данных в результирующем Sink столько же, сколько было уникальных сообщений.
Важным моментом является использование MiniCluster. Это происходит под капотом во время вызова метода StreamExecutionEnvironment.getExecutionEnvironment(). Так как наш FlinkClusterExtension отрабатывает до запуска теста, то на момент непосредственного выполнения теста уже будет создан мини-кластер на локальной машине, а метод getExecutionEnvironment() его увидит и подтянет.
Тестирование всей Job с помощью Flink MiniCluster
Теперь можно переходить к первому тесту всего Flink-задания. Напомню, что наша джоба фильтрует входной поток ClickMessage, пропуская события только с типом платформы WEB и APP. Далее события APP проходят дедупликацию, а затем события APP и WEB записываются в выходной топик Kafka, определяющийся динамически из поля ClickMessage.productTopic.
Пайплайн выглядит так:
В тесте будут участвовать многие Spring-компоненты, поэтому выделим новую аннотацию для тестирования Flink Job:
@Retention(RUNTIME)
@SpringBootTest(
webEnvironment = NONE,
classes = {
PropertiesConfig.class,
FlinkConfig.class,
})
@ActiveProfiles({"test"})
@WithFlinkCluster
public @interface FlinkJobTest {
}
Так как мы не хотим поднимать абсолютно весь контекст по умолчанию, то задаём в аннотации @SpringBootTest лишь две конфигурации:
PropertyConfig, который добавляет в контекстное все наши проперти-классы, которые биндятся с application.yml.
FlinkConfig, в котором регистрируется и настраивается бин StreamExecutionEnvironment.
Дополнительно мы применим созданную аннотацию @FlinkJobTest для поднятия мини-кластера.
Мы тестируем саму логику джобы, а какие именно будут source и sink, неважно. Реальные имплементации (Kafka) мы подставим в Е2Е-тестах.
Первый тест будет проверять, что события APP дедуплицируются:
@FlinkJobTest
@SuppressWarnings("PMD.DataflowAnomalyAnalysis")
class ClickToProductJobUnitTest {
@Autowired
private StreamExecutionEnvironment environment;
@Autowired
private ClickToProductJobProperties properties;
@ParameterizedTest
@EnumSource(value = Platform.Enum.class, names = {"APP"})
@SneakyThrows
void shouldDeduplicateClickMessages(Platform platform) {
final var message = aClickMessage().withPlatform(platform).build();
final var sink = new TestListSink<WrappedSinkMessage<ProductMessage>>();
final var job = new ClickToProductJob(
properties,
env -> env.fromElements(message, message, message).uid("test_source"),
() -> sink
);
job.registerJob(environment);
environment.execute();
final var out = sink.getHandledEvents();
assertEquals(1, out.size(), format("Unexpected message count in sink: %s", out));
}
// ...
}
В начале кода мы используем нашу аннотацию @FlinkJobTest. В самом тесте можем заинжектить основные бины:
StreamExecutionEnvironment, нацеленный на поднятый в FlinkClusterExtension мини-кластер.
ClickToProductJobProperties, в котором имеем все настроенные в application-test.yml настройки.
В самом тесте создаём ClickMessage — на основе платформы APP, переданной в аргументы параметризованного теста. Далее определяем описанный в предыдущих главах Sink TestListSink и вручную создаём тестируемую джобу ClickToProductJob. В аргументы джобы передаём проперти, источник данных и приёмник.
! Источник данных содержит три одинаковых экземпляра входного сообщения, которые джоба должна дедуплицировать.
После этого происходит регистрация джобы в environment мини-кластера и синхронный запуск. Так как наш источник данных fromCollection имеет три события, то по мере их обработки джоба завершится автоматически. Это достаточно важный момент, потому что в случае бесконечных источников данных (например, Kafka) джоба будет выполняться вечно, пока мы её не завершим. Для вечного выполнения понадобится асинхронный запуск, который рассмотрим в следующих статьях. На выходе проверяем, что Sink содержит лишь одно событие, а остальные дедуплицировались.
Так можно написать тест, который проверяет, что события WEB не дедуплицируются, но обрабатываются, а события с произвольным типом платформы не обрабатываются вообще. Пример теста можно посмотреть в репозитории проекта, указанном в начале статьи.
В итоге структура тестов выглядит следующим образом:
Вывод
Мы рассмотрели создание Unit-теста на полноценную джобу Flink и отдельные stateful-операторы с использованием мини-кластера. Также мы научились запускать мини-кластер один раз перед всеми тестовыми классами, нуждающимися в нём. В дополнение мы создали вспомогательные абстракции и аннотации, существенно разделяя ответственность в тестах и упрощая логику написания новых тестов.
За кадром осталась Kafka, ведь наша джоба читает и пишет данные в её топики. Как написать E2E-тест на полную интеграцию Kafka + Flink Job, я расскажу в следующей части.