Kafka — это универсальный и мощный инструмент для построения конвейеров данных в реальном времени и событийно-ориентированных приложений. В этой статье мы разберемся, как интегрировать Kafka с экосистемой Spring Boot.

Ядро архитектуры Kafka

В архитектуре Kafka сообщения являются сердцем системы. Производители создают и отправляют сообщения в определенные темы, которые выступают в роли категорий. Эти темы делятся на разделы для обеспечения эффективной параллельной обработки.

Потребители подписываются на темы и получают сообщения из разделов. Каждый раздел одновременно закреплен только за одним потребителем, что обеспечивает распределение нагрузки. Потребители обрабатывают сообщения в зависимости от своих потребностей, будь то аналитика, хранение данных или другие приложения.

Архитектура Kafka
Архитектура Kafka

Такая архитектура позволяет Kafka эффективно обрабатывать огромные потоки данных, обеспечивая отказоустойчивость и масштабируемость. Это надежная основа для конвейеров данных в реальном времени, событийно-ориентированных приложений и т.д.

Теперь, когда мы разобрались с принципами работы Kafka, давайте погрузимся в код.

Настройка Kafka в Spring Boot: Реализация кода

Прежде чем приступить к работе, необходимо, чтобы сервер Kafka работал в вашей локальной среде. Если вы еще не настроили Kafka в своей системе, то за подробными инструкциями можно обратиться к руководству по быстрому запуску Kafka.

Нам необходимо добавить зависимость spring-kafka maven в pom.xml.

        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>

Конфигурация производителя

Чтобы начать создавать сообщения, мы сначала настраиваем production factory. Она служит руководством для формирования экземпляров Kafka Producer.

Далее мы используем шаблон KafkaTemplate, который оборачивается вокруг экземпляра Producer и предлагает простые методы для отправки сообщений в определенные темы Kafka.

Экземпляры Producer разработаны как потокобезопасные, поэтому использование одного экземпляра во всем контексте приложения может повысить производительность. Это также относится к экземплярам KafkaTemplate.

@Configuration
public class KafkaProducerConfig {

    @Bean
    public ProducerFactory<String, String> producerFactory() {
        Map<String, Object> configProps = new HashMap<>();
        configProps.put(
          ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, 
          bootstrapAddress);
        configProps.put(
          ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, 
          StringSerializer.class);
        configProps.put(
          ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
          StringSerializer.class);
        return new DefaultKafkaProducerFactory<>(configProps);
    }

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
}

В приведенном выше фрагменте кода мы настраиваем производителя с помощью свойств ProducerConfig. Рассмотрим основные свойства:

  • BOOTSTRAP_SERVERS_CONFIGВ: в этом свойстве указываются адреса брокеров Kafka, которые представляют собой список пар хост-порт, разделенных запятыми.

  • KEY_SERIALIZER_CLASS_CONFIGandVALUE_SERIALIZER_CLASS_CONFIG: Эти свойства определяют, как будут сериализованы ключ и значение сообщения перед отправкой в Kafka. В данном примере мы используем StringSerializer для сериализации как ключа, так и значения.

Поэтому в данном случае в файле свойств должно присутствовать значение 'bootstrap-server'.

spring.kafka.bootstrap-servers=localhost:9092

Все службы, используемые в данной статье, предполагают работу на порту по умолчанию.

Создание тем Kafka

Мы будем отправлять сообщение в тему. Поэтому перед отправкой сообщений необходимо создать тему.

@Configuration
public class KafkaTopic {

    @Bean
    public NewTopic topic1() {
        return TopicBuilder.name("topic-1").build();
    }

    @Bean
    public NewTopic topic2() {
        return TopicBuilder.name("topic-2").partitions(3).build();
    }
}

Bean KafkaAdmin отвечает за создание новых тем в нашем брокере. В Spring Boot KafkaAdmin регистрируется автоматически.

Здесь мы создали topic-1 с 1 разделом (по умолчанию) и topic-2 с 3 разделами. TopicBuilder предоставляет различные методы для создания тем.

Отправка сообщений

В KafkaTemplate имеются различные методы для отправки сообщений в темы:

@Component
@Slf4j
public class KafkaSender {

    @Autowired
    private KafkaTemplateString, String> kafkaTemplate;

        public void sendMessage(String message, String topicName) {
        log.info("Sending : {}", message);
        log.info("--------------------------------");

        kafkaTemplate.send(topicName, message);
    }
}

Для публикации сообщения нам достаточно вызвать метод send(), указав в качестве параметров сообщение и имя темы.

Настройка потребителя

KafkaMessageListenerContainerFactory получает все сообщения от всех тем в одном потоке. Для этого нам также необходимо настроить consumerFacotry.

@Configuration
@EnableKafka
public class KafkaConsumer {

    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;

    @Bean
    public ConsumerFactoryString, String> consumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
                bootstrapServers);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        return new DefaultKafkaConsumerFactory<>(props);
    }
}

Далее нам необходимо потреблять сообщения с помощью аннотации KafkaListener. Для этого мы используем аннотацию EnableKafka в конфигурации потребителя. Она указывает Spring на необходимость поиска аннотации KafkaListener в ваших beans и конфигурирования необходимой инфраструктуры для обработки сообщений Kafka.

@Component
@Slf4j
public class KafkaListenerExample {

    @KafkaListener(topics = "topic-1", groupId = "group1")
    void listener(String data) {
        log.info("Received message [{}] in group1", data);
    }

GroupId — это строка, однозначно идентифицирующая группу потребительских процессов, к которой принадлежит данный потребитель. Мы можем указать несколько тем для прослушивания в рамках одной группы потребителей. Точно так же несколько методов могут прослушивать одну и ту же тему.

@KafkaListener(topics = "topic-1,topic-2", groupId = "group1")
void listener(@Payload String data,
              @Header(KafkaHeaders.RECEIVED_PARTITION) int partition,
              @Header(KafkaHeaders.OFFSET) int offset) {
    log.info("Received message [{}] from group1, partition-{} with offset-{}",
            data,
            partition,
            offset);
}

Мы также можем получить некоторые полезные метаданные о потребляемом сообщении с помощью аннотации Header().

Потребление сообщений из определенного раздела с начальным смещением

В некоторых сценариях может потребоваться потреблять сообщения из определенного раздела темы Kafka, начиная с определенного смещения. Это может быть полезно, когда требуется повторная обработка определенных сообщений или тонкий контроль над тем, с чего начать их потребление.

@KafkaListener(
  groupId = "group2",
  topicPartitions = @TopicPartition(topic = "topic-2",
  partitionOffsets = {
    @PartitionOffset(partition = "0", initialOffset = "0"), 
    @PartitionOffset(partition = "3", initialOffset = "0")}))
public void listenToPartition(
  @Payload String message, 
  @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition) {
      log.info("Received Message [{}] from partition-{}",
               message,
               partition);
}

Устанавливая значение initialOffset равным "0", мы даем Kafka указание начинать потребление сообщений с начала раздела. Если вы хотите просто указать раздел без initialOffset, просто напишите следующее:

@KafkaListener(groupId = "group2", topicPartitions 
  = @TopicPartition(topic = "topicName", partitions = { "0", "3" }))

KafkaListner на уровне классов

Аннотация на уровне класса подходит для случаев, когда необходимо сгруппировать логику обработки связанных сообщений. Сообщения из этих тем будут распределяться по методам внутри класса в зависимости от их параметров.

@Component
@Slf4j
@KafkaListener(id = "class-level", topics = "multi-type")
class KafkaClassListener {

  @KafkaHandler
  void listenString(String message) {
    log.info("KafkaHandler [String] {}", message);
  }

  @KafkaHandler(isDefault = true)
  void listenDefault(Object object) {
    log.info("KafkaHandler [Default] {}", object);
  }
}

Таким образом, мы можем сгруппировать методы, которые будут потреблять данные из определенных тем. Здесь мы можем перехватывать различные типы данных с помощью методов, аннотированных KafkaHandler. Параметры метода будут определять способ получения данных, и если ни один из типов данных не совпадает, то будет применен метод по умолчанию.

Теперь, когда мы рассмотрели основы работы производителей и слушателей со строковыми сообщениями, давайте изучим различные сценарии и варианты их использования.

Использование шаблона RoutingKafkaTemplate

Мы можем использовать RoutingKafkaTemplate, когда есть несколько производителей с различными конфигурациями и мы хотим выбрать производителя на основе имени темы во время выполнения.

@Bean
public RoutingKafkaTemplate routingTemplate(GenericApplicationContext context) {

    // ProducerFactory with Bytes serializer
    Map<String, Object> props = new HashMap<>();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
    DefaultKafkaProducerFactory<Object, Object> bytesPF = new DefaultKafkaProducerFactory<>(props);
    context.registerBean(DefaultKafkaProducerFactory.class, "bytesPF", bytesPF);

    // ProducerFactory with String serializer
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    DefaultKafkaProducerFactory<Object, Object> stringPF = new DefaultKafkaProducerFactory<>(props);

    Map<Pattern, ProducerFactory<Object, Object>> map = new LinkedHashMap<>();
    map.put(Pattern.compile(".*-bytes"), bytesPF);
    map.put(Pattern.compile("strings-.*"), stringPF);
    return new RoutingKafkaTemplate(map);
}

RoutingKafkaTemplate направляет сообщения первому экземпляру фабрики, который соответствует заданному имени темы из карты шаблонов regex и экземпляров ProducerFactory. Шаблон strings-.* должен быть первым, если есть два шаблона, str-.* и strings-.*, так как в противном случае шаблон str-.* будет "перекрывать" его.

В приведенном примере мы создали два шаблона: .*-bytes и strings-.* Сериализация сообщений зависит от имени темы во время выполнения. Имена тем, заканчивающиеся на '-bytes', будут использовать байтовый сериализатор, а начинающиеся на strings-.* - StringSerializer.

Фильтрация сообщений

Все сообщения, удовлетворяющие критериям фильтра, будут отброшены еще до того, как они попадут к слушателю. Здесь отбрасываются сообщения, содержащие слово "ignored".

@Bean
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, String> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    factory.setRecordFilterStrategy(record -> record.value().contains("ignored"));
    return factory;
}

Слушатель инкапсулируется в FilteringMessageListenerAdapter. Этот адаптер опирается на реализацию RecordFilterStrategy, в которой мы определяем метод фильтрации. Для вызова фильтра можно просто добавить одну строку в фабрику текущего потребителя.

Пользовательские сообщения

Теперь рассмотрим, как отправить или получить Java-объект. В нашем примере мы будем отправлять и получать объекты User.

@Data
@AllArgsConstructor
@NoArgsConstructor
public class User {

    String msg;
}

Конфигурация производителя и потребителя

Для конфигурации значений производителя мы будем использовать JSON Serializer:

@Bean
public ProducerFactory<String, User> userProducerFactory() {
    Map<String, Object> configProps = new HashMap<>();
    configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
    return new DefaultKafkaProducerFactory<>(configProps);
}

@Bean
public KafkaTemplate<String, User> userKafkaTemplate() {
    return new KafkaTemplate<>(userProducerFactory());
}

А для потребителей это будет JSON Deserializer:

public ConsumerFactory<String, User> userConsumerFactory() {
    Map<String, Object> props = new HashMap<>();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    return new DefaultKafkaConsumerFactory<>(props, new StringDeserializer(), new JsonDeserializer>(User.class));
}

@Bean
public ConcurrentKafkaListenerContainerFactory<String, User> userKafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, User> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(userConsumerFactory());
    return factory;
}

Сериализатор и десериализатор JSON в spring-kafka используют библиотеку Jackson, которая отвечает за преобразование Java-объектов в байты и наоборот.

        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-databind</artifactId>
            <version>2.12.7.1</version>
        </dependency>

Это необязательная зависимость, и если вы хотите ее использовать, то используйте ту же версию, что и spring-kafka.

Отправка Java-объектов

Давайте отправим объект User с помощью созданного нами шаблона userKafkaTemplate().

@Component
@Slf4j
public class KafkaSender {

    @Autowired
    private KafkaTemplate<String, User> userKafkaTemplate;


    void sendCustomMessage(User user, String topicName) {
        log.info("Sending Json Serializer : {}", user);
        log.info("--------------------------------");

        userKafkaTemplate.send(topicName, user);
    }

Получение Java-объектов

@Component
@Slf4j
public class KafkaListenerExample {


    @KafkaListener(topics = "topic-3", groupId = "user-group",
            containerFactory = "userKafkaListenerContainerFactory")
    void listenerWithMessageConverter(User user) {
        log.info("Received message through MessageConverterUserListener [{}]", user);
    }

Поскольку у нас несколько контейнеров-слушателей, мы указываем, какую фабрику контейнеров использовать.

Если мы не укажем атрибут containerFactory, то по умолчанию будет использоваться kafkaListenerContainerFactory, которая в нашем случае использует StringSerializer и StringDeserializer.

Заключение

В руководстве мы рассказали о настройке Kafka в приложении Spring Boot, и о том, как создавать и потреблять сообщения с помощью шаблонов и слушателей Kafka. Мы познакомились с примерами работы с различными типами сообщений, маршрутизацией сообщений, фильтрацией сообщений и преобразованием пользовательских форматов данных.


Если вы хотите глубже изучить Apache Kafka, приходите на интенсив Apache Kafka для разработчиков. В нём углублённая теория и практика на Java или Golang с платформой Spring+Docker+Postgres. Вы узнаете типовые шаблоны проектирования, сделаете своё приложение надёжнее, получите опыт разработки нескольких приложений, использующих Kafka.

???? Программа курса на нашем сайте

Курс поможет уменьшить время на рабочие задачи с Кафкой, добавить красивую строчку в резюме и взобраться на следующую ступень карьерной лестницы.

Комментарии (0)