![](https://habrastorage.org/getpro/habr/upload_files/a9c/515/83e/a9c51583e84ffb6ac445bc6eaced9438.png)
Привет, Хабр! На связи Александр Бобряков, техлид команды МТС Аналитики. Это мой финальный пост про Apache Flink. В предыдущих частях мы рассматривали процессы сериализации данных, написали кастомный сериализатор, а также покрыли его тестами. В них проверялась поддержка эволюции схемы состояния. Для этого мы реализовали много удобных абстракций, связанных с перезагрузкой Flink MiniCluster, компиляцией тестовых java-классов, использованием их в classpath кластера и так далее.
В этом материале мы применим разработанный сериализатор JacksonStateSerializer в бою, а также автоматизируем проверку эволюции схемы для всех наших классов, участвующих в сериализации.
Весь разбираемый исходный код можно найти в репозитории AlexanderBobryakov/flink-spring. В master-ветке представлен итоговый проект по всей серии. Данная статья соответствует релизной ветке с названием release/11_JacksonEvolvingState. По мере выхода новых материалов на Хабре ссылки на них будут появляться ниже.
Список моих постов про Flink:
Введение в Apache Flink: осваиваем фреймворк на реальных примерах
Apache Flink. Как работает дедупликация данных в потоке Kafka-to-Kafka?
Как провести unit-тестирование Flink-операторов: Test Harness
Unit и E2E-тестирование оператора с таймерами в Apache Flink
Apache Flink: тестирование собственного сериализатора состояния
Apache Flink: использование и автоматическая проверка собственного сериализатора состояния
Оглавление:
-
Автоматизация проверки корректности изменения схемы состояния
Автоматизация проверки инкремента версии схемы при ее изменении
Внедряем JacksonStateSerializer
В прошлых материалах мы реализовали сериализатор JacksonStateSerializer, поддерживающий эволюцию схемы состояний на основе заданных в ней jackson-аннотаций. Также мы покрыли его тестами. Ранее в постах я описал оператор, в котором использовалось состояние ValueState<AlertState> под бизнес-условия. Инициализировали мы его так:
final var alertDescriptor = new ValueStateDescriptor<>("alertState", AlertState.class);
alertDescriptor.enableTimeToLive(defaultTtlConfig);
ValueState<AlertState> alertState = getRuntimeContext().getState(alertDescriptor);
В это случае мы в дескрипторе указываем тип события. В результате Flink будет использовать Kryo сериализатор по умолчанию, так как объекты класса AlertState не является POJO. Чтобы применить JacksonStateSerializer, необходимо изменить инициализацию дескриптора:
final var alertDescriptor = new ValueStateDescriptor<>("alertState", new JacksonStateSerializer<>(AlertState.class));
alertDescriptor.enableTimeToLive(defaultTtlConfig);
ValueState<AlertState> alertState = getRuntimeContext().getState(alertDescriptor);
Запускаем тесты из прошлых частей и проверяем, что все работает правильно, а бизнес-логика не поломалась:
![](https://habrastorage.org/getpro/habr/upload_files/069/d47/e5d/069d47e5d3eb217ff429833e76e333b8.png)
Автоматизация проверки корректности изменения схемы состояния
Так как в приложении используется AlertState в качестве схемы состояния, нужно быть уверенными, что разработчик при изменении схемы сделает ее обратно совместимой с предыдущей. Для этого он должен определить нужные Jackson аннотации, чтобы наш сериализатор работал корректно.
Аннотация @JacksonEvolvingState
Вводим собственную аннотацию JacksonEvolvingState, которой будем помечать схемы состояний:
@Retention(RUNTIME)
@Target(ElementType.TYPE)
@Inherited
public @interface JacksonEvolvingState {
@Min(1)
int version();
}
В ней будет единственное поле version, определяющее версию текущей схемы. Схема состояния примет вид:
@Data
@Builder
@Jacksonized
@JsonIgnoreProperties(ignoreUnknown = true)
@JacksonEvolvingState(version = 1)
public class AlertState {
@JsonPropertyDescription("User id")
private UUID userId;
@JsonPropertyDescription("Trigger name")
private String triggerName;
@JsonPropertyDescription("Timestamp")
private Long timestamp;
}
Такая аннотация над классом схемы предполагает при эволюции следующий алгоритм действий:
Изменяем схему по бизнес-требованиям: удаляем/добавляем поля и т.д.
Инкрементим версию X в аннотации @JacksonEvolvingState(version = X).
Копируем все содержимое актуального класса схемы (без package) и вставляем в директорию src/test/resources/state/<Имя_класса>/<версия>.txt.
Запускаем новый тест на проверку корректности изменения схемы относительно предыдущей и текущей версий (то есть сможет ли объект десериализоваться сам в себя) JacksonEvolvingStatesUnitTest#shouldSuccessRestoredBetweenEvolvedStates.
Стоит протестировать все схемы, помеченные этой аннотацией. Для этого пишем тест JacksonEvolvingStatesUnitTest#shouldSuccessRestoredBetweenEvolvedStates из алгоритма выше. Нам нужно убедиться, что все схемы с аннотацией JacksonEvolvingState корректно эволюционируют из одной версии в другую вплоть до последней указанной версии.
Тест проверки эволюции аннотированной схемы
За основу возьмем JacksonStateSerializerUnitTest из прошлой статьи. Напомню его краткий алгоритм:
Взять AlertState-схему v1 для использования в качестве состояния в stateful-операторе.
Запустить Flink MiniCluster, подложив в classpath схему v1.
Прогнать одно сообщение в потоке данных для сериализации объекта в состояния со схемой версии v1.
Снять точку сохранения savepoint.
Остановить кластер.
Взять AlertState-схему v2 для использования в качестве состояния в предыдущем stateful-операторе.
Проверить, что десериализация схемы v1 в v2 прошла успешно.
Таким образом, на вход теста нам понадобится: название класса и перечень версий для проверки. Воспользуемся подходом из существующего теста и организуем файлы в директории resource следующим образом:
![](https://habrastorage.org/getpro/habr/upload_files/345/edd/003/345edd003d5e41c9fad02fc3e97664f7.jpg)
В /state будем хранить все схемы для тестирования. Текущую сохраняем в текстовый файл с именем, содержащим название версии каждого изменения схемы. Как про это не забыть, расскажу ниже.
Затем напишем метод, возвращающий все комбинации названия класса с версиями для нашего теста:
private static Stream<Arguments> provideClassWithEvolvedVersionsPairs() {
Stream<Arguments> result = Stream.empty();
final var reflections = new Reflections("com.asbobryakov.flink_spring");
final var evolvingStates = reflections.getTypesAnnotatedWith(JacksonEvolvingState.class);
for (Class<?> evolvingStateClass : evolvingStates) {
final var className = evolvingStateClass.getSimpleName();
final var annotation = evolvingStateClass.getAnnotation(JacksonEvolvingState.class);
final var annotationVersion = annotation.version();
for (int i = 1; i < annotationVersion; i++) {
log.info("Evolving State '{}' v{} -> v{} will be tested", className, i, i + 1);
result = Stream.concat(result, Stream.of(arguments(className, i, i + 1)));
}
log.info("Evolving State '{}' v{} -> v{} will be tested", className, annotationVersion, annotationVersion);
result = Stream.concat(result, Stream.of(arguments(className, annotationVersion, annotationVersion)));
}
return result;
}
Этот метод сканирует весь пакет проекта и находит классы, помеченные аннотацией JacksonEvolvingState. Это делается с помощью объекта Reflections. Для его использования вам потребуется зависимость:
testImplementation "org.reflections:reflections:0.10.2"
Далее тест получает версию из аннотации и выдает все комбинации версий для эволюции. Например, для версии 3 нашего AlertState метод выдаст следующие комбинации аргументов для будущего теста:
AlertState 1 2
AlertState 2 3
AlertState 3 3
Перейдем непосредственно к тесту. Он будет один в один, как JacksonStateSerializerUnitTest, описанный в прошлой статье.
@Slf4j
@SuppressWarnings({"PMD.UseVarargs", "PMD.UseProperClassLoader", "PMD.AddEmptyString"})
public class JacksonEvolvingStateUnitTest {
@TempDir
private Path tmpFolder;
private File savepointDir;
private File rocksDbDir;
@BeforeEach
@SneakyThrows
void init() {
savepointDir = new File(tmpFolder.toFile(), "savepoints");
rocksDbDir = new File(tmpFolder.toFile(), "rocksDb");
if (!savepointDir.mkdirs() || !rocksDbDir.mkdirs()) {
fail(format("Test setup failed: failed to create temporary directories: %s",
List.of(savepointDir.toString(), rocksDbDir.toString())));
}
}
@ParameterizedTest
@MethodSource("provideClassWithEvolvedVersionsPairs")
@SneakyThrows
void shouldSuccessRestoredBetweenEvolvedStates(String className, int currentVersion, int evolvedVersion) {
// job to store base state in savepoint
final var objectTypeClassLoaderDto = compileStateClass(className, currentVersion);
var cluster = createMiniCluster();
startMiniClusterWithClasspath(cluster, Arrays.asList(objectTypeClassLoaderDto.getClassLoader().getURLs()));
final var jacksonStateSerializer = new JacksonStateSerializer<>(objectTypeClassLoaderDto.getType());
final var statefulCounter = new TestStatefulMapCounter<>(jacksonStateSerializer);
final var fullObjectSink = new TestListSink<>();
@Cleanup final var env = createEnvironmentWithRockDb(rocksDbDir);
final var object = objectTypeClassLoaderDto.getObject();
env.addSource(TestUnboundedSource.fromElements(List.of(object)))
.returns(objectTypeClassLoaderDto.getType())
.keyBy(value -> "static_key")
.flatMap(statefulCounter).uid("id_stateful")
.sinkTo(fullObjectSink);
final var jobClient = env.executeAsync(env.getStreamGraph());
await()
.atMost(ofSeconds(5))
.until(() -> fullObjectSink.getHandledEvents().size() == 1);
assertEquals(0, statefulCounter.getStatefulCounter(), "Stateful counter has state for current object key");
final var savepoint = jobClient.triggerSavepoint(savepointDir.toString(), NATIVE).get(2, SECONDS);
jobClient.cancel().get(2, SECONDS);
cluster.after();
// job with restored state from savepoint for evolved schema
final var evolvedObjectTypeClassLoaderDto = compileStateClass(className, evolvedVersion);
cluster = createMiniCluster();
startMiniClusterWithClasspath(cluster, Arrays.asList(evolvedObjectTypeClassLoaderDto.getClassLoader().getURLs()));
final var evolvedJacksonStateSerializer = new JacksonStateSerializer<>(evolvedObjectTypeClassLoaderDto.getType());
final var evolvedStatefulCounter = new TestStatefulMapCounter<>(evolvedJacksonStateSerializer);
final var evolvedSink = new TestListSink<>();
@Cleanup final var evolvedEnv = createEnvironmentWithRockDb(rocksDbDir);
final var evolvedObject = evolvedObjectTypeClassLoaderDto.getObject();
evolvedEnv.addSource(TestUnboundedSource.fromElements(List.of(evolvedObject)))
.returns(evolvedObjectTypeClassLoaderDto.getType())
.keyBy(value -> "static_key")
.flatMap(evolvedStatefulCounter).uid("id_stateful")
.sinkTo(evolvedSink);
final var streamGraph = evolvedEnv.getStreamGraph();
streamGraph.setSavepointRestoreSettings(SavepointRestoreSettings.forPath(savepoint));
final var evolvedJobClient = evolvedEnv.executeAsync(streamGraph);
await()
.atMost(ofSeconds(5))
.until(() -> evolvedSink.getHandledEvents().size() == 1);
assertEquals(1, evolvedStatefulCounter.getStatefulCounter(),
"Stateful counter has no state for the current object key, which must be restored from a savepoint");
evolvedJobClient.cancel().get(2, SECONDS);
cluster.after();
}
private static Stream<Arguments> provideClassWithEvolvedVersionsPairs() {
// …
}
}
Запускаем тест, он падает и выдает такую ошибку:
![](https://habrastorage.org/getpro/habr/upload_files/517/dcd/a1e/517dcda1e343048ff79d6309c7fa8722.png)
Package lombok does not exist
Ошибка "Package lombok does not exist" заключается в том, что java при компиляции файла не смогла распознать импорты lombok, прописанные в нашем v1.txt файле схемы, из которой мы создали компилируемый java-файл.
Давайте разберемся в причинах ошибки. Для этого достаточно встать дебагом на месте, где мы компилируем класс схемы с помощью JavaCompiler, и посмотрим переданный в компилятор classpath:
![](https://habrastorage.org/getpro/habr/upload_files/501/90c/3d2/50190c3d2eda08efe8a83b462b1867ce.png)
Здесь нет ничего, связанного с lombok. Это происходит потому, что мы определяем gradlew.build согласно документации на уровне compileOnly:
dependencies {
annotationProcessor 'org.projectlombok:lombok'
compileOnly 'org.projectlombok:lombok'
testAnnotationProcessor 'org.projectlombok:lombok'
testCompileOnly 'org.projectlombok:lombok'
}
Решение проблемы достаточно простое: изменить конфигурацию gradle lombok с testCompileOnly на testImplementation. Можно было бы еще попробовать сделать delombok преобразование в рантайме через Apache Ant таску с использованием lombok.delombok.ant.Tasks$Delombok. Но в этом случае нам также понадобиться изменить конфигурацию lombok до уровня testImplementation. В нашем случае в таком приеме нет необходимости, но он может пригодиться для других задач. Зависимость lombok, которая требовалась только во время тестовой компиляции, станет видна в рантайме теста и автоматически добавится в classpath. Проверяем переданный classpath еще раз и видим найденные совпадения:
![](https://habrastorage.org/getpro/habr/upload_files/432/924/77d/43292477db2519a1977c06143bf43c14.png)
При этом тест успешно проходит:
![](https://habrastorage.org/getpro/habr/upload_files/19a/35c/787/19a35c7876dc32c9e0ddab98239e5fa2.png)
Автоматизация проверки инкремента версии схемы при ее изменении
Давайте теперь изменим схему AlertState, добавив в него новое поле. Нужно не забыть увеличить версию внутри аннотации. На данный момент тест по-прежнему просканирует все файлы, увидит, что AlertState версии 1 из аннотации @JacksonEvolvingState(version=1), и пройдет успешно, независимо от корректности изменения схемы. Нам нужно как-то защититься от такой ошибки.
Как проверить, что при изменении схемы также инкрементили версию внутри аннотации? Единственным вариантом является сравнение содержимого текущей схемы с ее версией в папке с ресурсами, откуда тест получает всю историю схем. Мы обязаны ее туда добавлять сами для корректности теста. Также это позволяет нам следить за всей историей изменении схемы.
Сравним содержимое текущей схемы в src с последней версией в test/resources/. Для этого просто добавим в метод генерации входных данных для теста дополнительный assert-метод:
@SneakyThrows
private static void assertLastVersionsInSrcAndResourcesAreEquals(Class<?> clazz, int lastVersion) {
final var srcClassContent = IOUtils.toString(
Paths.get("src", "main", "java", clazz.getName().replaceAll("\\.", "/") + ".java").toUri(),
StandardCharsets.UTF_8);
final var srcClassContentWithoutPackage = srcClassContent.substring(srcClassContent.indexOf("\n")).replaceAll("\n", "");
final var classSimpleName = clazz.getSimpleName();
final var resourceClassContent = StateClassLoadingUtil.loadStateClassContentFromResources(classSimpleName, lastVersion).replaceAll("\n", "");
assertEquals(srcClassContentWithoutPackage, resourceClassContent,
format("The content of class '%s' version '%s' in /src differs from the last one set in resources/state/%s. " +
"The schema version in /src needs to be promoted. See the scheme evolution block in Readme for details.",
classSimpleName, lastVersion, classSimpleName));
Этот метод на входе получит класс, а также номер его версии согласно аннотации. Далее в методе мы можем прочитать текстовое содержимое класса в src, удалив package, т.к. в resources/ .txt файлах я складываю без него. Запишем в переменную srcClassContentWithoutPackage содержимое класса, а также последней версии сохраненного файла txt в ресурсах — resourceClassContent. Если они различаются или файл в test/resources с такой версией не нашли, то делаем вывод:
Либо мы изменили схему в src/ без ее добавления в test/resource для теста эволюции схемы.
Либо мы увеличили версию схемы в src без ее добавления в test/resources.
Делаем отсылку в assert на readme, в котором нужно зафиксировать алгоритм изменения схем. Добавляем assert в наш тест:
private static Stream<Arguments> provideClassWithEvolvedVersionsPairs() {
...
for (Class<?> evolvingStateClass : evolvingStates) {
...
final var annotationVersion = annotation.version();
assertLastVersionsInSrcAndResourcesAreEquals(evolvingStateClass, annotationVersion);
...
}
return result;
}
Меняем схему AlertState в src, добавив поле newField и оставив ту же версию аннотации:
@Data
@Builder
@Jacksonized
@JsonIgnoreProperties(ignoreUnknown = true)
@JacksonEvolvingState(version = 1)
public class AlertState {
@JsonPropertyDescription("User id")
private UUID userId;
@JsonPropertyDescription("Trigger name")
private String triggerName;
@JsonPropertyDescription("Timestamp")
private Long timestamp;
@JsonPropertyDescription("newField")
private String newField;
}
Запускаем тест и как раз ловим ожидаемую ошибку:
![](https://habrastorage.org/getpro/habr/upload_files/b6c/700/ee9/b6c700ee9db3eb53d4ef82a37bd63889.png)
Теперь для исправления проблемы инкрементим версию в аннотации и добавляем в ресурсы v2.txt:
![](https://habrastorage.org/getpro/habr/upload_files/3f3/e5a/7d5/3f3e5a7d532096877c247f274f0535d5.jpg)
Запускаем тест, видим, что все корректно заработало:
![](https://habrastorage.org/getpro/habr/upload_files/0f0/d3e/7bf/0f0d3e7bfa67ba530d07411d85e2c3a9.png)
При этом мы проверили десериализацию схемы из v1 в v2, а также из v2 в саму v2. Получается, что наше преобразование в рамках добавления нового поля было корректно согласно эволюции схемы. Тест можно настроить на любые комбинации версий.
В данном материале я показал применение собственного сериализатора состояний операторов с поддержкой эволюции схемы, а также автоматизацию тестов на изменение схемы. Это позволяет убедиться, что ее бездумное изменение не вызовет ошибок на проде. Если разработчик менял схему без соблюдений наших правил, то тесты сразу упадут.
P.S. На этом текущий цикл постов про Apache Flink завершается. Мы познакомились с фреймворком Apache Flink, реализовали пару бизнес-задач, посмотрели узкие места, а также покрыли все тестами. Большинство абстракций из примеров кода проявили себя в наших проектах достаточно хорошо и уже являются устоявшимися. Вы тоже можете их использовать, уделяя максимальное внимание тестированию ваших пайплайнов.
Оглянувшись назад и оценивая свой опыт, я понимаю, что выбор Spring в качестве DI-фреймворка не был оптимальным решением и поставил перед нами ряд проблем, но об этом я расскажу уже в следующий раз.