Привет, Хабр! На связи Александр Бобряков, техлид в команде МТС Аналитики. В предыдущих постах я рассказал, как собрать первое приложение Flink со Spring, реализовав пайплайн дедупликации сообщений Kafka-to-Kafka. В этом примере погружусь в использование таймеров в Flink, а в следующих статьях расскажу, как работать с более сложными состояниями, эволюционировать их схему и покрыть это все тестами.

Весь разбираемый исходный код есть в репозитории AlexanderBobryakov/flink-spring. В master-ветке представлен итоговый проект по всей серии. Эта статья соответствует релизной ветке с названием release/7_Trigger_Flink_Job.

Это восьмой материал из моей серии про Apache Flink. По мере выхода новых ссылки на них будут появляться ниже.

Список моих статей про Flink:
Оглавление статьи:

Как отправить уведомление по таймеру

Рассмотрим новую бизнес-задачу, которую можно решить с использованием Apache Flink.

Источник данных (Source): Kafka-топик trigger-topic, содержащий сообщения-триггеры (TriggerMessage). В них находится информация о человеке, совершившем действие одного из двух типов: START или STOP.

START подразумевает создание триггера, а STOP — завершение. Например, START отправляется, когда пользователь добавил на сайте товар в корзину, а STOP — при оплате.

TriggerMessage представляет собой JSON:

{
  "userId": "3ee2e515-2c82-4d5f-b9b9-8da5f54a7da1",
  "triggerName": "test_trigger_name",
  "status": "START",
  "deviceType": "Phone",
  "category": "Shop",
  "count": 5,
  "timestamp": 123,
  "data": {
    "field_1": "value_1"
  }
}

Поле status определяет тип триггера.

Задача отправки уведомлений: отслеживать все триггеры и отправлять уведомление AlertMessage, когда триггер запустился и длится дольше десяти минут. Например, пришло TriggerMessage со статусом START, но в течение десяти минут нет сообщения STOP с аналогичным id человека и названием триггера. Такое уведомление сработает, если посетитель бросил заполненную корзину и покинул сайт.

Приемник данных (Sink): Kafka-топик alert-topic, требующий сообщения AlertMessage, образованного путем прямого сопоставления полей из TriggerMessage:

{
  "userId": "3ee2e515-2c82-4d5f-b9b9-8da5f54a7da1",
  "triggerName": "test_trigger_name",
  "timestamp": 123
}

Целевой пайплайн

После построения джобы мы получим такой граф обработки данных:

Пайплайн начинается с Source из Kafka-топика trigger-topic. Затем мы фильтруем сообщения по полю status, пропускаем дальше те, где есть значения START или STOP, а также добавляем проверку на название триггера и наличие id клиента.

Далее пишем оператору, который будет следить за определенными событиями по ключу «название_триггера + id_клиента». При обработке START-сообщения оператор создаст таймер на десять минут, а по завершении отправит AlertMessage в дальнейший Sink — Kafka-топик alert-topic. Если STOP все-таки пришел, то AlertMessage не отправляем.

Построение Flink Job

В этом кейсе построение основных абстракций мало отличается от пайплайна из предыдущих статей про Kafka-to-Kafka с дедупликацией. Я быстро пробегусь по основным абстракциям каркаса пайплайна, так как самые интересные моменты скрыты в операторе, который будет следить за таймерами.

Определение DTO для сообщений

Сообщения на входе представляют собой структуру, соответствующую поставленной задаче:

@Data
@Builder
@Jacksonized
@JsonIgnoreProperties(ignoreUnknown = true)
public class TriggerMessage {
   @JsonPropertyDescription("User id")
   private UUID userId;

   @JsonPropertyDescription("Trigger name")
   private String triggerName;

   @JsonPropertyDescription("Trigger status")
   @JsonDeserialize(using = TriggerStatus.Deserializer.class)
   private TriggerStatus status;

   @JsonPropertyDescription("Device type")
   private String deviceType;

   @JsonPropertyDescription("Category")
   private String category;

   @JsonPropertyDescription("Trigger count")
   private int count;

   @JsonPropertyDescription("Timestamp")
   private Long timestamp;

   @JsonPropertyDescription("Trigger additional data")
   private Map<String, Object> data;
}

На выходе ожидаем:

@Data
@Builder
@Jacksonized
@JsonIgnoreProperties(ignoreUnknown = true)
public class AlertMessage {
   @JsonPropertyDescription("User id")
   private UUID userId;

   @JsonPropertyDescription("Trigger name")
   private String triggerName;

   @JsonPropertyDescription("Timestamp")
   private Long timestamp;
}

Определение настроек Kafka

В настройках Kafka добавятся trigger-topic и alert-topic:

kafka:
 group-id: group_id
 bootstrap-servers: localhost:29092
 topics:
   click-topic: 'click-topic'
   trigger-topic: 'trigger-topic'
   alert-topic: 'alert-topic'

Определение Kafka Source/Sink

Source и Sink имплементируют созданные ранее абстракции SourceBinder и SinkProvider соответственно:

@Component
@RequiredArgsConstructor
class TriggerMessageKafkaSourceBinder implements SourceBinder<TriggerMessage> {
   private final KafkaProperties kafkaProperties;
   private final DeserializationSchema<TriggerMessage> deserializationClickMessageSchema;

   @Override
   public DataStream<TriggerMessage> bindSource(StreamExecutionEnvironment environment) {
       final var sourceName = "Trigger message Kafka source";
       return environment.fromSource(
               KafkaSource.<TriggerMessage>builder()
                       .setBootstrapServers(kafkaProperties.getBootstrapServers())
                       .setStartingOffsets(OffsetsInitializer.committedOffsets(EARLIEST))
                       .setGroupId(kafkaProperties.getGroupId())
                       .setTopics(kafkaProperties.getTopics().getTriggerTopic())
                       .setValueOnlyDeserializer(deserializationClickMessageSchema)
                       .build(),
               WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(1)),
               sourceName
       ).name(sourceName).uid(sourceName + "_id");
   }
}

Разницы с представленным в предыдущих статьях Source для Kafka нет, за исключением используемого топика: создаем Source с помощью вспомогательного класса KafkaSourceBuilder, задаем в нем необходимые параметры, а также кастомный десериализатор сообщений TriggerMessage. Это решение основано на Jackson и реализовывает обязательные методы базового интерфейса Flink DeserializationSchema:

@Component
@RequiredArgsConstructor
class TriggerMessageDeserializationSchema implements DeserializationSchema<TriggerMessage> {
   private static final long serialVersionUID = 1L;

   private transient ObjectMapper objectMapper;

   @Override
   public void open(InitializationContext context) {
       objectMapper = createObjectMapper();
   }

   @Override
   public TriggerMessage deserialize(byte[] message) throws IOException {
       return objectMapper.readValue(message, TriggerMessage.class);
   }

   @Override
   public boolean isEndOfStream(TriggerMessage nextElement) {
       return false;
   }

   @Override
   public TypeInformation<TriggerMessage> getProducedType() {
       return TypeInformation.of(TriggerMessage.class);
   }
}

Аналогично задаем подключение к KafkaSink для alert-topic:

@Component
@RequiredArgsConstructor
public class AlertMessageKafkaSinkProvider implements SinkProvider<AlertMessage> {
   private final KafkaProperties kafkaProperties;
   private final SerializationSchema<AlertMessage> serializationProductMessageSchema;

   @Override
   public Sink<AlertMessage> createSink() {
       return KafkaSink.<AlertMessage>builder()
               .setBootstrapServers(kafkaProperties.getBootstrapServers())
               .setRecordSerializer(KafkaRecordSerializationSchema.<AlertMessage>builder()
                       .setTopic(kafkaProperties.getTopics().getAlertTopic())
                       .setValueSerializationSchema(serializationProductMessageSchema)
                       .build())
               .setDeliveryGuarantee(NONE)
               .build();
   }
}

Но используем новый сериализатор схемы AlertMessage:

@Component
@RequiredArgsConstructor
class AlertMessageSerializationSchema implements SerializationSchema<AlertMessage> {
   private static final long serialVersionUID = 1;

   private transient ObjectMapper objectMapper;

   @Override
   public void open(InitializationContext context) {
       objectMapper = createObjectMapper();
   }

   @Override
   @SneakyThrows
   public byte[] serialize(AlertMessage element) {
       return objectMapper.writeValueAsBytes(element);
   }
}

За более подробным разъяснением описанных выше абстракций можно обратиться к моей статье «Apache Flink. Как работает дедупликация данных в потоке Kafka-to-Kafka?».

Фильтр событий

Первым бизнес-оператором после Kafka Source является фильтр событий. Это стандартный stateless-оператор, рассматривающий каждое входящее событие независимо от других. Для его реализации достаточно имплементировать интерфейс FilterFunction:

public class TriggerMessageByStatusAndUserFilter implements FilterFunction<TriggerMessage> {
   private static final long serialVersionUID = 1L;

   @Override
   public boolean filter(TriggerMessage message) {
       return Optional.ofNullable(message.getStatus())
               .map(status -> (START.equals(status) || STOP.equals(status))
                       && message.getUserId() != null
                       && isNotBlank(message.getTriggerName()))
               .orElse(false);
   }
}

В нашем случае мы должны пропускать только TriggerMessage, у которых есть статус START или STOP, id клиента и конкретное название триггера.

Оператор создания уведомления по таймеру

Основным компонентом пайплайна обработки TriggerMessage является оператор, который будет генерировать AlertMessage по условиям из технического задания. Алгоритм его работы:

  1. получаем TriggerMessage;

  2. в разрезе ключа (trigger_name + user_id) выполняем действия:

    • если статус TriggerMessage равен START, то создаем таймер на 10 минут, по истечении которого генерируем AlertMessage на основе пришедшего START события;

    • если статус TriggerMessage равен STOP, то удаляем созданный в пункте выше таймер. Если подобного сообщения еще не было (в разрезе ключа), то ничего страшного.

Для реализации такого оператора нам понадобятся знания в двух темах: Flink-таймерах и состоянии Keyed State. Я их разобрал в прошлой части цикла.

Таймеры позволяют запустить логику через определенное время. При этом отсчет может идти как от текущего времени самой машины, выполняющей задание Flink (processing time), так и от времени прошедшего события (event time). Важно помнить, что таймеры доступны только для потока Keyed Stream.

Доступ к таймерам можно получить из абстракции оператора Flink ProcessFunction. В нашем случае мы имеем дело с событиями в разрезе ключа (trigger_name + user_id), поэтому нам заранее нужно разделить поток по этому ключу с помощью метода .keyBy(). Так все события с одинаковым ключом попадут на один и тот же параллельный инстанс следующего оператора создания уведомлений, ведь в нем будем использовать KeyedState.

Указать ключ разделения можно либо с помощью лямбды при построении самого пайплайна, либо использовать имплементацию интерфейса KeySelector. Воспользуемся вторым вариантом. Поместим эту реализацию внутрь нашего нового оператора в качестве внутреннего статического класса, так как он неразрывно связан с оператором отправки уведомлений:

public static class TriggerAlertProcessorKeySelector implements KeySelector<TriggerMessage, String> {
    private static final long serialVersionUID = 1L;

    @Override
    public String getKey(TriggerMessage message) {
        return message.getUserId() + message.getTriggerName();
    }
}

Теперь поговорим о самом операторе отправки уведомлений. Как я писал выше, нам нужна реализация класса KeyedProcessFunction. Она предоставляет доступ к контексту Flink, через который можно получить доступ к состояниям и таймерам.

В общем случае имплементация KeyedProcessFunction выглядит так:

public class TriggerAlertProcessor<K, I, O> extends KeyedProcessFunction<K, I, O> {

   @Override
   public void open(Configuration parameters) {
      // ...
   }

   @Override
   public void processElement(O value, KeyedProcessFunction<I, O, K>.Context ctx, Collector<K> out) throws Exception {
      // ...
   }

   @Override
   public void onTimer(long timestamp, KeyedProcessFunction<I, O, K>.OnTimerContext ctx, Collector<K> out) throws Exception {
      // ...
   }
}

При обработке событий в методе processElement объект Context предоставляет доступ к элементу TimerService. Он умеет работать с метками времени: временем событий (для срабатывания триггера будет использован водяной знак Watermark) и реальным временем машины, где работает оператор. Через TimerService можно создать таймер, который запустится в определенное время в будущем.

KeyedProcessFunction также предоставляет метод onTimer() — именно он будет вызываться при срабатывании таймера. Во время этого вызова все состояния снова привязаны к ключу, с которым был создан таймер. Это позволяет таймерам управлять состоянием с ключом.

Важные замечания по поводу таймеров описаны в документации (перевод):

«Оба типа таймеров (время обработки и время события) обслуживаются внутри TimerService и помещаются в очередь для выполнения. TimerService дедуплицирует таймеры для каждого ключа и временной метки, т. е. существует не более одного таймера для каждого ключа и временной метки. Если для одной и той же временной метки зарегистрировано несколько таймеров, onTimer() метод будет вызываться только один раз. Flink синхронизирует вызовы onTimer() и processElement(). Следовательно, пользователям не нужно беспокоиться об одновременном изменении состояния».

В итоге более детальный план выполнения нашего оператора выглядит следующим образом:

  1. Получаем TriggerMessage.

  2. Если status события равен START, то с помощью TimerService создаем таймер длительностью десять минут, а также сохраняем состояние ValueState для текущего сообщения. На его основе мы в дальнейшем сможем создать AlertMessage.

  3. Если status равен STOP, то удаляем все состояния для текущего ключа и созданный ранее таймер. Flink API позволяет удалить его, передав время срабатывания, поэтому нам дополнительно понадобится состояние для хранения этого времени.

  4. onTimer() реализация должна формировать AlertMesage по сохраненному состоянию в пункте 2 и чистить все состояния по текущему ключу.

На основе этого создадим класс AlertState. Он будет отвечать за объекты, сохраненные в состоянии оператора создания уведомлений. Схематично этот класс будет использоваться в следующем преобразовании:

Такой промежуточный класс необходим для:

  1. Разделения ответственности между форматом сообщений в Async API (Kafka) и нашим внутренним состоянием. Мы не отдаем наружу внутренний формат хранения данных нашего приложения и можем независимо его менять.

  2. Уменьшения данных во внутреннем состоянии. Если бы мы хранили там целиком входящее событие, это бы очень сильно отразилось на быстродействии процессов десериализации и сериализации.

В итоге AlertState будет полностью повторять формат выходного сообщения AlertMessage для нашего простейшего примера джобы:

@Data
@Builder
@Jacksonized
@JsonIgnoreProperties(ignoreUnknown = true)
public class AlertState {
   @JsonPropertyDescription("User id")
   private UUID userId;

   @JsonPropertyDescription("Trigger name")
   private String triggerName;

   @JsonPropertyDescription("Timestamp")
   private Long timestamp;
}

Теперь посмотрим на реализацию самого оператора уведомлений:

@Slf4j
public class TriggerAlertProcessor extends KeyedProcessFunction<String, TriggerMessage, AlertMessage> {
   private static final long serialVersionUID = 1L;

   private final Duration stateWaiting;
   private ValueState<AlertState> alertState;
   private ValueState<Long> timestampState;
   private transient MessageTransformer<TriggerMessage, AlertState> messageToStateTransformer;
   private transient MessageTransformer<AlertState, AlertMessage> stateToMessageTransformer;

   public TriggerAlertProcessor(@NotNull Duration stateWaiting) {
       this.stateWaiting = stateWaiting;
   }

   @Override
   public void open(Configuration parameters) {
      // ...
   }

   @Override
   public void processElement(TriggerMessage message,
                              KeyedProcessFunction<String, TriggerMessage, AlertMessage>.Context ctx,
                              Collector<AlertMessage> out) throws Exception {
      // ...
   }

   @Override
   public void onTimer(long timestamp,
                       KeyedProcessFunction<String, TriggerMessage, AlertMessage>.OnTimerContext ctx,
                       Collector<AlertMessage> out) throws Exception {
      // ...
   }

   public static class TriggerAlertProcessorKeySelector implements KeySelector<TriggerMessage, String> {
      // ...
   }
}

Мы видим, что оператор наследует класс KeyedProcessFunction<K, I, O>. В нем:

  • K — формат ключа (String);

  • I — формат входных сообщений (TriggerMessage);

  • O — формат выходных событий (AlertMessage).

Далее нам нужны следующие поля класса:

  • stateWainting — длительность ожидания события STOP по нашему ТЗ. По умолчанию она составляет десять минут, и мы передадим это значение из настроек application.yml непосредственно в конструкторе оператора;

  • alertState — состояние KeyedState, которое хранит AlertState для каждого пришедшего события TriggerMessage со статусом START. В дальнейшем мы будем преобразовать его в AlertMessage;

  • timestampState — состояние KeyedState, хранящее время срабатывания таймера. Это необходимо для возможности его удаления по событию STOP, описанному выше;

  • messageToStateTransformer и stateToMessageTransformer — классы, преобразующие TriggerMessage в состояние AlertState и AlertState в AlertMessage. Их код легко понять: мы напрямую перекладываем соответствующие поля.

Теперь посмотрим на метод open():

@Override
   public void open(Configuration parameters) {
       messageToStateTransformer = new TriggerMessageToAlertStateTransformer();
       stateToMessageTransformer = new AlertStateToAlertMessageTransformer();
       final var defaultTtlConfig = StateTtlConfig
               .newBuilder(Time.minutes(stateWaiting.toMillis() + 1000))
               .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
               .setStateVisibility(NeverReturnExpired)
               .cleanupFullSnapshot()
               .build();

       // alert state
       final var alertDescriptor = new ValueStateDescriptor<>("alertState", AlertState.class);
       alertDescriptor.enableTimeToLive(defaultTtlConfig);
       alertState = getRuntimeContext().getState(alertDescriptor);

       // timestamp state
       final var timestampDescriptor = new ValueStateDescriptor<>("timestampState", Types.LONG);
       timestampDescriptor.enableTimeToLive(defaultTtlConfig);
       timestampState = getRuntimeContext().getState(timestampDescriptor);
   }

Как я упоминал в предыдущих статьях, в основном он необходим для инициализации объекта после его десериализации в слоте TaskManager перед непосредственным началом обработки сообщений (инициализацией transient-полей, подготовкой обработки событий и так далее).

В нашей реализации мы инициализируем:

  1. Трансформеры сообщений: TriggerMessage → AlertState и AlertState →  AlertMessage.

  2. Конфигурацию TTL, где время ttl немного больше, чем время ожидания в 10 минут.

  3. Состояние для AlertState и Timestamp: для этого передаем названия и типы хранящихся данных. Обратите внимание на важный момент: для инициализации состояний мы используем указание классов напрямую, хотя есть возможность указать сериализатор. Об этом я подробнее расскажу в следующих статьях.

Теперь оператор настроен и готов к обработке событий. Она производится в методе processElement, куда поступают TriggerMessage, ссылка до контекста Flink, а также ссылка out для добавления в выходной поток событий из оператора.

@Override
   public void processElement(TriggerMessage message,
                              KeyedProcessFunction<String, TriggerMessage, AlertMessage>.Context ctx,
                              Collector<AlertMessage> out) throws Exception {
       final var status = message.getStatus();
       if (START.equals(status)) {
           // create timer
           final var state = messageToStateTransformer.transform(message);
           alertState.update(state);
           final var invokeTimerMillis = ctx.timerService().currentProcessingTime() + stateWaiting.toMillis();
           final var previousTimestamp = timestampState.value();
           if (previousTimestamp != null) {
               ctx.timerService().deleteProcessingTimeTimer(previousTimestamp);
           }
           ctx.timerService().registerProcessingTimeTimer(invokeTimerMillis);
           timestampState.update(invokeTimerMillis);
       } else if (STOP.equals(status)) {
           // remove timer
           final var invokeTimerMillis = timestampState.value();
           if (invokeTimerMillis != null) {
               ctx.timerService().deleteProcessingTimeTimer(invokeTimerMillis);
               timestampState.clear();
           }
           alertState.clear();
       } else {
           log.debug("Unknown trigger status {} for key {}", status, ctx.getCurrentKey());
       }
   }

Взглянем на внутренности метода более подробно:

  1. Определили статус события — START или STOP. Остальные типы к нам не придут: вспоминаем первый фильтр в нашем пайплайне. Текущий оператор знать об этом не должен, поэтому добавляем логирование.

  2. В случае START мы трансформируем пришедший триггер в объект состояния AlertState и сохраняем его в это состояние, а далее создаем сам таймер:

final var invokeTimerMillis = ctx.timerService().currentProcessingTime() + stateWaiting.toMillis();

Этой строкой берем текущее время машины, где выполняется задание Flink и прибавляем ему десять минут по бизнес-условию. В итоге получаем время срабатывания таймера. Если таймер для текущего ключа (trigger_name + user_id) уже есть, то удаляем его и просто создаем новый по вычисленному времени. Также запоминаем это время отдельно в состоянии timestampState.

  1. В случае STOP проверяем, существует ли в состоянии хоть какое-то время. Если оно есть, то мы создавали таймер в START блоке. Удаляем его и чистим все состояния для текущего ключа события.

Цель метода processElement — создавать таймеры и чистить состояния по бизнес-сценариям. Заметьте, что мы нигде в этом методе не добавляли события в выходной поток с помощью ссылки out. В выходной поток мы добавим события только при срабатывании таймера. За это отвечает метод onTimer():

@Override
   public void onTimer(long timestamp,
                       KeyedProcessFunction<String, TriggerMessage, AlertMessage>.OnTimerContext ctx,
                       Collector<AlertMessage> out) throws Exception {
       final var alertStateValue = alertState.value();
       if (alertStateValue != null) {
           final var alertMessage = stateToMessageTransformer.transform(alertStateValue);
           out.collect(alertMessage);
       }
       timestampState.clear();
       alertState.clear();
   }

На входе он получает timestamp срабатывания таймера, все ту же ссылку на Flink, контекст и ссылку out. Взглянем на его реализацию подробнее:

  1. Сначала мы достаем состояние AlertState, которое было добавлено на этапе обработке события в processElement.

  2. Затем трансформируем его в формат выходного события AlertMessage.

  3. В конце очищаем все состояния, потому что они больше не нужны.

Собираем Flink Job

Чтобы гибко настраивать параметры джобы, вынесем длительность таймеров в настройки application.yml (с маппингом на класс TriggerToAlertJobProperties):

jobs:
 trigger-to-alert-job:
   enabled: true
   state-waiting: 10m

Собрав все операторы вместе, получим такую имплементацию нашего класса FlinkJob:

@Component
@AllArgsConstructor
@ConditionalOnProperty("jobs.trigger-to-alert-job.enabled")
public class TriggerToAlertJob extends FlinkJob {
   private final TriggerToAlertJobProperties properties;
   private final SourceBinder<TriggerMessage> sourceBinder;
   private final SinkProvider<AlertMessage> sinkProvider;

   @Override
   public void registerJob(StreamExecutionEnvironment env) {
       sourceBinder.bindSource(env)
               .filter(new TriggerMessageByStatusAndUserFilter())
               .uid("filter_trigger_message_by_status_id").name("filter_trigger_message_by_status")
               .keyBy(new TriggerAlertProcessor.TriggerAlertProcessorKeySelector())
               .process(new TriggerAlertProcessor(properties.getStateWaiting()))
               .uid("trigger_alert_processor_id").name("trigger_alert_processor")
               .sinkTo(sinkProvider.createSink()).uid("sink_alert_message_id").name("sink_alert_message");
   }
}

В качестве полей класса стандартно используем настройки для этой джобы, поля для получения Kafka Source и генерации Kafka Sink. В самой джобе по нашему начальному пайплайну определяем фильтр, оператор с таймерами и sink. Конечно, не забываем указать удобочитаемые имена и уникальный id для корректного восстановления состояния из savepoint.

В следующих статьях мы проверим работоспособность всего этого тестами. Используем E2E-тесты для основного сценария, unit-тесты для TriggerToAlertJob без Kafka и для отдельных операторов вместе с бизнес-сценариями генерации таймеров. Начнем со стандартных тестов, а далее перейдем к достаточно сложным и неочевидным проблемам, которые могут возникнуть в подобных Flink-джобах.

Комментарии (1)


  1. kenoma
    30.08.2024 11:55

    А могли бы использовать сагу и абстрагироваться от транспортного уровня.