Привет, сообщество Хабр =)

Начав изучать Kafka Streams, я заметил, что для решения различных задач приходится искать информацию по разным источникам, поэтому со временем накопилось много собственных конспектов. Хочу поделиться ими в виде серии туториалов на Хабре.

Несмотря на обилие ресурсов по Kafka Streams и отличные статьи на Хабре [ноль, один, два], мне не хватало пошаговых руководств, которые детально раскрывают изъяны и преимущества этой технологии. Поэтому решил создать такой материал, чтобы помочь другим разобраться структурно и последовательно.

? Не буду рассматривать, что такое Apache Kafka. Предполагается, что вы уже знакомы с данным брокером.

Я не сразу понял ценность данной библиотеки (Kafka Streams это часть Apache Kafka) как бы приложение все равно читает сообщение применяет набор действий к сообщению и записывает снова в тот или другой топик, или в совсем другой источник данных, в общем на первый взгляд будто бы никакой разницы, но с погружением в детали начинаешь понимать почему сделали этот инструмент и почему он удобен и востребован.

Без Kafka Streams

Установим нужный пакет

implementation("org.apache.kafka:kafka-clients:3.8.0")

При прямом использовании Apache Kafka нужно использовать KafkaConsumer для чтения сообщений из топика. Вот простой пример:

import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;
import java.util.Collections;

public class SimpleKafkaConsumerProducer {
    public static void main(String[] args) {
        // Настройки для consumer
        Properties consumerProps = new Properties();
        consumerProps.put("bootstrap.servers", "localhost:9092");
        consumerProps.put("group.id", "my-group");
        consumerProps.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        consumerProps.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        // Настройки для producer
        Properties producerProps = new Properties();
        producerProps.put("bootstrap.servers", "localhost:9092");
        producerProps.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        producerProps.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        var consumer = new KafkaConsumer<String, String>(consumerProps);
        var producer = new KafkaProducer<String, String>(producerProps);

        consumer.subscribe(Collections.singletonList("input-topic"));

        try {
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(100);

                for (ConsumerRecord<String, String> record : records) {
                    // Обработка сообщения
                    String originalValue = record.value();
                    String processedValue = originalValue.toUpperCase();

                    // Отправка обработанного сообщения в другой топик
                    producer.send(new ProducerRecord<>("output-topic", record.key(), processedValue));

                    System.out.println("Обработанное сообщение: " + processedValue);
                }
            }
        } finally {
            consumer.close();
            producer.close();
        }
    }
}
  • Чтение сообщения: Используем KafkaConsumer для чтения сообщений из input-topic.

  • Обработка сообщения: В данном случае просто преобразуем текст сообщения в верхний регистр.

  • Запись сообщения: Используем KafkaProducer для отправки обработанного сообщения в output-topic.

С Kafka Streams

Установим нужный пакет

implementation("org.apache.kafka:kafka-streams:3.8.0")

Теперь тот же функционал с использованием Kafka Streams:

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.StreamsConfig;
import java.util.Properties;

public class SimpleKafkaStreamsApp {
    public static void main(String[] args) {
        // Настройки для Kafka Streams
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-app");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        // чтобы читать данные с начала топика в явном виде указываем начальное смещение
        props.put("consumer.auto.offset.reset", "earliest");

        // Строим топологию потоков
        StreamsBuilder builder = new StreamsBuilder();

        // Читаем данные из входного топика
        KStream<String, String> source = builder.stream("input-topic");

        // Обрабатываем данные (преобразуем в верхний регистр)
        KStream<String, String> processed = source.mapValues(value -> value.toUpperCase());

        // Отправляем обработанные данные в выходной топик
        processed.to("output-topic");

        // Создаем и запускаем Kafka Streams приложение
        KafkaStreams streams = new KafkaStreams(builder.build(), props);
        streams.start();

        // Добавляем shutdown hook для корректного завершения работы
        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
    }
}
  • Чтение и обработка данных: Используем StreamsBuilder для построения топологии потоков. Метод mapValues применяется для преобразования значения сообщения.

  • Запись данных: Обработанные данные автоматически отправляются в output-topic с помощью метода to.

Без Kafka Streams:

  • Нужно самостоятельно управлять потребителем и производителем.

  • Больше кода для настройки и управления.

  • Менее декларативный подход.

С Kafka Streams:

  • Управление потоками данных абстрагировано библиотекой.

  • Код более декларативен и фокусируется на логике обработки.

  • Легче масштабировать и добавлять сложные преобразования.

Визуальная схема, того почему обработка данных с помощью Kafka Streams проще и эффективнее.

Kafka Streams
Kafka Streams

На схеме видно, что при прямом использовании Kafka нам приходится самостоятельно управлять потребителями и производителями, что усложняет код и архитектуру приложения. С Kafka Streams же мы работаем с более высоким уровнем абстракции, где многие детали управления потоками данных уже реализованы за нас. Это позволяет сосредоточиться на логике обработки данных, а не на инфраструктурных деталях.

Пока рано делать окончательные выводы — это лишь верхушка айсберга. Чем глубже мы погружаемся в Kafka Streams, тем более очевидной становится её ценность. Продолжение следует в следующей статье.

Комментарии (3)


  1. csl
    15.10.2024 12:37

    Будет ли в следующих частях:

    Как организовать ретраи или circuit-breaker? Что будет, если один из процессоров в стриме упадет с ошибкой?

    К примеру, можно ли хотя бы в отдельный топик направить все сообщения, которые вызвали сбои топологии?

    https://habr.com/ru/articles/747658/comments/#comment_25786932 спасибо @FruTbза наводку


    1. temirlan100 Автор
      15.10.2024 12:37

      Тут наверное не нужно искать решение в плоскости Kafka Streams, на мой взгляд

      Как организовать ретраи или circuit-breaker? Что будет, если один из процессоров в стриме упадет с ошибкой?

      Обработка ошибок уже зависит от бизнес потребностей, также там было упомянуто про akka, у меня тут к сожалению нет опыта, но стандартно обработать можно так

      KStream<String, String> stream = builder.stream("input-topic");
      
      stream.mapValues(value -> {
          int maxRetries = 3;
          int attempt = 0;
          while (attempt < maxRetries) {
              try {
                  // Ваша логика обработки
                  return process(value);
              } catch (Exception e) {
                  attempt++;
                  if (attempt == maxRetries) {
                      // Логирование и обработка после максимального количества попыток
                      // Например, отправка в Dead Letter Topic
                  }
              }
          }
          return null; // Или другое значение по умолчанию
      });

      но так же есть удобная библиотека https://github.com/resilience4j/resilience4j

      import io.github.resilience4j.retry.Retry;
      import io.github.resilience4j.retry.RetryConfig;
      
      import io.github.resilience4j.circuitbreaker.CircuitBreaker;
      import io.github.resilience4j.circuitbreaker.CircuitBreakerConfig;

      https://github.com/resilience4j/resilience4j/tree/master/resilience4j-circuitbreaker
      https://github.com/resilience4j/resilience4j/tree/master/resilience4j-retry

      К примеру, можно ли хотя бы в отдельный топик направить все сообщения, которые вызвали сбои топологии?

      Тут можно настроить обработчик исключений на уровне конфигурации Kafka Streams

      Properties props = new Properties();
      props.put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG,
                LogAndContinueExceptionHandler.class);
      props.put(StreamsConfig.DEFAULT_PRODUCTION_EXCEPTION_HANDLER_CLASS_CONFIG,
                DefaultProductionExceptionHandler.class);

      или обрабатывать прям в коде


    1. alexanderfedyukov
      15.10.2024 12:37

      Обеспечение отказоустойчивости (resiliense and fault tolerance) идет от самой кафки. Т.е. самостоятельно организовывать в коде ничего не надо - только настраивать. А вот с обработкой ошибок не так всё не то, чтобы хорошо - скорее неудобно: необходимо либо их полностью обрабатывать в пределах отдельного этапа обработки, либо обрабатывать и разделять (split) дальнейший поток обработки. Это является одной из особенностей стримовой обработки сообщений в целом.