Всем привет, меня зовут Александр Бобряков. Я техлид в команде МТС Аналитики, занимаюсь Real-Time обработкой данных. Недавно мы начали использовать фреймворк Apache Flink, и я решил поделиться на Хабре своим опытом внедрения этой технологии в наши продукты в цикле статей. В предыдущей части я рассказывал про основные концепции потоковой обработки данных. А ещё затронул архитектуру и главные механизмы Apache Flink.

В этой статье начнем разработку Flink-приложения с использованием фреймворка Spring. Изучим структуру приложения, основные плагины и полезные настройки. Развернем Flink-кластер в Docker и попробуем запустить первое Flink-задание. Структура приложения будет постепенно развиваться в последующих статьях.

Это вторая статья из серии моих материалов про Apache Flink. По мере выхода новых постов, ссылки на них будут появляться ниже.

Список моих постов про Flink

Весь разбираемый исходный код можно найти в репозитории. В master-ветке представлен итоговый проект по всей серии статей. Этот пост соответствует релизной ветке с названием release/1_Flink_with_Spring_template.

Flink + Spring. Первые шаги

Дисклеймер: использование Spring — это не best-practice, так как проще прибегнуть к более легковесным DI-фреймворкам.

Но Spring привычнее любому java-разработчику, а внедрять что-то новое как правило дорого по ресурсам. Все же при его использовании рекомендуется исключить все ненужные вам зависимости для уменьшения размера запускаемого толстого jar-файла (в этой статье мы на этом не останавливаемся, но если будут вопросы — пишите).

Стек технологий

В проекте я буду использовать следующие инструменты и технологии:

Понятное дело, вы можете использовать более свежие версии.

Создаем проект

Первым делом создаем базовый шаблон приложения в Idea:

Теперь нам нужно определить плагины, зависимости и другие параметры сборки приложения. Блок с плагинами будет выглядеть следующим образом:

plugins {
   id 'java'
   id 'application'
   id 'org.springframework.boot' version '2.7.7'
   id 'io.spring.dependency-management' version '1.0.15.RELEASE'
   id 'jacoco'
   id 'com.github.johnrengelman.shadow' version '7.1.2'
   id 'pmd'
}

Теперь переходим к плагину shadow. Для запуска приложения в Flink-кластере нам нужно собрать исполняемый jar-файл. Стандартный плагин Spring Framework не подойдет. Он использует специфический макет загрузки классов, который не поддерживается Apache Flink.

Нам нужен другой способ создания толстого jar-файла со всеми добавленными зависимостями, и им может стать Gradle Shadow. Подробнее про упаковку программ для Flink написано в документации.

Использовать этот плагин достаточно просто, его настройка выглядит следующим образом:

shadowJar {
   dependsOn 'check'
   zip64 true
   mergeServiceFiles()
   append 'META-INF/spring.handlers'
   append 'META-INF/spring.schemas'
   append 'META-INF/spring.tooling'
   transform(PropertiesFileTransformer) {
       paths = ['META-INF/spring.factories']
       mergeStrategy = "append"
   }
}

Список зависимостей можно определить вот так, но они будут расширяться по мере развития приложения:

ext {
   set('flinkVersion', '1.17.0')
   set('jacksonVersion', '2.14.1')
   set('awaitilityVersion', '4.2.0')
   set('testcontainersVersion', "1.18.0")
}

dependencies {
   implementation 'org.springframework.boot:spring-boot-starter'
   implementation 'org.springframework.boot:spring-boot-starter-validation'
   implementation "com.fasterxml.jackson.core:jackson-annotations:${jacksonVersion}"
   implementation "com.fasterxml.jackson.core:jackson-core:${jacksonVersion}"
   implementation "com.fasterxml.jackson.core:jackson-databind:${jacksonVersion}"
   implementation "com.fasterxml.jackson.datatype:jackson-datatype-jsr310:${jacksonVersion}"
   implementation "com.fasterxml.jackson.datatype:jackson-datatype-jdk8:${jacksonVersion}"

   annotationProcessor 'org.springframework.boot:spring-boot-configuration-processor'
   annotationProcessor 'org.projectlombok:lombok'
   compileOnly "org.apache.flink:flink-streaming-java:${flinkVersion}"
   compileOnly 'org.projectlombok:lombok'

   testImplementation 'ch.qos.logback:logback-classic:1.2.9'
   testImplementation 'org.springframework.boot:spring-boot-starter-test'
   testImplementation "org.apache.flink:flink-streaming-java:${flinkVersion}:tests"
   testImplementation "org.apache.flink:flink-test-utils:${flinkVersion}"
   testImplementation "org.apache.flink:flink-statebackend-rocksdb:${flinkVersion}"
   testImplementation 'org.testcontainers:junit-jupiter'
   testImplementation "org.awaitility:awaitility:${awaitilityVersion}"
   testImplementation "commons-io:commons-io:2.11.0"

   testAnnotationProcessor 'org.projectlombok:lombok'
   testCompileOnly 'org.projectlombok:lombok'
}

Мы используем стандартные зависимости, поэтому подробно останавливаться на них не будем.

Следом укажем Main-класс приложения в результирующем манифесте. Это можно сделать путем установки в build.gradle параметра:

mainClassName = 'com.asbobryakov.flink_spring.Main'

В итоге для сборки исполняемого jar-файла нужно будет воспользоваться командой:

./gradlew shadowJar

Xlint:serial

Дополнительно в build.gradle рекомендую добавить флаг для javac-компилятора –Xlint:serial.

tasks.withType(JavaCompile) {
   options.compilerArgs << "-Xlint:serial" << "-Werror"
}

С этой настройкой будет выдаваться предупреждение, если есть класс, реализующий интерфейс Serializable, в котором не определено поле serialVersionUID.

В мире распределенного вычисления это важно: при выполнении программы наши классы будут сериализовываться для передачи между компонентами кластера.

Структура приложения

Проект будет выглядеть так:

В качестве настроек application.yml исходим из определения имени приложения и флага для старта пайплайнов обработки данных, который заиспользуем чуть дальше:

flink:
 app-name: flink-spring
 submit-jobs-on-app-start: true

Так как мы имеем дело со Spring, наше приложение содержит стандартный основной класс, являющийся фактической точкой входа в программу. Именно его мы указывали в gradle-настройке «mainClassName»:

@SpringBootApplication
public class Main {
    public static void main(String[] args) {
        SpringApplication.run(Main.class, args);
    }
}

В момент запуска приложения произойдет поднятие контекста, после чего можно начать основную работу. Определяем листенер, который будет выступать логической точкой входа в программу:

@Component
@RequiredArgsConstructor
@ConditionalOnProperty("flink.submit-jobs-on-app-start")
public class AppListener {
    private final JobStarter jobStarter;

    @EventListener(ApplicationStartedEvent.class)
    @SneakyThrows
    public void onApplicationStart() {
        jobStarter.startJobs();
    }
}

Как раз в этом месте удобно иметь флаг «flink.submit-jobs-on-app-start» для управления созданием этого компонента. Он пригодится в тестах (сразу выставим в false) и при первых деплоях приложения, когда ничего не понятно и хочется попробовать просто запустить пустой jar без непосредственной логики.

Задача листенера — передать управление по запуску задач нашему кастомному компоненту JobStarter. Для этого определим абстрактный класс FlinkJob, который будут реализовывать все наши бизнес-джобы, запускаемые в Flink:

public abstract class FlinkJob {
   public abstract void registerJob(StreamExecutionEnvironment env);
}

Метод registerJob должен создать и зарегистрировать весь пайплайн обработки данных и привязать его в Flink-контексте StreamExecutionEnvironment. Об этом поговорим далее. Возвращаясь к JobStarter, напишем код для поиска всех задач и их запуска:

@Service
@Slf4j
@RequiredArgsConstructor
public class JobStarter {
   private final StreamExecutionEnvironment environment;
   private final List<FlinkJob> jobs;

   @SneakyThrows
   public JobClient startJobs() {
       if (jobs.isEmpty()) {
           log.info("No Jobs found for start");
           return null;
       }
       for (FlinkJob job : jobs) {
           log.info("Register job '{}'", job.getClass().getSimpleName());
           job.registerJob(environment);
       }
       return environment.executeAsync();
   }
}

В коде все достаточно просто: Spring находит бины, реализующие FlinkJob, после чего инжектит их в бин текущего класса. Он по ним пробегается и регистрирует. Для асинхронного запуска задач в Flink используется метод executeAsync().

Поговорим подробнее о классе StreamExecutionEnvironment. Это отправная точка для конфигурации Flink-приложения и создания пайплайнов потоковой обработки данных.

Чтобы получить объект этого класса, можно воспользоваться статическим методом StreamExecutionEnvironment.getExecutionEnvironment(). Flink сам поймет, как ему обработать его вызов: если запуск произведен с локального компьютера без предварительно развернутого Flink-кластера, то будет автоматически создан мини-кластер под текущие ресурсы компьютера. А при запуске на ноде реального  Flink-кластера  будет найден именно этот кластер.

По сути это точка входа, на основе которой мы построим все операции. Позже они исполнятся на найденном (или созданном) окружении. Для регистрации такого бина напишем Spring класс-конфигурацию:

@Configuration
public class FlinkConfig {

   @Bean
   public StreamExecutionEnvironment streamExecutionEnvironment() {
       return StreamExecutionEnvironment.getExecutionEnvironment();
   }
}

Общие шаги создания любой задачи можно описать как:

  1. Получение ExecutionEnvironment

  2. Определение источников данных (Source)

  3. Определение преобразований данных (Operator)

  4. Определение получателей данных (Sink)

  5. Запуск

Осталось определить базовую Flink-задачу в одной из нашей реализации FlinkJob:

@Component
public class SimpleJob extends FlinkJob {

   @Override
   public void registerJob(StreamExecutionEnvironment env) {
       env.fromElements("value_1", "value_2", "value_3")
           .map(value -> "after_map_" + value)
           .print();
   }
}

Как можно увидеть, сначала определяем источник данных — список конкретных String-объектов. Существует API для указания более сложных источников, таких как Kafka, но эти примеры мы рассмотрим в следующих статьях.

Далее идет преобразование каждого элемента с помощью метода map, а в качестве приемника данных используется STDOUT. В целом процесс построения пайплайна очень похож на работу с Java Stream API. В Flink существует другой API для построения программ (например Table API), но сейчас сосредоточимся на DataStream API.

На этом реализация задачи закончена. Мы ожидаем, что Flink обработает три элемента-строки, преобразует их и напечатает в stdout результат. Теперь это нужно проверить, но обойдемся пока без тестов. Мы запустим свой Flink Cluster через docker-compose.

Запуск Flink-приложения в локальном кластере

Для пробного запуска нам понадобится локально поднять Flink Cluster. Для этого можно воспользоваться docker-compose.

Flink-кластер в docker-compose

В docker-compose определяем JobManager и хотя бы один TaskManager. Можем указать несколько TaskManager для увеличения уровня параллельности вычислений. Роли этих компонентов мы рассмотрели в предыдущей статье.

version: '3.9'

services:
 jobmanager:
   image: flink:1.17.0-scala_2.12-java11
   ports:
     - '8081:8081'
   environment:
     FLINK_PROPERTIES: "jobmanager.rpc.address: jobmanager"
   command:
     - jobmanager

 taskmanager:
   image: flink:1.17.0-scala_2.12-java11
   environment:
     FLINK_PROPERTIES: "jobmanager.rpc.address: jobmanager"
   command:
     - taskmanager

Обратите внимание на порт 8081 – он используется для предоставления Flink UI (http://localhost:8081)

Flink UI и запуск приложения

После запуска docker-compose и перехода на http://localhost:8081 мы попадем во Flink UI:

На первой вкладке Overview отображается основная информация. Также видно количество доступных TaskManager и слотов (1 в нашем случае согласно определению docker-compose файла).

Для ручного запуска jar-файла переходим во вкладку «Submit New Job» и запускаем предварительно собранный jar. Он будет создан в каталоге ./build/libs/) командой:

 ./gradlew shadowJar

Main-класс автоматически подтянулся из манифеста Jar. При необходимости тут можно указать дополнительные параметры запуска — например, уровень параллелизма или точку сохранения. Когда приложение заработает, увидим следующую картину:

Важная оговорка: для более красивой визуализации графа задачи (разбиение каждого оператора пайплайна на отдельные блоки) я использую настройку streamExecutionEnvironment.disableOperatorChaining().  Подробнее про это можно почитать в документации Flink. Без неё вы увидите на графе только один блок, совмещающий сразу три операции: source, map. sink.

Это происходит, потому что Flink старается объединить операторы в один, чтобы не тратить ресурсы на сериализацию данных между операторами. Такое возможно, если последовательные операторы имеют одинаковую параллельность, а данные между их параллельными задачами (Tasks) не перемешиваются (forward-передача). Для боевого решения выключать эту настройку не рекомендуется.

В итоге видно, что наша задача завершилась корректно. Это происходит, т.к. источник данных имеет конечное число элементов, а Flink автоматически останавливает задание по окончанию их обработки. Подробнее об этом расскажу в следующих статьях, когда будем работать с «unbounded» потоками. Будь источником данных Kafka, наша задача продержалась бы в состоянии Running условно вечно.

Результаты

По умолчанию Sink в виде print() печатает результат в стандартный поток вывода машины, на которой запущено приложение. Открываем логи контейнеров docker:

1) docker-compose-jobmanager-1 (flink:1.17.0-scala_2.12-java11)

Видим лог о поиске и запуске заданий классом JobStarter.
Видим лог о поиске и запуске заданий классом JobStarter.

2) docker-compose-taskmanager-1  (flink:1.17.0-scala_2.12-java11)

Видим лог событий в потоке данных,  а также информацию о завершении задачи.
Видим лог событий в потоке данных,  а также информацию о завершении задачи.

Собственно, вот так это и работает. В следующих постах рассмотренная джоба SimpleJob нам не потребуется — она была нужна для наглядности. В третьей статье мы рассмотрим реальную бизнес-задачу с созданием пайплайна данных Kafka-to-Kafka с дедупликацией сообщений.

Спасибо, что прочитали! Если возникнут вопросы, задавайте их в комментариях.

Комментарии (10)


  1. mikegordan
    23.11.2023 19:24

    Не знаете они пофиксили, когда делаешь Table to DataStream с помощью модели (class) если выбраны не все поля модели вылетал Exception ? и изза этого нужно было указывать фейковые данные по всем полям модели , даже если эти поля были не нужны.


    1. appp_master Автор
      23.11.2023 19:24

      С такой проблемой на практике не сталкивался.

      Воспроизвел проблему в тестах на Flink 1.15 (для него у меня уже есть настроенные E2E-тесты с Hive 1.2.1 в проекте) и вот что получил:

      1. Java-класс, в который буду считываться данные:

      @AllArgsConstructor
      @NoArgsConstructor
      @Data
      public static class HiveRow {
          @DataTypeHint("INT")
          private Integer id;
          @DataTypeHint("STRING")
          private String name;
      }
      1. Hive-таблица имеет поле useless, которого нет в Java-классе:

      CREATE TABLE hiveTable (id INT, name STRING, useless STRING)
      1. Пайплайн

      // ...
      final var inputTable = tableEnv.from("hive_catalog_name." + DEFAULT_DB + "." + hiveTable);
      tableEnv.toDataStream(inputTable, HiveRow.class).print();
      env.execute();
      //...

      В этом случае падает такая ошибка:

      org.apache.flink.table.api.ValidationException: Column types of query result and sink for '*anonymous_datastream_sink$1*' do not match.
      Cause: Different number of columns.
      
      Query schema: [id: INT, name: STRING, useless: STRING]
      Sink schema:  [id: INT, name: STRING]

      Если вы имели в виду именно ее и хотели бы из коробки иметь возможность пропускать неуказанные в Java-классе поля, то:

      1. С помощью автоматических Flink-средств проблему решить не удалось. Продебажил основные Flink-абстракции и везде, где можно было бы зацепиться кастомным кодом, висят Internal-аннотации. По сути регламент такой, что при чтении таблицы указанный класс должен полностью соотноситься со схемой данных, то есть класс типизирует api до непосредственного получения схемы из самой таблицы в рантайме.

      2. Написать подобный конвертер самому пока возможно через явно заданный далее map-оператор ( Row -> HiveRow ), но тогда вы будете вычитывать "лишние" поля, чего не хотелось бы

      3. Из других решений - использовать не передачу класса для преобразования таблицы в DataStream, а напрямую схему.

      4. Как плюс-минус нормальное решение можно было бы взять следующее (рабочий вариант):

      final var inputTable = tableEnv.from("hive_catalog_name." + DEFAULT_DB + "." + hiveTable)
                                 .select(col("id"), col("name"));
      tableEnv.toDataStream(inputTable, HiveRow.class)
          .print();

      То есть вы напрямую указываете колонки в селекте, соответствующие java-классу. Это работает. Далее все таки не хочется хардкодить тут имена колонок, а управлять только java-классом. Это натолкнуло на мысль, что можно создать метод, который будет анализировать java-класс и генерировать все аргументы для метода select(...) при построении джобы (а значит без ухудшения перфоманса в рантайме) например так:

      // org.apache.flink.table.expressions.Expression
      public static Expression[] extractColumns(Class<?> clazz) {
          final var result = new ArrayList<Expression>();
          for (Field field : clazz.getDeclaredFields()) {
              result.add(col(field.getName()));
          }
          return result.toArray(Expression[]::new);
      }

      В итоге пайплайн будет выглядеть следующим образом:

      final var inputTable = tableEnv.from("hive_catalog_name." + DEFAULT_DB + "." + hiveTable)
                                 .select(extractColumns(HiveRow.class));
      tableEnv.toDataStream(inputTable, HiveRow.class)
          .print();

      Это тоже работает, хотя дважды дублируем HiveRow.class. В целом дальше в Java-классе можно вешать и анализировать уже свои аннотации, чтобы в них хранить имена соответствующих полей в таблице, если не хотим их повторять в классе. Но это уже рефакторинг.

      Возможно вашу проблему неправильно понял


  1. astamur
    23.11.2023 19:24

    Работаю с Flink какое-то время. Предположу, что будет не очень удобно иметь несколько разных пайплайнов в одной задаче:
    - при обновления останавливаться будут все пайплайны сразу;
    - больше рисков, что проблема с одним пайплайном вызовет сбой в работе всей задачи (память, исключения и т.д.);
    - ну и если даже разные задачи в одном кластере, то уже проблемы с одной задачей могут затронуть весь кластер.

    Чаще разворачивают 1 задачу на кластер. Вижу в этом следующие преимущества:
    - можно более точно настроить ресурсы под конкретный тип нагрузки и особенности задачи (количество TM, разные настройки Flink, память, RocksDB и т.д);
    -меньше рисков сбоя и легче обновлять.

    Могу еще посоветовать идею упаковки вашего кода в образ на основе Flink (можно экстернализировать и настройки для кластера). И тогда каждый пайплайн будет отдельным образом. Берешь и разворачиваешь в отдельный namespace k8s, например.


    1. mikegordan
      23.11.2023 19:24

      вы предлагаете на каждую JOB создавать огромный целый проект JAVA? И каждый JOB-проект хранить в отдельном репозитории?

      Это же сумма можно сойти переключатся (запускать IDE) под каждую JOB.


      1. astamur
        23.11.2023 19:24

        Можно в одном проекте код для нескольких разных задач (Flink jobs) хранить, просто собирать свой образ для каждой. Плюс если у них общие зависимости, это может даже один модуль быть. Если разные версии Flink - другой модуль или отдельный репозиторий. Тут уже кому как удобнее. Но понятно, что над сборкой контейнеров надо поработать.


    1. appp_master Автор
      23.11.2023 19:24

      Чтобы не было путаницы в моем ответе ниже, сразу оговорюсь, что буду понимать под Job именно флинковый запуск артефакта, а под пайплайном - конечный набор операторов для выполнения какой-то бизнес-задачи. То есть как вы правильно заметили - в одной Job сейчас могут быть несколько пайплайнов.

      Мы выбрали следующую схему:

      1. Храним разные пайплайны в одном артефакте, если используем "общий API". Под общим API имею в виду набор библиотек одинаковых версий, разных абстракций и тд которые требуются каждому пайплайну.

        Например, пайплайны Kafka-to-Kafka будут требовать одни зависимости, а File-to-Hive другие (например, потому что у нас старый Hive с Flink 1.15). Тогда у нас имеются два разных артефакта, чтобы ненужные зависимости одного не были внутри другого.

      2. Под каждый пайплайн внутри артефакта делаем feature-toggle, который этот пайплайн запускает

      Такой подход никак не противоречит проблемам, которые вы описываете. По сути то, что вы описали - выбор между Session/Application mode развертыванием:

      • На проде мы используем Application Mode - берем собранный артефакт и запускаем отдельную Job в которой включен только один соответствующий пайплайн. Для второго пайплайна берем этот же артефакт и запускаем следующую Job в своем кластере (Application Mode) с включенным feature-toggle второго пайплайна и тд

      • На тестовом стенде вполне можем запустить все пайплайны сразу в одном кластере с минимальными ресурсами (Session Mode) просто включив все feature-toggle для каждого пайплайна


      1. astamur
        23.11.2023 19:24

        Да, всё верно поняли. Спасибо за пояснения. Интересно ещё, каким образом разворачивает Flink кластер. k8s оператор?


        1. appp_master Автор
          23.11.2023 19:24
          +1

          Да, k8s-оператор, но каких-то тонкостей я рассказать сразу не смогу, так как этим занимается специальный devops, который все настраивал в соответствии с требованиями компании


  1. ryabchikov91
    23.11.2023 19:24

    Спасибо за статью.
    Можно у вас поинтересоваться следующим:
    Я правильно понимаю, что вы используете Spring Boot исключительно для объединения всех Flink Jobs в один jar-архив, чтобы потом за 1 раз запустить сразу все джобы на кластере или же есть еще какие-то причины использования Spring?


    1. appp_master Автор
      23.11.2023 19:24

      Из преимуществ использования Spring скорее только то, что легче и быстрее становится работать с кодом: автоконфигурации, стартеры, DI при построении пайплайнов, разные интеграции, которые упростят тестирование, более привычная работа с application.yml и тд. Везде понемногу, а в сумме новому разработчику уже легче будет ориентироваться и добавить что-то новое из примеров стандартных проектов.

      Например, в тестах где-то заиспользовали аннотацию DynamicPropertySource, для легкости работы с TestContainers, далее в тестовых зависимостях добавили Spring-Kafka, чтобы одной аннотацией создать KafkaListener для прослушивания результирующего топика в E2E-тесте и тд.

      Про формирование jar_ника мы используем плагин com.github.johnrengelman.shadow. По поводу запуска 1 раз джобы расписал более подробно нашу стратегию в комментарии выше - пока что такой подход удобен, но возможно будем его менять.