В предыдущих статьях [ноль, один] мы рассмотрели основные концепции Kafka Streams и сравнили их со стандартными подходами обработки потоковых данных. В этой части мы сосредоточимся на stateless processing (обработке без сохранения состояния) и поймем как применять различные операции Kafka Streams для решения практических задач. Мы создадим приложение для обработки данных медицинской клиники.

Требования к ETL пайплайну:

Приложение должно обрабатывать поток JSON-сообщений из входного топика Kafka (patient-records), содержащих информацию о пациентах медицинской клиники. Цель состоит в том, чтобы применять различные stateless операции для трансформации данных и отправлять результаты в выходной топик (clinic-notifications-topic). Это позволит продемонстрировать, как использовать основные возможности Kafka Streams для обработки данных без сохранения состояния.

Требуемые операции:

  • Фильтрация: исключить записи пациентов младше 18 лет.

  • Изменение ключей: установить patientId в качестве нового ключа.

  • Добавление и удаление полей:

    • Добавить поле nextAppointmentDate, если followUpNeeded равно true.

    • Удалить поле assignedDoctor, если оно null или пустое.

  • Ветвление потоков:

    • Поток A: пациенты с установленным диагнозом.

    • Поток B: пациенты без диагноза.

  • Преобразование записей:

    • В Потоке A создать уведомления для врачей.

    • В Потоке B создать напоминания для пациентов.

  • Поочередное обогащение: добавить информацию об ответственном враче из локального справочника.

  • Слияние потоков: объединить потоки A и B.

  • Вывод данных: отправить обработанные записи в clinic-notifications-topic

Настройка проекта

I. Создание проекта

Создадим новый проект на Java с использованием Gradle и Kotlin DSL (build.gradle.kts).

Структура проекта:

kafka-streams-stateless/
├── build.gradle.kts
├── src/
│   ├── main/
│   │   ├── java/
│   │   │   └── com.example.kafka/
│   │   │       ├── StreamsApp.java
│   │   │       ├── PatientRecord.java
│   │   │       ├── Notification.java
│   │   │       ├── Reminder.java
│   │   │       └── Doctor.java
│   │   └── resources/
│   └── test/
└── settings.gradle.kts

II. Файл build.gradle.kts

plugins {
    java
    application
}

repositories {
    mavenCentral()
}

dependencies {
    implementation("org.apache.kafka:kafka-streams:3.8.0")
    implementation("com.fasterxml.jackson.core:jackson-databind:2.17.2")
    implementation("com.fasterxml.jackson.core:jackson-core:2.17.2")
    implementation("com.fasterxml.jackson.core:jackson-annotations:2.17.2")
    implementation("com.fasterxml.jackson.datatype:jackson-datatype-jsr310:2.17.2")
}

application {
    mainClass.set("com.example.kafka.StreamsApp")
}

III. Конфигурация Kafka Streams

Создадим класс StreamsApp.java и настроим конфигурацию Kafka Streams.

import java.util.Properties;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsConfig;

public class StreamsApp {
    private static final String APPLICATION_NAME = "stateless-processing-app";
    private static final String BOOTSTRAP_SERVERS = "localhost:9092";

    public static Properties getStreamsConfig() {
        var config = new Properties();
        
        config.put(StreamsConfig.APPLICATION_ID_CONFIG, APPLICATION_NAME);
        config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
        config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.StringSerde.class);
        config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.StringSerde.class);
        
        return config;
    }
}

Описание входных данных

Входные данные представляют собой JSON-сообщения о пациентах медицинской клиники, поступающие из входного топика Kafka (patient-records). Каждое сообщение содержит информацию о пациенте, его записи на прием и медицинском состоянии.

// 1 Взрослый пациент с диагнозом “Грипп” и необходимостью повторного приема. 
// Поле assignedDoctor пустое.
{
  "recordId": "1",
  "patientId": "P001",
  "name": "Иван Иванов",
  "age": 30,
  "appointmentDate": "2023-10-15",
  "diagnosis": "Грипп",
  "followUpNeeded": true,
  "assignedDoctor": ""
}
// 2 Взрослый пациент без диагноза и без необходимости повторного приема. 
// Поле assignedDoctor равно null.
{
  "recordId": "2",
  "patientId": "P002",
  "name": "Анна Петрова",
  "age": 22,
  "appointmentDate": "2023-10-16",
  "diagnosis": "",
  "followUpNeeded": false,
  "assignedDoctor": null
}
// 3 Пациент младше 18 лет. Эта запись должна быть отфильтрована и 
// не попадет в дальнейшую обработку.
{
  "recordId": "3",
  "patientId": "P003",
  "name": "Сергей Сидоров",
  "age": 16,
  "appointmentDate": "2023-10-17",
  "diagnosis": "Ангина",
  "followUpNeeded": true,
  "assignedDoctor": "Др. Алексей Смирнов"
}
// 4 Взрослый пациент с диагнозом “Мигрень”, но без необходимости 
// повторного приема. Указан ответственный врач.
{
  "recordId": "4",
  "patientId": "P004",
  "name": "Мария Кузнецова",
  "age": 45,
  "appointmentDate": "2023-10-18",
  "diagnosis": "Мигрень",
  "followUpNeeded": false,
  "assignedDoctor": "Др. Елена Иванова"
}

Реализация Stateless Операций

I. Фильтрация записей

Отфильтруем записи пациентов младше 18 лет.

StreamsBuilder builder = new StreamsBuilder();

KStream<String, String> sourceStream = builder.stream("patient-records");

KStream<String, String> filteredStream = sourceStream.filter((key, value) -> {
		PatientRecord record = PatientRecord.fromJson(value);
    return record.getAge() >= 18;
});
+---------------+  -->  +-------------+  -->  +----------------+
| Patient Data  |       | Age < 18    |       | Filtered (<18) |
+---------------+       +-------------+       +----------------+

II. Изменение ключей записей

Изменим ключ записи на patientId.

KStream<String, String> rekeyedStream = filteredStream.selectKey((key, value) -> {
    PatientRecord record = PatientRecord.fromJson(value);
    return record.getPatientId();
});
+---------------+  -->  +----------------+  -->  +-------------------+
| Record Data   |       | Change Key to  |       | Key = patientId   |
+---------------+       | patientId      |       +-------------------+

III. Добавление и удаление полей

Добавим поле nextAppointmentDate и удалим поле assignedDoctor, если оно null или пустое.

  KStream<String, String> augmentedStream = rekeyedStream.mapValues(value -> {
    PatientRecord record = PatientRecord.fromJson(value);

    if (record.isFollowUpNeeded()) {
        record.setNextAppointmentDate(record.getAppointmentDate().plusDays(30));
    }

    if (record.getAssignedDoctor() == null || record.getAssignedDoctor().isEmpty()) {
        record.setAssignedDoctor(null);
    }

    return record.toJson();
});
+-------------------+  -->  +----------------------------+  -->  +---------------------+
| Record Data       |       | Add: nextAppointmentDate   |       | Field Added/Removed |
|                   |       | Remove: assignedDoctor if  |       | nextAppointmentDate |
|                   |       | null or empty              |       | assignedDoctor (if) |
+-------------------+       +----------------------------+       +---------------------+

IV. Ветвление потоков

Разделим поток на два на основе наличия диагноза.

KStream<String, String>[] branchedStreams = augmentedStream.branch(
    (key, value) -> {
        PatientRecord record = PatientRecord.fromJson(value);
        return record.getDiagnosis() != null && !record.getDiagnosis().isEmpty();
    },
    (key, value) -> {
        PatientRecord record = PatientRecord.fromJson(value);
        return record.getDiagnosis() == null || record.getDiagnosis().isEmpty();
    }
);

KStream<String, String> diagnosedStream = branchedStreams[0];
KStream<String, String> undiagnosedStream = branchedStreams[1];
+---------------+  -->  +----------------------+  -->  +-----------------+
| Patient Data  |       | Has Diagnosis?       |      | With Diagnosis   |
+---------------+       +----------------------+      +----------------_-+
                       \\                         \\
                        \\                         -->  +---------------------+
                         \\----------------------------> | Without Diagnosis  |
                                                        +--------------------+

V. Преобразование записей в один или несколько выходов

В Потоке A создадим уведомления для врачей.

KStream<String, String> doctorNotifications = diagnosedStream.mapValues(value -> {
    PatientRecord record = PatientRecord.fromJson(value);
    Notification notification = createDoctorNotification(record);
    return notification.toJson();
});

В Потоке B создадим напоминания для пациентов.

KStream<String, String> patientReminders = undiagnosedStream.mapValues(value -> {
    PatientRecord record = PatientRecord.fromJson(value);
    Reminder reminder = createPatientReminder(record);
    return reminder.toJson();
});
+---------------+  -->  +-----------+  -->  +------------------+
| Patient Data  |       | Stream A  |      | Notifications for |
+---------------+       |           |      | Doctors           |
                        |           |      +-------------------+
                        |           |
                        |           |      +------------------+
                        | Stream B  | ---> | Reminders for    |
                        |           |      | Patients         |
                        +-----------+      +------------------+

VI. Поочередное обогащение данных

Добавим информацию об ответственном враче из локального справочника.

KStream<String, String> enrichedDoctorNotifications = doctorNotifications.mapValues(value -> {
    Notification notification = Notification.fromJson(value);
    Doctor doctor = getAssignedDoctor(notification.getPatientId());
    notification.setAssignedDoctor(doctor);
    return notification.toJson();
});

VII. Слияние потоков

Объединим оба потока обратно в один.

KStream<String, String> mergedStream = enrichedDoctorNotifications.merge(patientReminders);
+------------------+        +----------+        +------------------+
| Notifications    | -----> |  Merge   | -----> |  Combined Output  |
| for Doctors      |        |          |        |                  |
+------------------+        |          |        +------------------+
                            |          |
+------------------+        |          |
| Reminders for    | -----> |          |
| Patients         |        +----------+
+------------------+

VIII. Вывод данных

Отправим обработанные записи в выходной топик clinic-notifications-topic.

mergedStream.to("clinic-notifications-topic");

Полный исходный код нашего приложения

Запуск и тестирование приложения

  • Убедитесь, что Kafka запущена локально.

  • Создайте входной и выходной топики.

    bin/kafka-topics.sh --create --topic patient-records --bootstrap-server localhost:9092
    bin/kafka-topics.sh --create --topic clinic-notifications-topic --bootstrap-server localhost:9092
  • Соберите и запустите приложение ./gradlew run

  • Отправка тестовых сообщений в топик patient-records

  • Просмотрите выходные сообщения в топике clinic-notifications-topic

    {
    	"patientId": "P001",
    	"message": "Проверьте план лечения пациента Иван Иванов.",
    	"assignedDoctor": {
    		"doctorId": "D001",
    		"name": "Др. Сергей Петров"
    	},
    	"nextAppointmentDate": "2023-11-14"
    }

Заключение

В этой статье мы подробно рассмотрели, как использовать stateless операции в Kafka Streams для обработки данных медицинской клиники. Мы включили техническое задание, описали структуру входных данных и поэтапно реализовали необходимые операции: фильтрацию, изменение ключей, добавление и удаление полей, ветвление и слияние потоков, преобразование записей и поочередное обогащение данных.

Использование stateless операций позволяет создавать эффективные и масштабируемые приложения для потоковой обработки данных без необходимости управления состоянием. Такие приложения проще в разработке и обслуживании, а также легко масштабируются горизонтально.

Комментарии (1)


  1. ElectricPigeon
    15.11.2024 13:19

    Спасибо за статью! На мой взгляд, было бы лучше, если бы сразу использовалась автоматическая десериализация из JSON вместо затратного преобразования из строки в каждой функции‑обработчике.

    С сериализацией результирующего стрима чуть сложнее — можно использовать sealed class из Java 17. Не знаю точно, как в Java, но в Kotlin у меня это получилось без проблем, потому что это поддерживается kotlinx.serialization за счёт того, что в итоговый JSON дописывается поле type. Для этого мне пришлось дописать mapValues в обоих стримах после разделения, чтобы конвертировать объекты к базовому классу.

    Ещё было бы полезно упомянуть про streams.setUncaughtExceptionHandler, потому что пока я о нём не узнал, моя программа завершалась молча в случаях, когда я указывал не тот класс для (де‑)сериализации или когда я опечатался в теле JSON.