В предыдущих статьях [ноль, один] мы рассмотрели основные концепции Kafka Streams и сравнили их со стандартными подходами обработки потоковых данных. В этой части мы сосредоточимся на stateless processing (обработке без сохранения состояния) и поймем как применять различные операции Kafka Streams для решения практических задач. Мы создадим приложение для обработки данных медицинской клиники.
Требования к ETL пайплайну:
Приложение должно обрабатывать поток JSON-сообщений из входного топика Kafka (patient-records), содержащих информацию о пациентах медицинской клиники. Цель состоит в том, чтобы применять различные stateless операции для трансформации данных и отправлять результаты в выходной топик (clinic-notifications-topic). Это позволит продемонстрировать, как использовать основные возможности Kafka Streams для обработки данных без сохранения состояния.
Требуемые операции:
Фильтрация: исключить записи пациентов младше 18 лет.
Изменение ключей: установить patientId в качестве нового ключа.
-
Добавление и удаление полей:
Добавить поле nextAppointmentDate, если followUpNeeded равно true.
Удалить поле assignedDoctor, если оно null или пустое.
-
Ветвление потоков:
Поток A: пациенты с установленным диагнозом.
Поток B: пациенты без диагноза.
-
Преобразование записей:
В Потоке A создать уведомления для врачей.
В Потоке B создать напоминания для пациентов.
Поочередное обогащение: добавить информацию об ответственном враче из локального справочника.
Слияние потоков: объединить потоки A и B.
Вывод данных: отправить обработанные записи в clinic-notifications-topic
Настройка проекта
I. Создание проекта
Создадим новый проект на Java с использованием Gradle и Kotlin DSL (build.gradle.kts
).
Структура проекта:
kafka-streams-stateless/
├── build.gradle.kts
├── src/
│ ├── main/
│ │ ├── java/
│ │ │ └── com.example.kafka/
│ │ │ ├── StreamsApp.java
│ │ │ ├── PatientRecord.java
│ │ │ ├── Notification.java
│ │ │ ├── Reminder.java
│ │ │ └── Doctor.java
│ │ └── resources/
│ └── test/
└── settings.gradle.kts
II. Файл build.gradle.kts
plugins {
java
application
}
repositories {
mavenCentral()
}
dependencies {
implementation("org.apache.kafka:kafka-streams:3.8.0")
implementation("com.fasterxml.jackson.core:jackson-databind:2.17.2")
implementation("com.fasterxml.jackson.core:jackson-core:2.17.2")
implementation("com.fasterxml.jackson.core:jackson-annotations:2.17.2")
implementation("com.fasterxml.jackson.datatype:jackson-datatype-jsr310:2.17.2")
}
application {
mainClass.set("com.example.kafka.StreamsApp")
}
III. Конфигурация Kafka Streams
Создадим класс StreamsApp.java и настроим конфигурацию Kafka Streams.
import java.util.Properties;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsConfig;
public class StreamsApp {
private static final String APPLICATION_NAME = "stateless-processing-app";
private static final String BOOTSTRAP_SERVERS = "localhost:9092";
public static Properties getStreamsConfig() {
var config = new Properties();
config.put(StreamsConfig.APPLICATION_ID_CONFIG, APPLICATION_NAME);
config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.StringSerde.class);
config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.StringSerde.class);
return config;
}
}
Описание входных данных
Входные данные представляют собой JSON-сообщения о пациентах медицинской клиники, поступающие из входного топика Kafka (patient-records). Каждое сообщение содержит информацию о пациенте, его записи на прием и медицинском состоянии.
// 1 Взрослый пациент с диагнозом “Грипп” и необходимостью повторного приема.
// Поле assignedDoctor пустое.
{
"recordId": "1",
"patientId": "P001",
"name": "Иван Иванов",
"age": 30,
"appointmentDate": "2023-10-15",
"diagnosis": "Грипп",
"followUpNeeded": true,
"assignedDoctor": ""
}
// 2 Взрослый пациент без диагноза и без необходимости повторного приема.
// Поле assignedDoctor равно null.
{
"recordId": "2",
"patientId": "P002",
"name": "Анна Петрова",
"age": 22,
"appointmentDate": "2023-10-16",
"diagnosis": "",
"followUpNeeded": false,
"assignedDoctor": null
}
// 3 Пациент младше 18 лет. Эта запись должна быть отфильтрована и
// не попадет в дальнейшую обработку.
{
"recordId": "3",
"patientId": "P003",
"name": "Сергей Сидоров",
"age": 16,
"appointmentDate": "2023-10-17",
"diagnosis": "Ангина",
"followUpNeeded": true,
"assignedDoctor": "Др. Алексей Смирнов"
}
// 4 Взрослый пациент с диагнозом “Мигрень”, но без необходимости
// повторного приема. Указан ответственный врач.
{
"recordId": "4",
"patientId": "P004",
"name": "Мария Кузнецова",
"age": 45,
"appointmentDate": "2023-10-18",
"diagnosis": "Мигрень",
"followUpNeeded": false,
"assignedDoctor": "Др. Елена Иванова"
}
Реализация Stateless Операций
I. Фильтрация записей
Отфильтруем записи пациентов младше 18 лет.
StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> sourceStream = builder.stream("patient-records");
KStream<String, String> filteredStream = sourceStream.filter((key, value) -> {
PatientRecord record = PatientRecord.fromJson(value);
return record.getAge() >= 18;
});
+---------------+ --> +-------------+ --> +----------------+
| Patient Data | | Age < 18 | | Filtered (<18) |
+---------------+ +-------------+ +----------------+
II. Изменение ключей записей
Изменим ключ записи на patientId.
KStream<String, String> rekeyedStream = filteredStream.selectKey((key, value) -> {
PatientRecord record = PatientRecord.fromJson(value);
return record.getPatientId();
});
+---------------+ --> +----------------+ --> +-------------------+
| Record Data | | Change Key to | | Key = patientId |
+---------------+ | patientId | +-------------------+
III. Добавление и удаление полей
Добавим поле nextAppointmentDate и удалим поле assignedDoctor, если оно null или пустое.
KStream<String, String> augmentedStream = rekeyedStream.mapValues(value -> {
PatientRecord record = PatientRecord.fromJson(value);
if (record.isFollowUpNeeded()) {
record.setNextAppointmentDate(record.getAppointmentDate().plusDays(30));
}
if (record.getAssignedDoctor() == null || record.getAssignedDoctor().isEmpty()) {
record.setAssignedDoctor(null);
}
return record.toJson();
});
+-------------------+ --> +----------------------------+ --> +---------------------+
| Record Data | | Add: nextAppointmentDate | | Field Added/Removed |
| | | Remove: assignedDoctor if | | nextAppointmentDate |
| | | null or empty | | assignedDoctor (if) |
+-------------------+ +----------------------------+ +---------------------+
IV. Ветвление потоков
Разделим поток на два на основе наличия диагноза.
KStream<String, String>[] branchedStreams = augmentedStream.branch(
(key, value) -> {
PatientRecord record = PatientRecord.fromJson(value);
return record.getDiagnosis() != null && !record.getDiagnosis().isEmpty();
},
(key, value) -> {
PatientRecord record = PatientRecord.fromJson(value);
return record.getDiagnosis() == null || record.getDiagnosis().isEmpty();
}
);
KStream<String, String> diagnosedStream = branchedStreams[0];
KStream<String, String> undiagnosedStream = branchedStreams[1];
+---------------+ --> +----------------------+ --> +-----------------+
| Patient Data | | Has Diagnosis? | | With Diagnosis |
+---------------+ +----------------------+ +----------------_-+
\\ \\
\\ --> +---------------------+
\\----------------------------> | Without Diagnosis |
+--------------------+
V. Преобразование записей в один или несколько выходов
В Потоке A создадим уведомления для врачей.
KStream<String, String> doctorNotifications = diagnosedStream.mapValues(value -> {
PatientRecord record = PatientRecord.fromJson(value);
Notification notification = createDoctorNotification(record);
return notification.toJson();
});
В Потоке B создадим напоминания для пациентов.
KStream<String, String> patientReminders = undiagnosedStream.mapValues(value -> {
PatientRecord record = PatientRecord.fromJson(value);
Reminder reminder = createPatientReminder(record);
return reminder.toJson();
});
+---------------+ --> +-----------+ --> +------------------+
| Patient Data | | Stream A | | Notifications for |
+---------------+ | | | Doctors |
| | +-------------------+
| |
| | +------------------+
| Stream B | ---> | Reminders for |
| | | Patients |
+-----------+ +------------------+
VI. Поочередное обогащение данных
Добавим информацию об ответственном враче из локального справочника.
KStream<String, String> enrichedDoctorNotifications = doctorNotifications.mapValues(value -> {
Notification notification = Notification.fromJson(value);
Doctor doctor = getAssignedDoctor(notification.getPatientId());
notification.setAssignedDoctor(doctor);
return notification.toJson();
});
VII. Слияние потоков
Объединим оба потока обратно в один.
KStream<String, String> mergedStream = enrichedDoctorNotifications.merge(patientReminders);
+------------------+ +----------+ +------------------+
| Notifications | -----> | Merge | -----> | Combined Output |
| for Doctors | | | | |
+------------------+ | | +------------------+
| |
+------------------+ | |
| Reminders for | -----> | |
| Patients | +----------+
+------------------+
VIII. Вывод данных
Отправим обработанные записи в выходной топик clinic-notifications-topic.
mergedStream.to("clinic-notifications-topic");
Полный исходный код нашего приложения
StreamsApp класс
Doctor класс
PatientRecord класс
Notification класс
Reminder класс
Запуск и тестирование приложения
Убедитесь, что Kafka запущена локально.
-
Создайте входной и выходной топики.
bin/kafka-topics.sh --create --topic patient-records --bootstrap-server localhost:9092 bin/kafka-topics.sh --create --topic clinic-notifications-topic --bootstrap-server localhost:9092
Соберите и запустите приложение
./gradlew run
Отправка тестовых сообщений в топик patient-records
-
Просмотрите выходные сообщения в топике clinic-notifications-topic
{ "patientId": "P001", "message": "Проверьте план лечения пациента Иван Иванов.", "assignedDoctor": { "doctorId": "D001", "name": "Др. Сергей Петров" }, "nextAppointmentDate": "2023-11-14" }
Заключение
В этой статье мы подробно рассмотрели, как использовать stateless операции в Kafka Streams для обработки данных медицинской клиники. Мы включили техническое задание, описали структуру входных данных и поэтапно реализовали необходимые операции: фильтрацию, изменение ключей, добавление и удаление полей, ветвление и слияние потоков, преобразование записей и поочередное обогащение данных.
Использование stateless операций позволяет создавать эффективные и масштабируемые приложения для потоковой обработки данных без необходимости управления состоянием. Такие приложения проще в разработке и обслуживании, а также легко масштабируются горизонтально.
ElectricPigeon
Спасибо за статью! На мой взгляд, было бы лучше, если бы сразу использовалась автоматическая десериализация из JSON вместо затратного преобразования из строки в каждой функции‑обработчике.
С сериализацией результирующего стрима чуть сложнее — можно использовать sealed class из Java 17. Не знаю точно, как в Java, но в Kotlin у меня это получилось без проблем, потому что это поддерживается
kotlinx.serialization
за счёт того, что в итоговый JSON дописывается полеtype
. Для этого мне пришлось дописатьmapValues
в обоих стримах после разделения, чтобы конвертировать объекты к базовому классу.Ещё было бы полезно упомянуть про
streams.setUncaughtExceptionHandler
, потому что пока я о нём не узнал, моя программа завершалась молча в случаях, когда я указывал не тот класс для (де‑)сериализации или когда я опечатался в теле JSON.