Это моя финальная часть(ну пока что ;)) серии статей про Kafka Streams, прошлые статьи тут [ноль, один, два, три] Теперь давайте разработаем приложение, которое считывает:

События о пульсе из топика pulse-events.

{
	"timestamp": "2024-12-26T09:02:00.000Z"
}

События о температуре тела из топика body-temp-events.

{
	"timestamp": "2024-12-26T09:02:00.000Z",
	"temperature": 37.4,
	"unit": "С"
}

Приложение будет обрабатывать эти события, используя Kafka Streams. Мы будем ориентироваться на event time (время, содержащееся в самом событии), а не на время обработки или время загрузки сообщения в Kafka.


Понятия времени в Kafka Streams

В потоковой обработке обычно выделяют три вида времени:

  • Event Time (время события). Когда событие реально произошло (значение поля timestamp, проставленное датчиком или сервисом).

  • Ingestion Time (или Loading Time). Время, когда сообщение попадает в систему (Kafka), фиксируется брокером Kafka.

  • Processing Time (время обработки). Текущее системное время при обработке события приложением (JVM time).

Используя event time, чтобы анализировать события в том порядке, в каком они случились в реальном мире. Если временные метки неверные или отсутствуют, можно ориентироваться на время загрузки или обработки.


Подготовка окружения и структура проекта

Необходимые инструменты

  • Apache Kafka (запущенная локально или в Docker)

  • Gradle (мы используем Kotlin DSL — build.gradle.kts)

  • Java 21

Убедитесь, что Kafka запущена на localhost:9092.

Структура проекта

kafka-streams-medical-app

├─ src

│  ├─ main

│  │  ├─ java

│  │  │  └─ com.example.kafka

│  │  │     ├─ MedicalMonitorApp.java

│  │  │     ├─ PulseEvent.java

│  │  │     ├─ BodyTempEvent.java

│  │  │     ├─ MyTimestampExtractor.java

│  │  │     └─ ...

│  │  └─ resources

│  └─ test

│     └─ java

├─ build.gradle.kts

└─ settings.gradle.kts

Пример build.gradle.kts

Упрощённая версия сборочного файла (Kotlin DSL):

plugins {
	kotlin("jvm")
	application

}

repositories {
	mavenCentral()
}

dependencies {
	implementation("org.apache.kafka:kafka-streams:3.8.0")
	implementation("com.fasterxml.jackson.core:jackson-databind:2.15.0")
	implementation("com.fasterxml.jackson.module:jackson-module-kotlin:2.15.0")
	implementation("org.slf4j:slf4j-api:2.0.0")
	
	runtimeOnly("org.slf4j:slf4j-simple:2.0.0")
	
	testImplementation(kotlin("test"))
	testImplementation("org.apache.kafka:kafka-streams-test-utils:3.8.0")
}

application {
	mainClass.set("com.example.kafka.MedicalMonitorApp")
}

Запуск сборки: ./gradlew clean build


Создание и настройка топиков

Для работы нужно создать два топика:

  • pulse-events

  • body-temp-events

Пример команд (консоль Kafka):

bin/kafka-topics.sh --bootstrap-server localhost:9092 \\
--create --topic pulse-events \\
--partitions 1 --replication-factor 1

bin/kafka-topics.sh --bootstrap-server localhost:9092 \\
--create --topic body-temp-events \\
--partitions 1 --replication-factor 1

Создание классов-сущностей (DTO)

Пример класса PulseEvent.java:

public record PulseEvent(String timestamp) { }

Пример класса BodyTempEvent.java:

public record BodyTempEvent(String timestamp, double temperature, String unit) { }

Как различать время события, время загрузки и время обработки

Мы используем event time (из поля timestamp во входящем сообщении), чтобы анализировать события в реальном порядке их возникновения. Kafka по умолчанию может брать время загрузки (ingestion), но для медицинских показателей важно учитывать фактическое время измерения.

Если временные метки некорректны (или отсутствуют), можно использовать fallback на ingestion time или на processing time (системное время обработки).


Создание кастомного TimestampExtractor

Чтобы Kafka Streams знал, что мы ориентируемся на event time, нужно настроить TimestampExtractor, который будет забирать поле timestamp из сообщения:

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.streams.processor.TimestampExtractor;
import com.fasterxml.jackson.databind.ObjectMapper;

public class MyTimestampExtractor implements TimestampExtractor {

    private final ObjectMapper objectMapper = new ObjectMapper();

    @Override
    public long extract(final ConsumerRecord<Object, Object> record, final long partitionTime) {
        if (record.value() == null) {
            return partitionTime;
        }

        try {
            String jsonString = record.value().toString();
            String timestampStr = objectMapper.readTree(jsonString).get("timestamp").asText();
            return javax.xml.bind.DatatypeConverter.parseDateTime(timestampStr).getTimeInMillis();
        } catch (Exception e) {
            return partitionTime;
        }
    }
}
  • Извлекаем JSON-поле "timestamp" из сообщения.

  • Преобразуем его из ISO-8601 в миллисекунды (long).

  • Если парсинг не удался, возвращаем partitionTime.


Как время управляет потоком данных в Kafka Streams

Благодаря кастомному TimestampExtractor, Kafka Streams выстраивает события по временной шкале на основе event time. Если приходят “запаздывающие” события со старыми временными метками, они могут попасть в уже закрытые окна (это регулируется настройками grace period).

Это особенно важно, когда реальное получение события задерживается, но мы хотим учитывать его именно в том временном интервале, к которому оно принадлежит фактически.


Типы окон в Kafka Streams

Tumbling Windows

  • Окна фиксированного размера, которые не перекрываются.

  • Например, окно в 1 минуту: [09:00 – 09:01], [09:01 – 09:02] и т.д.

Hopping Windows

  • Окна фиксированного размера, но перекрывающиеся.

  • Пример: размер окна 1 минута, сдвиг 30 секунд, получаем [09:00 – 09:01], [09:00:30 – 09:01:30] и т.д.

Session Windows

  • Окна, которые не имеют фиксированной длительности; они определяются “паузами” между событиями.

  • Если между событиями нет больших интервалов, окно расширяется; при длительном затишье окно “закрывается”.

Sliding Join Windows

  • Окна, используемые в операциях join потоков, где нам важно “захватить” события, которые произошли в промежутке [t, t+N].

  • Похоже на hopping windows, но применяется для Stream-Stream join.

Выполнение и производство оконных соединений (Stream-Stream join)

Чтобы объединить два потока на основе временных интервалов, используется оконное соединение (Stream-Stream Join). Например, хотим “склеить” данные о пульсе и температуре, поступившие в пределах одной минуты:

KStream<String, PulseEvent> pulseStream = builder
    .stream("pulse-events", Consumed.with(Serdes.String(), pulseSerde)
        .withTimestampExtractor(new MyTimestampExtractor()));

KStream<String, BodyTempEvent> tempStream = builder
    .stream("body-temp-events", Consumed.with(Serdes.String(), bodyTempSerde)
        .withTimestampExtractor(new MyTimestampExtractor()));

KStream<String, CombinedMeasurement> joinedStream = pulseStream.join(
    tempStream,
    (pulseValue, tempValue) -> {
        CombinedMeasurement cm = new CombinedMeasurement();
        cm.setTimestampPulse(pulseValue.getTimestamp());
        cm.setTimestampTemp(tempValue.getTimestamp());
        cm.setTemperature(tempValue.getTemperature());
        return cm;
    },
    JoinWindows.ofTimeDifferenceWithNoGrace(Duration.ofMinutes(1)),
    StreamJoined.with(Serdes.String(), pulseSerde, bodyTempSerde)
);

Таким образом, если события по ключу совпадают и попадают в общий временной интервал, они будут объединены в единый результат.


Зачем нужен оператор suppress и как его использовать

В KTable и агрегирующих операциях Kafka Streams возникают промежуточные результаты (updates). Оператор suppress позволяет “задерживать” (подавлять) публикацию промежуточных результатов и выдавать только конечный (после закрытия окна).

KTable<Windowed<String>, Long> aggregatedTable = pulseStream
    .groupByKey()
    .windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofMinutes(1)))
    .count();

KTable<Windowed<String>, Long> suppressedTable = aggregatedTable
    .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()));
  • untilWindowCloses означает, что мы публикуем результат только один раз — когда окно окончательно закрыто.

  • Это избавляет от “шторма” постоянных обновлений.


Что вышло в итоге

Ниже упрощённый код MedicalMonitorApp.java, где:

  • Настраиваем конфигурацию Kafka Streams.

  • Создаём потоки для пульса и температуры.

  • Выполняем join по окну в 1 минуту.

  • Отправляем результат в новый топик.

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.*;
import org.apache.kafka.streams.kstream.*;
import java.time.Duration;
import java.util.Properties;

public class MedicalMonitorApp {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "medical-monitor-app");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.StringSerde.class);
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.StringSerde.class);
        props.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, MyTimestampExtractor.class);

        StreamsBuilder builder = new StreamsBuilder();

        KStream<String, PulseEvent> pulseStream = builder.stream(
            "pulse-events",
            Consumed.with(
                Serdes.String(),
                Serdes.serdeFrom(new JsonSerializer<>(), new JsonDeserializer<>(PulseEvent.class))
            )
        );

        KStream<String, BodyTempEvent> tempStream = builder.stream(
            "body-temp-events",
            Consumed.with(
                Serdes.String(),
                Serdes.serdeFrom(new JsonSerializer<>(), new JsonDeserializer<>(BodyTempEvent.class))
            )
        );

        KStream<String, String> joinedStream = pulseStream.join(
            tempStream,
            (pulseVal, tempVal) -> {
                return "Pulse timestamp: " + pulseVal.getTimestamp() +
                       ", Temp timestamp: " + tempVal.getTimestamp() +
                       ", Temp: " + tempVal.getTemperature();
            },
            JoinWindows.ofTimeDifferenceWithNoGrace(Duration.ofMinutes(1)),
            StreamJoined.with(
                Serdes.String(),
                Serdes.serdeFrom(new JsonSerializer<>(), new JsonDeserializer<>(PulseEvent.class)),
                Serdes.serdeFrom(new JsonSerializer<>(), new JsonDeserializer<>(BodyTempEvent.class))
            )
        );

        joinedStream.to("joined-medical-events", Produced.with(Serdes.String(), Serdes.String()));

        KafkaStreams streams = new KafkaStreams(builder.build(), props);
        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
        streams.start();
    }
}

В примере использованы JsonSerializer и JsonDeserializer (обёртки над Jackson), которые можно реализовать вручную или взять готовую реализацию.


Инструкции по запуску и тестированию приложения

Сборка: ./gradlew clean build

Запуск (через Gradle): ./gradlew run

Отправка тестовых сообщений:

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic pulse-events
> {"timestamp": "2024-12-26T09:02:00.000Z"}
> {"timestamp": "2024-12-26T09:02:30.000Z"}

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic body-temp-events
> {"timestamp": "2024-12-26T09:02:05.000Z", "temperature": 37.4, "unit": "C"}
> {"timestamp": "2024-12-26T09:02:25.000Z", "temperature": 37.1, "unit": "C"}

Проверка результата в топике joined-medical-events:

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 \\
--topic joined-medical-events --from-beginning

Ожидаемый вывод:

Pulse timestamp: 2024-12-26T09:02:00.000Z, Temp timestamp: 2024-12-26T09:02:05.000Z, Temp: 37.4
Pulse timestamp: 2024-12-26T09:02:30.000Z, Temp timestamp: 2024-12-26T09:02:25.000Z, Temp: 37.1

Стало понятнее, чем отличаются event time, ingestion time и processing time, и почему так важно использовать фактическое время возникновения события. Посмотрели, как настроить кастомный TimestampExtractor, чтобы корректно обрабатывать события даже при наличии задержек или неточностей во временных метках. Разобрались, как различные типы окон (tumbling, hopping, session и sliding join windows) позволяют эффективно группировать и анализировать данные по выбранным интервалам времени, и как оператор suppress помогает не публиковать промежуточные результаты до завершения окна. Наконец, мы шаг за шагом создали полноценное приложение на Kafka Streams, способное принимать, обрабатывать и объединять потоковые данные о жизненно важных показателях (пульсе и температуре тела), демонстрируя, как все эти инструменты работают вместе для решения реальных задач мониторинга.

Данный код можно расширять, добавляя новые метрики (давление, уровень кислорода в крови и т.д.) или внедряя логику оповещений при выходе показателей за допустимые пределы, в общем есть над чем еще фантазировать =)

Комментарии (2)


  1. mister_xen
    30.12.2024 18:10

    1) Я сейчас смотрю топик куда складываются окна ,и вижу что там меняются значение на каждое значение, ели ты например отсылаешь после этого уведомления и ожидаешь 1 нзачение финальное, то будешь сильно удивлен что перед финальным послались все изменения. В кафка стриме есть FINAL или что еще чтобы в output посылался только финальный результат?

    2) попробовал вариант чтобы окно закрывалось через несколько времени. И что я вижу что при потоке сообщений всё работает корретно, окно закрывается, НО если остановить поток новых сообщений, то ПОСЛЕДНЕЕ окно будет висеть и не flush'иться пока я еще 1 сообщение не закину. это что вообще за бред поведение ?


    1. temirlan100 Автор
      30.12.2024 18:10

      Точно не знаю как это делать в Kafka Streams, я лично такие вещи отлавливаю и уже ручками делаю нужную обработку удаляю окно