Привет, сообщество Хабр =)
Начав изучать Kafka Streams, я заметил, что для решения различных задач приходится искать информацию по разным источникам, поэтому со временем накопилось много собственных конспектов. Хочу поделиться ими в виде серии туториалов на Хабре.
Несмотря на обилие ресурсов по Kafka Streams и отличные статьи на Хабре [ноль, один, два], мне не хватало пошаговых руководств, которые детально раскрывают изъяны и преимущества этой технологии. Поэтому решил создать такой материал, чтобы помочь другим разобраться структурно и последовательно.
? Не буду рассматривать, что такое Apache Kafka. Предполагается, что вы уже знакомы с данным брокером.
Я не сразу понял ценность данной библиотеки (Kafka Streams это часть Apache Kafka) как бы приложение все равно читает сообщение применяет набор действий к сообщению и записывает снова в тот или другой топик, или в совсем другой источник данных, в общем на первый взгляд будто бы никакой разницы, но с погружением в детали начинаешь понимать почему сделали этот инструмент и почему он удобен и востребован.
Без Kafka Streams
Установим нужный пакет
implementation("org.apache.kafka:kafka-clients:3.8.0")
При прямом использовании Apache Kafka нужно использовать KafkaConsumer
для чтения сообщений из топика. Вот простой пример:
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;
import java.util.Collections;
public class SimpleKafkaConsumerProducer {
public static void main(String[] args) {
// Настройки для consumer
Properties consumerProps = new Properties();
consumerProps.put("bootstrap.servers", "localhost:9092");
consumerProps.put("group.id", "my-group");
consumerProps.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
consumerProps.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
// Настройки для producer
Properties producerProps = new Properties();
producerProps.put("bootstrap.servers", "localhost:9092");
producerProps.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
producerProps.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
var consumer = new KafkaConsumer<String, String>(consumerProps);
var producer = new KafkaProducer<String, String>(producerProps);
consumer.subscribe(Collections.singletonList("input-topic"));
try {
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records) {
// Обработка сообщения
String originalValue = record.value();
String processedValue = originalValue.toUpperCase();
// Отправка обработанного сообщения в другой топик
producer.send(new ProducerRecord<>("output-topic", record.key(), processedValue));
System.out.println("Обработанное сообщение: " + processedValue);
}
}
} finally {
consumer.close();
producer.close();
}
}
}
Чтение сообщения: Используем
KafkaConsumer
для чтения сообщений изinput-topic
.Обработка сообщения: В данном случае просто преобразуем текст сообщения в верхний регистр.
Запись сообщения: Используем
KafkaProducer
для отправки обработанного сообщения вoutput-topic
.
С Kafka Streams
Установим нужный пакет
implementation("org.apache.kafka:kafka-streams:3.8.0")
Теперь тот же функционал с использованием Kafka Streams:
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.StreamsConfig;
import java.util.Properties;
public class SimpleKafkaStreamsApp {
public static void main(String[] args) {
// Настройки для Kafka Streams
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-app");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
// чтобы читать данные с начала топика в явном виде указываем начальное смещение
props.put("consumer.auto.offset.reset", "earliest");
// Строим топологию потоков
StreamsBuilder builder = new StreamsBuilder();
// Читаем данные из входного топика
KStream<String, String> source = builder.stream("input-topic");
// Обрабатываем данные (преобразуем в верхний регистр)
KStream<String, String> processed = source.mapValues(value -> value.toUpperCase());
// Отправляем обработанные данные в выходной топик
processed.to("output-topic");
// Создаем и запускаем Kafka Streams приложение
KafkaStreams streams = new KafkaStreams(builder.build(), props);
streams.start();
// Добавляем shutdown hook для корректного завершения работы
Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
}
}
Чтение и обработка данных: Используем
StreamsBuilder
для построения топологии потоков. МетодmapValues
применяется для преобразования значения сообщения.Запись данных: Обработанные данные автоматически отправляются в
output-topic
с помощью методаto
.
Без Kafka Streams:
Нужно самостоятельно управлять потребителем и производителем.
Больше кода для настройки и управления.
Менее декларативный подход.
С Kafka Streams:
Управление потоками данных абстрагировано библиотекой.
Код более декларативен и фокусируется на логике обработки.
Легче масштабировать и добавлять сложные преобразования.
Визуальная схема, того почему обработка данных с помощью Kafka Streams проще и эффективнее.
На схеме видно, что при прямом использовании Kafka нам приходится самостоятельно управлять потребителями и производителями, что усложняет код и архитектуру приложения. С Kafka Streams же мы работаем с более высоким уровнем абстракции, где многие детали управления потоками данных уже реализованы за нас. Это позволяет сосредоточиться на логике обработки данных, а не на инфраструктурных деталях.
Пока рано делать окончательные выводы — это лишь верхушка айсберга. Чем глубже мы погружаемся в Kafka Streams, тем более очевидной становится её ценность. Продолжение следует в следующей статье.
csl
Будет ли в следующих частях:
Как организовать ретраи или circuit-breaker? Что будет, если один из процессоров в стриме упадет с ошибкой?
К примеру, можно ли хотя бы в отдельный топик направить все сообщения, которые вызвали сбои топологии?
https://habr.com/ru/articles/747658/comments/#comment_25786932 спасибо @FruTbза наводку
temirlan100 Автор
Тут наверное не нужно искать решение в плоскости Kafka Streams, на мой взгляд
Обработка ошибок уже зависит от бизнес потребностей, также там было упомянуто про akka, у меня тут к сожалению нет опыта, но стандартно обработать можно так
но так же есть удобная библиотека https://github.com/resilience4j/resilience4j
https://github.com/resilience4j/resilience4j/tree/master/resilience4j-circuitbreaker
https://github.com/resilience4j/resilience4j/tree/master/resilience4j-retry
Тут можно настроить обработчик исключений на уровне конфигурации Kafka Streams
или обрабатывать прям в коде
alexanderfedyukov
Обеспечение отказоустойчивости (resiliense and fault tolerance) идет от самой кафки. Т.е. самостоятельно организовывать в коде ничего не надо - только настраивать. А вот с обработкой ошибок не так всё не то, чтобы хорошо - скорее неудобно: необходимо либо их полностью обрабатывать в пределах отдельного этапа обработки, либо обрабатывать и разделять (split) дальнейший поток обработки. Это является одной из особенностей стримовой обработки сообщений в целом.