Привет, Хабр! На связи Александр Бобряков, техлид в команде МТС Аналитики. В предыдущих постах я рассказал, как собрать первое приложение Flink со Spring, реализовав пайплайн дедупликации сообщений Kafka-to-Kafka. В этом примере погружусь в использование таймеров в Flink, а в следующих статьях расскажу, как работать с более сложными состояниями, эволюционировать их схему и покрыть это все тестами.
Весь разбираемый исходный код есть в репозитории AlexanderBobryakov/flink-spring. В master-ветке представлен итоговый проект по всей серии. Эта статья соответствует релизной ветке с названием release/7_Trigger_Flink_Job.
Это восьмой материал из моей серии про Apache Flink. По мере выхода новых ссылки на них будут появляться ниже.
Список моих статей про Flink:
Оглавление статьи:
Как отправить уведомление по таймеру
Рассмотрим новую бизнес-задачу, которую можно решить с использованием Apache Flink.
Источник данных (Source): Kafka-топик trigger-topic, содержащий сообщения-триггеры (TriggerMessage). В них находится информация о человеке, совершившем действие одного из двух типов: START или STOP.
START подразумевает создание триггера, а STOP — завершение. Например, START отправляется, когда пользователь добавил на сайте товар в корзину, а STOP — при оплате.
TriggerMessage представляет собой JSON:
{
"userId": "3ee2e515-2c82-4d5f-b9b9-8da5f54a7da1",
"triggerName": "test_trigger_name",
"status": "START",
"deviceType": "Phone",
"category": "Shop",
"count": 5,
"timestamp": 123,
"data": {
"field_1": "value_1"
}
}
Поле status определяет тип триггера.
Задача отправки уведомлений: отслеживать все триггеры и отправлять уведомление AlertMessage, когда триггер запустился и длится дольше десяти минут. Например, пришло TriggerMessage со статусом START, но в течение десяти минут нет сообщения STOP с аналогичным id человека и названием триггера. Такое уведомление сработает, если посетитель бросил заполненную корзину и покинул сайт.
Приемник данных (Sink): Kafka-топик alert-topic, требующий сообщения AlertMessage, образованного путем прямого сопоставления полей из TriggerMessage:
{
"userId": "3ee2e515-2c82-4d5f-b9b9-8da5f54a7da1",
"triggerName": "test_trigger_name",
"timestamp": 123
}
Целевой пайплайн
После построения джобы мы получим такой граф обработки данных:
Пайплайн начинается с Source из Kafka-топика trigger-topic. Затем мы фильтруем сообщения по полю status, пропускаем дальше те, где есть значения START или STOP, а также добавляем проверку на название триггера и наличие id клиента.
Далее пишем оператору, который будет следить за определенными событиями по ключу «название_триггера + id_клиента». При обработке START-сообщения оператор создаст таймер на десять минут, а по завершении отправит AlertMessage в дальнейший Sink — Kafka-топик alert-topic. Если STOP все-таки пришел, то AlertMessage не отправляем.
Построение Flink Job
В этом кейсе построение основных абстракций мало отличается от пайплайна из предыдущих статей про Kafka-to-Kafka с дедупликацией. Я быстро пробегусь по основным абстракциям каркаса пайплайна, так как самые интересные моменты скрыты в операторе, который будет следить за таймерами.
Определение DTO для сообщений
Сообщения на входе представляют собой структуру, соответствующую поставленной задаче:
@Data
@Builder
@Jacksonized
@JsonIgnoreProperties(ignoreUnknown = true)
public class TriggerMessage {
@JsonPropertyDescription("User id")
private UUID userId;
@JsonPropertyDescription("Trigger name")
private String triggerName;
@JsonPropertyDescription("Trigger status")
@JsonDeserialize(using = TriggerStatus.Deserializer.class)
private TriggerStatus status;
@JsonPropertyDescription("Device type")
private String deviceType;
@JsonPropertyDescription("Category")
private String category;
@JsonPropertyDescription("Trigger count")
private int count;
@JsonPropertyDescription("Timestamp")
private Long timestamp;
@JsonPropertyDescription("Trigger additional data")
private Map<String, Object> data;
}
На выходе ожидаем:
@Data
@Builder
@Jacksonized
@JsonIgnoreProperties(ignoreUnknown = true)
public class AlertMessage {
@JsonPropertyDescription("User id")
private UUID userId;
@JsonPropertyDescription("Trigger name")
private String triggerName;
@JsonPropertyDescription("Timestamp")
private Long timestamp;
}
Определение настроек Kafka
В настройках Kafka добавятся trigger-topic и alert-topic:
kafka:
group-id: group_id
bootstrap-servers: localhost:29092
topics:
click-topic: 'click-topic'
trigger-topic: 'trigger-topic'
alert-topic: 'alert-topic'
Определение Kafka Source/Sink
Source и Sink имплементируют созданные ранее абстракции SourceBinder и SinkProvider соответственно:
@Component
@RequiredArgsConstructor
class TriggerMessageKafkaSourceBinder implements SourceBinder<TriggerMessage> {
private final KafkaProperties kafkaProperties;
private final DeserializationSchema<TriggerMessage> deserializationClickMessageSchema;
@Override
public DataStream<TriggerMessage> bindSource(StreamExecutionEnvironment environment) {
final var sourceName = "Trigger message Kafka source";
return environment.fromSource(
KafkaSource.<TriggerMessage>builder()
.setBootstrapServers(kafkaProperties.getBootstrapServers())
.setStartingOffsets(OffsetsInitializer.committedOffsets(EARLIEST))
.setGroupId(kafkaProperties.getGroupId())
.setTopics(kafkaProperties.getTopics().getTriggerTopic())
.setValueOnlyDeserializer(deserializationClickMessageSchema)
.build(),
WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(1)),
sourceName
).name(sourceName).uid(sourceName + "_id");
}
}
Разницы с представленным в предыдущих статьях Source для Kafka нет, за исключением используемого топика: создаем Source с помощью вспомогательного класса KafkaSourceBuilder, задаем в нем необходимые параметры, а также кастомный десериализатор сообщений TriggerMessage. Это решение основано на Jackson и реализовывает обязательные методы базового интерфейса Flink DeserializationSchema:
@Component
@RequiredArgsConstructor
class TriggerMessageDeserializationSchema implements DeserializationSchema<TriggerMessage> {
private static final long serialVersionUID = 1L;
private transient ObjectMapper objectMapper;
@Override
public void open(InitializationContext context) {
objectMapper = createObjectMapper();
}
@Override
public TriggerMessage deserialize(byte[] message) throws IOException {
return objectMapper.readValue(message, TriggerMessage.class);
}
@Override
public boolean isEndOfStream(TriggerMessage nextElement) {
return false;
}
@Override
public TypeInformation<TriggerMessage> getProducedType() {
return TypeInformation.of(TriggerMessage.class);
}
}
Аналогично задаем подключение к KafkaSink для alert-topic:
@Component
@RequiredArgsConstructor
public class AlertMessageKafkaSinkProvider implements SinkProvider<AlertMessage> {
private final KafkaProperties kafkaProperties;
private final SerializationSchema<AlertMessage> serializationProductMessageSchema;
@Override
public Sink<AlertMessage> createSink() {
return KafkaSink.<AlertMessage>builder()
.setBootstrapServers(kafkaProperties.getBootstrapServers())
.setRecordSerializer(KafkaRecordSerializationSchema.<AlertMessage>builder()
.setTopic(kafkaProperties.getTopics().getAlertTopic())
.setValueSerializationSchema(serializationProductMessageSchema)
.build())
.setDeliveryGuarantee(NONE)
.build();
}
}
Но используем новый сериализатор схемы AlertMessage:
@Component
@RequiredArgsConstructor
class AlertMessageSerializationSchema implements SerializationSchema<AlertMessage> {
private static final long serialVersionUID = 1;
private transient ObjectMapper objectMapper;
@Override
public void open(InitializationContext context) {
objectMapper = createObjectMapper();
}
@Override
@SneakyThrows
public byte[] serialize(AlertMessage element) {
return objectMapper.writeValueAsBytes(element);
}
}
За более подробным разъяснением описанных выше абстракций можно обратиться к моей статье «Apache Flink. Как работает дедупликация данных в потоке Kafka-to-Kafka?».
Фильтр событий
Первым бизнес-оператором после Kafka Source является фильтр событий. Это стандартный stateless-оператор, рассматривающий каждое входящее событие независимо от других. Для его реализации достаточно имплементировать интерфейс FilterFunction:
public class TriggerMessageByStatusAndUserFilter implements FilterFunction<TriggerMessage> {
private static final long serialVersionUID = 1L;
@Override
public boolean filter(TriggerMessage message) {
return Optional.ofNullable(message.getStatus())
.map(status -> (START.equals(status) || STOP.equals(status))
&& message.getUserId() != null
&& isNotBlank(message.getTriggerName()))
.orElse(false);
}
}
В нашем случае мы должны пропускать только TriggerMessage, у которых есть статус START или STOP, id клиента и конкретное название триггера.
Оператор создания уведомления по таймеру
Основным компонентом пайплайна обработки TriggerMessage является оператор, который будет генерировать AlertMessage по условиям из технического задания. Алгоритм его работы:
получаем TriggerMessage;
-
в разрезе ключа (trigger_name + user_id) выполняем действия:
если статус TriggerMessage равен START, то создаем таймер на 10 минут, по истечении которого генерируем AlertMessage на основе пришедшего START события;
если статус TriggerMessage равен STOP, то удаляем созданный в пункте выше таймер. Если подобного сообщения еще не было (в разрезе ключа), то ничего страшного.
Для реализации такого оператора нам понадобятся знания в двух темах: Flink-таймерах и состоянии Keyed State. Я их разобрал в прошлой части цикла.
Таймеры позволяют запустить логику через определенное время. При этом отсчет может идти как от текущего времени самой машины, выполняющей задание Flink (processing time), так и от времени прошедшего события (event time). Важно помнить, что таймеры доступны только для потока Keyed Stream.
Доступ к таймерам можно получить из абстракции оператора Flink ProcessFunction. В нашем случае мы имеем дело с событиями в разрезе ключа (trigger_name + user_id), поэтому нам заранее нужно разделить поток по этому ключу с помощью метода .keyBy(). Так все события с одинаковым ключом попадут на один и тот же параллельный инстанс следующего оператора создания уведомлений, ведь в нем будем использовать KeyedState.
Указать ключ разделения можно либо с помощью лямбды при построении самого пайплайна, либо использовать имплементацию интерфейса KeySelector. Воспользуемся вторым вариантом. Поместим эту реализацию внутрь нашего нового оператора в качестве внутреннего статического класса, так как он неразрывно связан с оператором отправки уведомлений:
public static class TriggerAlertProcessorKeySelector implements KeySelector<TriggerMessage, String> {
private static final long serialVersionUID = 1L;
@Override
public String getKey(TriggerMessage message) {
return message.getUserId() + message.getTriggerName();
}
}
Теперь поговорим о самом операторе отправки уведомлений. Как я писал выше, нам нужна реализация класса KeyedProcessFunction. Она предоставляет доступ к контексту Flink, через который можно получить доступ к состояниям и таймерам.
В общем случае имплементация KeyedProcessFunction выглядит так:
public class TriggerAlertProcessor<K, I, O> extends KeyedProcessFunction<K, I, O> {
@Override
public void open(Configuration parameters) {
// ...
}
@Override
public void processElement(O value, KeyedProcessFunction<I, O, K>.Context ctx, Collector<K> out) throws Exception {
// ...
}
@Override
public void onTimer(long timestamp, KeyedProcessFunction<I, O, K>.OnTimerContext ctx, Collector<K> out) throws Exception {
// ...
}
}
При обработке событий в методе processElement объект Context предоставляет доступ к элементу TimerService. Он умеет работать с метками времени: временем событий (для срабатывания триггера будет использован водяной знак Watermark) и реальным временем машины, где работает оператор. Через TimerService можно создать таймер, который запустится в определенное время в будущем.
KeyedProcessFunction также предоставляет метод onTimer() — именно он будет вызываться при срабатывании таймера. Во время этого вызова все состояния снова привязаны к ключу, с которым был создан таймер. Это позволяет таймерам управлять состоянием с ключом.
Важные замечания по поводу таймеров описаны в документации (перевод):
«Оба типа таймеров (время обработки и время события) обслуживаются внутри TimerService и помещаются в очередь для выполнения. TimerService дедуплицирует таймеры для каждого ключа и временной метки, т. е. существует не более одного таймера для каждого ключа и временной метки. Если для одной и той же временной метки зарегистрировано несколько таймеров, onTimer() метод будет вызываться только один раз. Flink синхронизирует вызовы onTimer() и processElement(). Следовательно, пользователям не нужно беспокоиться об одновременном изменении состояния».
В итоге более детальный план выполнения нашего оператора выглядит следующим образом:
Получаем TriggerMessage.
Если status события равен START, то с помощью TimerService создаем таймер длительностью десять минут, а также сохраняем состояние ValueState для текущего сообщения. На его основе мы в дальнейшем сможем создать AlertMessage.
Если status равен STOP, то удаляем все состояния для текущего ключа и созданный ранее таймер. Flink API позволяет удалить его, передав время срабатывания, поэтому нам дополнительно понадобится состояние для хранения этого времени.
onTimer() реализация должна формировать AlertMesage по сохраненному состоянию в пункте 2 и чистить все состояния по текущему ключу.
На основе этого создадим класс AlertState. Он будет отвечать за объекты, сохраненные в состоянии оператора создания уведомлений. Схематично этот класс будет использоваться в следующем преобразовании:
Такой промежуточный класс необходим для:
Разделения ответственности между форматом сообщений в Async API (Kafka) и нашим внутренним состоянием. Мы не отдаем наружу внутренний формат хранения данных нашего приложения и можем независимо его менять.
Уменьшения данных во внутреннем состоянии. Если бы мы хранили там целиком входящее событие, это бы очень сильно отразилось на быстродействии процессов десериализации и сериализации.
В итоге AlertState будет полностью повторять формат выходного сообщения AlertMessage для нашего простейшего примера джобы:
@Data
@Builder
@Jacksonized
@JsonIgnoreProperties(ignoreUnknown = true)
public class AlertState {
@JsonPropertyDescription("User id")
private UUID userId;
@JsonPropertyDescription("Trigger name")
private String triggerName;
@JsonPropertyDescription("Timestamp")
private Long timestamp;
}
Теперь посмотрим на реализацию самого оператора уведомлений:
@Slf4j
public class TriggerAlertProcessor extends KeyedProcessFunction<String, TriggerMessage, AlertMessage> {
private static final long serialVersionUID = 1L;
private final Duration stateWaiting;
private ValueState<AlertState> alertState;
private ValueState<Long> timestampState;
private transient MessageTransformer<TriggerMessage, AlertState> messageToStateTransformer;
private transient MessageTransformer<AlertState, AlertMessage> stateToMessageTransformer;
public TriggerAlertProcessor(@NotNull Duration stateWaiting) {
this.stateWaiting = stateWaiting;
}
@Override
public void open(Configuration parameters) {
// ...
}
@Override
public void processElement(TriggerMessage message,
KeyedProcessFunction<String, TriggerMessage, AlertMessage>.Context ctx,
Collector<AlertMessage> out) throws Exception {
// ...
}
@Override
public void onTimer(long timestamp,
KeyedProcessFunction<String, TriggerMessage, AlertMessage>.OnTimerContext ctx,
Collector<AlertMessage> out) throws Exception {
// ...
}
public static class TriggerAlertProcessorKeySelector implements KeySelector<TriggerMessage, String> {
// ...
}
}
Мы видим, что оператор наследует класс KeyedProcessFunction<K, I, O>. В нем:
K — формат ключа (String);
I — формат входных сообщений (TriggerMessage);
O — формат выходных событий (AlertMessage).
Далее нам нужны следующие поля класса:
stateWainting — длительность ожидания события STOP по нашему ТЗ. По умолчанию она составляет десять минут, и мы передадим это значение из настроек application.yml непосредственно в конструкторе оператора;
alertState — состояние KeyedState, которое хранит AlertState для каждого пришедшего события TriggerMessage со статусом START. В дальнейшем мы будем преобразовать его в AlertMessage;
timestampState — состояние KeyedState, хранящее время срабатывания таймера. Это необходимо для возможности его удаления по событию STOP, описанному выше;
messageToStateTransformer и stateToMessageTransformer — классы, преобразующие TriggerMessage в состояние AlertState и AlertState в AlertMessage. Их код легко понять: мы напрямую перекладываем соответствующие поля.
Теперь посмотрим на метод open():
@Override
public void open(Configuration parameters) {
messageToStateTransformer = new TriggerMessageToAlertStateTransformer();
stateToMessageTransformer = new AlertStateToAlertMessageTransformer();
final var defaultTtlConfig = StateTtlConfig
.newBuilder(Time.minutes(stateWaiting.toMillis() + 1000))
.setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
.setStateVisibility(NeverReturnExpired)
.cleanupFullSnapshot()
.build();
// alert state
final var alertDescriptor = new ValueStateDescriptor<>("alertState", AlertState.class);
alertDescriptor.enableTimeToLive(defaultTtlConfig);
alertState = getRuntimeContext().getState(alertDescriptor);
// timestamp state
final var timestampDescriptor = new ValueStateDescriptor<>("timestampState", Types.LONG);
timestampDescriptor.enableTimeToLive(defaultTtlConfig);
timestampState = getRuntimeContext().getState(timestampDescriptor);
}
Как я упоминал в предыдущих статьях, в основном он необходим для инициализации объекта после его десериализации в слоте TaskManager перед непосредственным началом обработки сообщений (инициализацией transient-полей, подготовкой обработки событий и так далее).
В нашей реализации мы инициализируем:
Трансформеры сообщений: TriggerMessage → AlertState и AlertState → AlertMessage.
Конфигурацию TTL, где время ttl немного больше, чем время ожидания в 10 минут.
Состояние для AlertState и Timestamp: для этого передаем названия и типы хранящихся данных. Обратите внимание на важный момент: для инициализации состояний мы используем указание классов напрямую, хотя есть возможность указать сериализатор. Об этом я подробнее расскажу в следующих статьях.
Теперь оператор настроен и готов к обработке событий. Она производится в методе processElement, куда поступают TriggerMessage, ссылка до контекста Flink, а также ссылка out для добавления в выходной поток событий из оператора.
@Override
public void processElement(TriggerMessage message,
KeyedProcessFunction<String, TriggerMessage, AlertMessage>.Context ctx,
Collector<AlertMessage> out) throws Exception {
final var status = message.getStatus();
if (START.equals(status)) {
// create timer
final var state = messageToStateTransformer.transform(message);
alertState.update(state);
final var invokeTimerMillis = ctx.timerService().currentProcessingTime() + stateWaiting.toMillis();
final var previousTimestamp = timestampState.value();
if (previousTimestamp != null) {
ctx.timerService().deleteProcessingTimeTimer(previousTimestamp);
}
ctx.timerService().registerProcessingTimeTimer(invokeTimerMillis);
timestampState.update(invokeTimerMillis);
} else if (STOP.equals(status)) {
// remove timer
final var invokeTimerMillis = timestampState.value();
if (invokeTimerMillis != null) {
ctx.timerService().deleteProcessingTimeTimer(invokeTimerMillis);
timestampState.clear();
}
alertState.clear();
} else {
log.debug("Unknown trigger status {} for key {}", status, ctx.getCurrentKey());
}
}
Взглянем на внутренности метода более подробно:
Определили статус события — START или STOP. Остальные типы к нам не придут: вспоминаем первый фильтр в нашем пайплайне. Текущий оператор знать об этом не должен, поэтому добавляем логирование.
В случае START мы трансформируем пришедший триггер в объект состояния AlertState и сохраняем его в это состояние, а далее создаем сам таймер:
final var invokeTimerMillis = ctx.timerService().currentProcessingTime() + stateWaiting.toMillis();
Этой строкой берем текущее время машины, где выполняется задание Flink и прибавляем ему десять минут по бизнес-условию. В итоге получаем время срабатывания таймера. Если таймер для текущего ключа (trigger_name + user_id) уже есть, то удаляем его и просто создаем новый по вычисленному времени. Также запоминаем это время отдельно в состоянии timestampState.
В случае STOP проверяем, существует ли в состоянии хоть какое-то время. Если оно есть, то мы создавали таймер в START блоке. Удаляем его и чистим все состояния для текущего ключа события.
Цель метода processElement — создавать таймеры и чистить состояния по бизнес-сценариям. Заметьте, что мы нигде в этом методе не добавляли события в выходной поток с помощью ссылки out. В выходной поток мы добавим события только при срабатывании таймера. За это отвечает метод onTimer():
@Override
public void onTimer(long timestamp,
KeyedProcessFunction<String, TriggerMessage, AlertMessage>.OnTimerContext ctx,
Collector<AlertMessage> out) throws Exception {
final var alertStateValue = alertState.value();
if (alertStateValue != null) {
final var alertMessage = stateToMessageTransformer.transform(alertStateValue);
out.collect(alertMessage);
}
timestampState.clear();
alertState.clear();
}
На входе он получает timestamp срабатывания таймера, все ту же ссылку на Flink, контекст и ссылку out. Взглянем на его реализацию подробнее:
Сначала мы достаем состояние AlertState, которое было добавлено на этапе обработке события в processElement.
Затем трансформируем его в формат выходного события AlertMessage.
В конце очищаем все состояния, потому что они больше не нужны.
Собираем Flink Job
Чтобы гибко настраивать параметры джобы, вынесем длительность таймеров в настройки application.yml (с маппингом на класс TriggerToAlertJobProperties):
jobs:
trigger-to-alert-job:
enabled: true
state-waiting: 10m
Собрав все операторы вместе, получим такую имплементацию нашего класса FlinkJob:
@Component
@AllArgsConstructor
@ConditionalOnProperty("jobs.trigger-to-alert-job.enabled")
public class TriggerToAlertJob extends FlinkJob {
private final TriggerToAlertJobProperties properties;
private final SourceBinder<TriggerMessage> sourceBinder;
private final SinkProvider<AlertMessage> sinkProvider;
@Override
public void registerJob(StreamExecutionEnvironment env) {
sourceBinder.bindSource(env)
.filter(new TriggerMessageByStatusAndUserFilter())
.uid("filter_trigger_message_by_status_id").name("filter_trigger_message_by_status")
.keyBy(new TriggerAlertProcessor.TriggerAlertProcessorKeySelector())
.process(new TriggerAlertProcessor(properties.getStateWaiting()))
.uid("trigger_alert_processor_id").name("trigger_alert_processor")
.sinkTo(sinkProvider.createSink()).uid("sink_alert_message_id").name("sink_alert_message");
}
}
В качестве полей класса стандартно используем настройки для этой джобы, поля для получения Kafka Source и генерации Kafka Sink. В самой джобе по нашему начальному пайплайну определяем фильтр, оператор с таймерами и sink. Конечно, не забываем указать удобочитаемые имена и уникальный id для корректного восстановления состояния из savepoint.
В следующих статьях мы проверим работоспособность всего этого тестами. Используем E2E-тесты для основного сценария, unit-тесты для TriggerToAlertJob без Kafka и для отдельных операторов вместе с бизнес-сценариями генерации таймеров. Начнем со стандартных тестов, а далее перейдем к достаточно сложным и неочевидным проблемам, которые могут возникнуть в подобных Flink-джобах.
kenoma
А могли бы использовать сагу и абстрагироваться от транспортного уровня.