Привет, Хабр! С вами вновь Александр Бобряков, техлид в команде МТС Аналитики. И я с очередной статьёй из цикла про фреймворк Apache Flink.

В предыдущей части я рассказал, как тестировать stateless- и stateful-операторы Flink с использованием вспомогательных TestHarness-абстракций, предоставляемых Flink.

В этой статье напишем тесты на всю джобу с использованием мини-кластера Flink и при помощи JUnit Extension. Ещё мы начнём выделять удобные вспомогательные абстракции для тестов, которые понадобятся позже.

Список моих постов про Flink

Весь разбираемый исходный код можно найти в репозитории AlexanderBobryakov/flink-spring. В master-ветке представлен итоговый проект по всей серии статей. Эта статья соответствует релизной ветке с названием release/5_flinkcluster_job_deduplicator_test.

Оглавление статьи

Flink MiniCluster

В документации по написанию тестов предлагается использовать абстракцию Flink мини-кластера MiniClusterWithClientResource для локального тестирования полноценных заданий. Это обусловлено тем, что мы не сможем полностью воспроизвести работу Flink на обычных Unit-тестах (даже с помощью TestHarness), учитывая параллельность и другие внутренние процессы. Но мини-кластер такую возможность даёт.

После старта мини-кластера универсальный метод для определения окружения StreamExecutionEnvironment.getExecutionEnvironment() автоматически подключится к мини-кластеру, и все наши задания будут выполняться на нём. Документация предлагает такой сценарий использования:

public class ExampleIntegrationTest {

    @ClassRule
    public static MiniClusterWithClientResource flinkCluster =
            new MiniClusterWithClientResource(
                    new MiniClusterResourceConfiguration.Builder()
                            .setNumberSlotsPerTaskManager(2)
                            .setNumberTaskManagers(1)
                            .build());

    @Test
    public void someТest() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // ...
    }
}

Но в этом случае нужно создавать @ClassRule на каждый тестовый класс либо использовать наследование в тестах. Я предлагаю немного другой, более удобный способ.

Итак, какие у нас условия задачи?

Во-первых, хотелось бы не тянуть лишних зависимостей в тесты с Flink.

Во-вторых, у нас должна быть возможность включать мини-кластер в любом тесте максимально просто и, главное, единожды, чтобы кластер поднимался перед всеми тестовыми классами и убивался после всех тестов.

Какое решение? На ум приходит использование своей аннотации @WithFlinkCluster, которая будет предоставлять Flink мини-кластер для класса-теста, над которым она висит. Давайте посмотрим на саму реализацию:

@Retention(RUNTIME)
@Inherited
@ExtendWith({FlinkClusterExtension.class})
public @interface WithFlinkCluster {
}

Ничего особенного в ней нет. Основная фишка внутри FlinkClusterExtension. Это JUnit Extension. Если кратко, то они нужны для изменения поведения тестов с помощью событий их жизненного цикла. Реализация моего FlinkClusterExtension выглядит вот так:

@Slf4j
@SuppressWarnings({"PMD.AvoidUsingVolatile"})
public class FlinkClusterExtension implements BeforeAllCallback, ExtensionContext.Store.CloseableResource {
   private static final MiniClusterWithClientResource FLINK_CLUSTER;
   private static final Lock LOCK = new ReentrantLock();
   private static volatile boolean started;

   static {
       final var configuration = new Configuration();
       configuration.set(CoreOptions.DEFAULT_PARALLELISM, 2);
       FLINK_CLUSTER = new MiniClusterWithClientResource(
           new MiniClusterResourceConfiguration.Builder()
               .setConfiguration(configuration)
               .setNumberSlotsPerTaskManager(2)
               .setNumberTaskManagers(1)
               .build());
   }

   @Override
   public void beforeAll(ExtensionContext context) throws Exception {
       LOCK.lock();
       try {
           if (!started) {
               log.info("Start Flink MiniCluster");
               started = true;
               FLINK_CLUSTER.before();
               context.getRoot().getStore(GLOBAL).put("Flink Cluster", this);
           }
       } finally {
           LOCK.unlock();
       }
   }

   @Override
   public void close() {
       log.info("Close Flink MiniCluster");
       FLINK_CLUSTER.after();
       started = false;
   }
}

Обратите внимание на имплементацию двух интерфейсов: BeforeAllCallback, ExtensionContext.Store.CloseableResource. Первый предоставляет метод beforeAll перед стартом всех тестов внутри каждого тестового класса, у которых висит аннотация @ExtendWith({FlinkClusterExtension.class}). А второй — коллбэк на закрытие ресурсов уже после отработки всех тестовых классов. В статическом блоке инициализируем Flink мини-кластер, передав ему различные настройки:

  • количество слотов

  • TaskManager

  • стандартный параллелизм — его лучше по умолчанию выставить >1, чтобы отловить неочевидные баги в ваших сценариях

В методе beforeAll выполняется непосредственный запуск кластера через его метод before(). В реализации FlinkClusterExtension присутствуют синхронизации в виде блокировки через объект ReentrantLock, чтобы избежать повторного запуска в случае параллельно запускаемых тестов. Метод close завершает работу мини-кластера единожды после выполнения всех тестов, которые используют текущий JUnit Extension.

В итоге каждый тест, где нужен будет кластер Flink, можно писать так:

@WithFlinkCluster
class DeduplicatorUnitTest_byFlinkCluster {
   @Test
    void test()  {
        final var env = StreamExecutionEnvironment.getExecutionEnvironment();
        // ...
    }
}

Абстракция для Sink в тестировании через Flink MiniCluster

Прежде чем переходить непосредственно к написанию теста, нужно подумать: а как мы будем проверять наши задания? Ожидаемый результат выполнения задания или оператора — наличие событий на его выходе.

Проблема в том, что каждый оператор имеет свою параллельность. Благодаря ей операторы сериализуются на каждый TaskManager в количестве своей параллельности (если в каждом TM существует единственный слот). Каждый из таких параллельных операторов может писать в свой параллельный экземпляр Sink-оператора. А нам было бы удобно собрать их воедино.

Об этом также упоминается в конце документации в блоке замечаний, где авторы предлагают создать свой CollectSink. Они используют статическую коллекцию, нам такой вариант не очень подходит — тесты могут выполняться параллельно и независимо, а создавать отдельный класс со статической коллекцией неудобно, ведь доступ к единственной статической коллекции будет из всех классов одновременно. Это может привести к тому, что тесты будут влиять друг на друга.

В качестве решения в исходниках Flink можно обнаружить синглтон-класс org.apache.flink.test.streaming.runtime.util.TestListWrapper, который предлагает более подходящий вариант:

private List<List<? extends Comparable>> lists;

Далее в каждом отдельном тесте при инициализации TestListWrapper создаётся внутренний List в объекте lists выше. ID этого листа возвращается пользователю, а дальше можно написать свой Writer, который будет писать именно в этот List, запрашивая его по полученному id у самого TestListWrapper. Звучит непонятно, поэтому предоставлю код идеи:

@SuppressWarnings("PMD.TestClassWithoutTestCases")
public class TestListSink<T> implements Sink<T> {
   private static final long serialVersionUID = 1L;

   private final ListWriter writer = new ListWriter();
   private final int resultListId;

   public TestListSink() {
       this.resultListId = TestListWrapper.getInstance().createList();
   }

   @Override
   public SinkWriter<T> createWriter(InitContext context) {
       return writer;
   }

   public List<T> getHandledEvents() {
       return new ArrayList<>(resultList());
   }

   @SuppressWarnings("unchecked")
   private List<T> resultList() {
       synchronized (TestListWrapper.getInstance()) {
           return (List<T>) TestListWrapper.getInstance().getList(resultListId);
       }
   }

   private class ListWriter implements SinkWriter<T>, Serializable {
       private static final long serialVersionUID = 1L;

       @Override
       public void write(T element, Context context) {
           resultList().add(element);
       }

       @Override
       public void flush(boolean endOfInput) {
           // no op
       }

       @Override
       public void close() {
           // no op
       }
   }
}

При создании моего TestListSink (в каждом отдельном тесте) инициализируется новый List и запоминается его id: TestListWrapper.getInstance().createList(). Также у нас есть своя реализация SinkWriter, которая при получении события в методе write записывает его в один и тот же List по id листа. Так в случае большой параллельности выходных операторов мы получим единый List, в который будет писать каждый параллельный экземпляр выходного оператора. Также нам полезно определить вспомогательный метод getHandledEvents, который вернёт все записанные события всех параллельных экземпляров sink-оператора после выполнения теста.

Тестирование дедупликатора с помощью Flink MiniCluster

В прошлой статье мы написали тест на дедупликатор с помощью абстракций TestHarness. В качестве дополнительного примера можно переписать тот же тест с использованием мини-кластера. Для этого мы:

  1. На новый тестовый класс повесим нашу новую аннотацию.

  2. Немного перепишем сам тест, определив непосредственно минимальный пайплайн обработки с использованием дедупликатора.

Сделаем это в новом тестовом классе:

@WithFlinkCluster
public class DeduplicatorUnitTest_byFlinkCluster {
   private final Time TTL = Time.milliseconds(100);

   @SneakyThrows
   @ParameterizedTest
   @MethodSource("provideUniqueEvents")
   void shouldDeduplicateMessagesByTtl(List<String> events) {
       final var sourceEvents = new ArrayList<String>();
       sourceEvents.addAll(events);
       sourceEvents.addAll(events);
       final var env = StreamExecutionEnvironment.getExecutionEnvironment();
       final var sink = new TestListSink<String>();
       env.fromCollection(sourceEvents)
           .keyBy(value -> value)
           .flatMap(new Deduplicator<>(TTL))
           .sinkTo(sink);

       env.execute();

       final var outputEvents = sink.getHandledEvents();
       assertEquals(events.size(), outputEvents.size(),
           format("Unexpected number of events after deduplication. Output events: %s", outputEvents));
       assertEquals(events, new ArrayList<>(outputEvents), "Unexpected events after deduplication");
   }

   private static Stream<Arguments> provideUniqueEvents() {
       return Stream.of(arguments(List.of("key_1", "key_2")));
   }
}

По своей сути тест очень похож на предыдущие: на вход получаем несколько String-событий, определяем источник данных fromCollection(), передав в него дважды входные данные. Потом определяем сам дедупликатор и выходной Sink с использованием нашего универсального TestListSink. После запуска пайплайна проверяем, что данных в результирующем Sink столько же, сколько было уникальных сообщений.

Важным моментом является использование MiniCluster. Это происходит под капотом во время вызова метода StreamExecutionEnvironment.getExecutionEnvironment(). Так как наш FlinkClusterExtension отрабатывает до запуска теста, то на момент непосредственного выполнения теста уже будет создан мини-кластер на локальной машине, а метод getExecutionEnvironment() его увидит и подтянет.

Тестирование всей Job с помощью Flink MiniCluster

Теперь можно переходить к первому тесту всего Flink-задания. Напомню, что наша джоба фильтрует входной поток ClickMessage, пропуская события только с типом платформы WEB и APP. Далее события APP проходят дедупликацию, а затем события APP и WEB записываются в выходной топик Kafka, определяющийся динамически из поля ClickMessage.productTopic.

Пайплайн выглядит так:

В тесте будут участвовать многие Spring-компоненты, поэтому выделим новую аннотацию для тестирования Flink Job:

@Retention(RUNTIME)
@SpringBootTest(
   webEnvironment = NONE,
   classes = {
       PropertiesConfig.class,
       FlinkConfig.class,
   })
@ActiveProfiles({"test"})
@WithFlinkCluster
public @interface FlinkJobTest {
}

Так как мы не хотим поднимать абсолютно весь контекст по умолчанию, то задаём в аннотации @SpringBootTest лишь две конфигурации:

  1. PropertyConfig, который добавляет в контекстное все наши проперти-классы, которые биндятся с application.yml.

  2. FlinkConfig, в котором регистрируется и настраивается бин StreamExecutionEnvironment.

Дополнительно мы применим созданную аннотацию @FlinkJobTest для поднятия мини-кластера.

Мы тестируем саму логику джобы, а какие именно будут source и sink, неважно. Реальные имплементации (Kafka) мы подставим в Е2Е-тестах.

Первый тест будет проверять, что события APP дедуплицируются:

@FlinkJobTest
@SuppressWarnings("PMD.DataflowAnomalyAnalysis")
class ClickToProductJobUnitTest {
   @Autowired
   private StreamExecutionEnvironment environment;
   @Autowired
   private ClickToProductJobProperties properties;

   @ParameterizedTest
   @EnumSource(value = Platform.Enum.class, names = {"APP"})
   @SneakyThrows
   void shouldDeduplicateClickMessages(Platform platform) {
       final var message = aClickMessage().withPlatform(platform).build();
       final var sink = new TestListSink<WrappedSinkMessage<ProductMessage>>();
       final var job = new ClickToProductJob(
           properties,
           env -> env.fromElements(message, message, message).uid("test_source"),
           () -> sink
       );

       job.registerJob(environment);
       environment.execute();

       final var out = sink.getHandledEvents();
       assertEquals(1, out.size(), format("Unexpected message count in sink: %s", out));
   }

   // ...
}

В начале кода мы используем нашу аннотацию @FlinkJobTest. В самом тесте можем заинжектить основные бины:

  1. StreamExecutionEnvironment, нацеленный на поднятый в FlinkClusterExtension мини-кластер.

  2. ClickToProductJobProperties, в котором имеем все настроенные в application-test.yml настройки.

В самом тесте создаём ClickMessage — на основе платформы APP, переданной в аргументы параметризованного теста. Далее определяем описанный в предыдущих главах Sink TestListSink и вручную создаём тестируемую джобу ClickToProductJob. В аргументы джобы передаём проперти, источник данных и приёмник.

! Источник данных содержит три одинаковых экземпляра входного сообщения, которые джоба должна дедуплицировать.

После этого происходит регистрация джобы в environment мини-кластера и синхронный запуск. Так как наш источник данных fromCollection имеет три события, то по мере их обработки джоба завершится автоматически. Это достаточно важный момент, потому что в случае бесконечных источников данных (например, Kafka) джоба будет выполняться вечно, пока мы её не завершим. Для вечного выполнения понадобится асинхронный запуск, который рассмотрим в следующих статьях. На выходе проверяем, что Sink содержит лишь одно событие, а остальные дедуплицировались.

Так можно написать тест, который проверяет, что события WEB не дедуплицируются, но обрабатываются, а события с произвольным типом платформы не обрабатываются вообще. Пример теста можно посмотреть в репозитории проекта, указанном в начале статьи.

В итоге структура тестов выглядит следующим образом:

Вывод

Мы рассмотрели создание Unit-теста на полноценную джобу Flink и отдельные stateful-операторы с использованием мини-кластера. Также мы научились запускать мини-кластер один раз перед всеми тестовыми классами, нуждающимися в нём. В дополнение мы создали вспомогательные абстракции и аннотации, существенно разделяя ответственность в тестах и упрощая логику написания новых тестов.

За кадром осталась Kafka, ведь наша джоба читает и пишет данные в её топики. Как написать E2E-тест на полную интеграцию Kafka + Flink Job, я расскажу в следующей части.

Комментарии (0)