Kafka — это универсальный и мощный инструмент для построения конвейеров данных в реальном времени и событийно-ориентированных приложений. В этой статье мы разберемся, как интегрировать Kafka с экосистемой Spring Boot.

Ядро архитектуры Kafka
В архитектуре Kafka сообщения являются сердцем системы. Производители создают и отправляют сообщения в определенные темы, которые выступают в роли категорий. Эти темы делятся на разделы для обеспечения эффективной параллельной обработки.
Потребители подписываются на темы и получают сообщения из разделов. Каждый раздел одновременно закреплен только за одним потребителем, что обеспечивает распределение нагрузки. Потребители обрабатывают сообщения в зависимости от своих потребностей, будь то аналитика, хранение данных или другие приложения.

Такая архитектура позволяет Kafka эффективно обрабатывать огромные потоки данных, обеспечивая отказоустойчивость и масштабируемость. Это надежная основа для конвейеров данных в реальном времени, событийно-ориентированных приложений и т.д.
Теперь, когда мы разобрались с принципами работы Kafka, давайте погрузимся в код.
Настройка Kafka в Spring Boot: Реализация кода
Прежде чем приступить к работе, необходимо, чтобы сервер Kafka работал в вашей локальной среде. Если вы еще не настроили Kafka в своей системе, то за подробными инструкциями можно обратиться к руководству по быстрому запуску Kafka.
Нам необходимо добавить зависимость spring-kafka maven в pom.xml.
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>Конфигурация производителя
Чтобы начать создавать сообщения, мы сначала настраиваем production factory. Она служит руководством для формирования экземпляров Kafka Producer.
Далее мы используем шаблон KafkaTemplate, который оборачивается вокруг экземпляра Producer и предлагает простые методы для отправки сообщений в определенные темы Kafka.
Экземпляры Producer разработаны как потокобезопасные, поэтому использование одного экземпляра во всем контексте приложения может повысить производительность. Это также относится к экземплярам KafkaTemplate.
@Configuration
public class KafkaProducerConfig {
    @Bean
    public ProducerFactory<String, String> producerFactory() {
        Map<String, Object> configProps = new HashMap<>();
        configProps.put(
          ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, 
          bootstrapAddress);
        configProps.put(
          ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, 
          StringSerializer.class);
        configProps.put(
          ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
          StringSerializer.class);
        return new DefaultKafkaProducerFactory<>(configProps);
    }
    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
}В приведенном выше фрагменте кода мы настраиваем производителя с помощью свойств ProducerConfig. Рассмотрим основные свойства:
- BOOTSTRAP_SERVERS_CONFIGВ: в этом свойстве указываются адреса брокеров Kafka, которые представляют собой список пар хост-порт, разделенных запятыми.
- KEY_SERIALIZER_CLASS_CONFIGandVALUE_SERIALIZER_CLASS_CONFIG: Эти свойства определяют, как будут сериализованы ключ и значение сообщения перед отправкой в Kafka. В данном примере мы используем StringSerializer для сериализации как ключа, так и значения.
Поэтому в данном случае в файле свойств должно присутствовать значение 'bootstrap-server'.
spring.kafka.bootstrap-servers=localhost:9092Все службы, используемые в данной статье, предполагают работу на порту по умолчанию.
Создание тем Kafka
Мы будем отправлять сообщение в тему. Поэтому перед отправкой сообщений необходимо создать тему.
@Configuration
public class KafkaTopic {
    @Bean
    public NewTopic topic1() {
        return TopicBuilder.name("topic-1").build();
    }
    @Bean
    public NewTopic topic2() {
        return TopicBuilder.name("topic-2").partitions(3).build();
    }
}Bean KafkaAdmin отвечает за создание новых тем в нашем брокере. В Spring Boot KafkaAdmin регистрируется автоматически.
Здесь мы создали topic-1 с 1 разделом (по умолчанию) и topic-2 с 3 разделами. TopicBuilder предоставляет различные методы для создания тем.
Отправка сообщений
В KafkaTemplate имеются различные методы для отправки сообщений в темы:
@Component
@Slf4j
public class KafkaSender {
    @Autowired
    private KafkaTemplateString, String> kafkaTemplate;
        public void sendMessage(String message, String topicName) {
        log.info("Sending : {}", message);
        log.info("--------------------------------");
        kafkaTemplate.send(topicName, message);
    }
}Для публикации сообщения нам достаточно вызвать метод send(), указав в качестве параметров сообщение и имя темы.
Настройка потребителя
KafkaMessageListenerContainerFactory получает все сообщения от всех тем в одном потоке. Для этого нам также необходимо настроить consumerFacotry.
@Configuration
@EnableKafka
public class KafkaConsumer {
    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;
    @Bean
    public ConsumerFactoryString, String> consumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
                bootstrapServers);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        return new DefaultKafkaConsumerFactory<>(props);
    }
}Далее нам необходимо потреблять сообщения с помощью аннотации KafkaListener. Для этого мы используем аннотацию EnableKafka в конфигурации потребителя. Она указывает Spring на необходимость поиска аннотации KafkaListener в ваших beans и конфигурирования необходимой инфраструктуры для обработки сообщений Kafka.
@Component
@Slf4j
public class KafkaListenerExample {
    @KafkaListener(topics = "topic-1", groupId = "group1")
    void listener(String data) {
        log.info("Received message [{}] in group1", data);
    }GroupId — это строка, однозначно идентифицирующая группу потребительских процессов, к которой принадлежит данный потребитель. Мы можем указать несколько тем для прослушивания в рамках одной группы потребителей. Точно так же несколько методов могут прослушивать одну и ту же тему.
@KafkaListener(topics = "topic-1,topic-2", groupId = "group1")
void listener(@Payload String data,
              @Header(KafkaHeaders.RECEIVED_PARTITION) int partition,
              @Header(KafkaHeaders.OFFSET) int offset) {
    log.info("Received message [{}] from group1, partition-{} with offset-{}",
            data,
            partition,
            offset);
}Мы также можем получить некоторые полезные метаданные о потребляемом сообщении с помощью аннотации Header().
Потребление сообщений из определенного раздела с начальным смещением
В некоторых сценариях может потребоваться потреблять сообщения из определенного раздела темы Kafka, начиная с определенного смещения. Это может быть полезно, когда требуется повторная обработка определенных сообщений или тонкий контроль над тем, с чего начать их потребление.
@KafkaListener(
  groupId = "group2",
  topicPartitions = @TopicPartition(topic = "topic-2",
  partitionOffsets = {
    @PartitionOffset(partition = "0", initialOffset = "0"), 
    @PartitionOffset(partition = "3", initialOffset = "0")}))
public void listenToPartition(
  @Payload String message, 
  @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition) {
      log.info("Received Message [{}] from partition-{}",
               message,
               partition);
}Устанавливая значение initialOffset равным "0", мы даем Kafka указание начинать потребление сообщений с начала раздела. Если вы хотите просто указать раздел без initialOffset, просто напишите следующее:
@KafkaListener(groupId = "group2", topicPartitions 
  = @TopicPartition(topic = "topicName", partitions = { "0", "3" }))KafkaListner на уровне классов
Аннотация на уровне класса подходит для случаев, когда необходимо сгруппировать логику обработки связанных сообщений. Сообщения из этих тем будут распределяться по методам внутри класса в зависимости от их параметров.
@Component
@Slf4j
@KafkaListener(id = "class-level", topics = "multi-type")
class KafkaClassListener {
  @KafkaHandler
  void listenString(String message) {
    log.info("KafkaHandler [String] {}", message);
  }
  @KafkaHandler(isDefault = true)
  void listenDefault(Object object) {
    log.info("KafkaHandler [Default] {}", object);
  }
}Таким образом, мы можем сгруппировать методы, которые будут потреблять данные из определенных тем. Здесь мы можем перехватывать различные типы данных с помощью методов, аннотированных KafkaHandler. Параметры метода будут определять способ получения данных, и если ни один из типов данных не совпадает, то будет применен метод по умолчанию.
Теперь, когда мы рассмотрели основы работы производителей и слушателей со строковыми сообщениями, давайте изучим различные сценарии и варианты их использования.
Использование шаблона RoutingKafkaTemplate
Мы можем использовать RoutingKafkaTemplate, когда есть несколько производителей с различными конфигурациями и мы хотим выбрать производителя на основе имени темы во время выполнения.
@Bean
public RoutingKafkaTemplate routingTemplate(GenericApplicationContext context) {
    // ProducerFactory with Bytes serializer
    Map<String, Object> props = new HashMap<>();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
    DefaultKafkaProducerFactory<Object, Object> bytesPF = new DefaultKafkaProducerFactory<>(props);
    context.registerBean(DefaultKafkaProducerFactory.class, "bytesPF", bytesPF);
    // ProducerFactory with String serializer
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    DefaultKafkaProducerFactory<Object, Object> stringPF = new DefaultKafkaProducerFactory<>(props);
    Map<Pattern, ProducerFactory<Object, Object>> map = new LinkedHashMap<>();
    map.put(Pattern.compile(".*-bytes"), bytesPF);
    map.put(Pattern.compile("strings-.*"), stringPF);
    return new RoutingKafkaTemplate(map);
}RoutingKafkaTemplate направляет сообщения первому экземпляру фабрики, который соответствует заданному имени темы из карты шаблонов regex и экземпляров ProducerFactory. Шаблон strings-.* должен быть первым, если есть два шаблона, str-.* и strings-.*, так как в противном случае шаблон str-.* будет "перекрывать" его.
В приведенном примере мы создали два шаблона: .*-bytes и strings-.* Сериализация сообщений зависит от имени темы во время выполнения. Имена тем, заканчивающиеся на '-bytes', будут использовать байтовый сериализатор, а начинающиеся на strings-.* - StringSerializer.
Фильтрация сообщений
Все сообщения, удовлетворяющие критериям фильтра, будут отброшены еще до того, как они попадут к слушателю. Здесь отбрасываются сообщения, содержащие слово "ignored".
@Bean
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, String> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    factory.setRecordFilterStrategy(record -> record.value().contains("ignored"));
    return factory;
}Слушатель инкапсулируется в FilteringMessageListenerAdapter. Этот адаптер опирается на реализацию RecordFilterStrategy, в которой мы определяем метод фильтрации. Для вызова фильтра можно просто добавить одну строку в фабрику текущего потребителя.
Пользовательские сообщения
Теперь рассмотрим, как отправить или получить Java-объект. В нашем примере мы будем отправлять и получать объекты User.
@Data
@AllArgsConstructor
@NoArgsConstructor
public class User {
    String msg;
}Конфигурация производителя и потребителя
Для конфигурации значений производителя мы будем использовать JSON Serializer:
@Bean
public ProducerFactory<String, User> userProducerFactory() {
    Map<String, Object> configProps = new HashMap<>();
    configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
    return new DefaultKafkaProducerFactory<>(configProps);
}
@Bean
public KafkaTemplate<String, User> userKafkaTemplate() {
    return new KafkaTemplate<>(userProducerFactory());
}А для потребителей это будет JSON Deserializer:
public ConsumerFactory<String, User> userConsumerFactory() {
    Map<String, Object> props = new HashMap<>();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    return new DefaultKafkaConsumerFactory<>(props, new StringDeserializer(), new JsonDeserializer>(User.class));
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, User> userKafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, User> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(userConsumerFactory());
    return factory;
}Сериализатор и десериализатор JSON в spring-kafka используют библиотеку Jackson, которая отвечает за преобразование Java-объектов в байты и наоборот.
        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-databind</artifactId>
            <version>2.12.7.1</version>
        </dependency>Это необязательная зависимость, и если вы хотите ее использовать, то используйте ту же версию, что и spring-kafka.
Отправка Java-объектов
Давайте отправим объект User с помощью созданного нами шаблона userKafkaTemplate().
@Component
@Slf4j
public class KafkaSender {
    @Autowired
    private KafkaTemplate<String, User> userKafkaTemplate;
    void sendCustomMessage(User user, String topicName) {
        log.info("Sending Json Serializer : {}", user);
        log.info("--------------------------------");
        userKafkaTemplate.send(topicName, user);
    }Получение Java-объектов
@Component
@Slf4j
public class KafkaListenerExample {
    @KafkaListener(topics = "topic-3", groupId = "user-group",
            containerFactory = "userKafkaListenerContainerFactory")
    void listenerWithMessageConverter(User user) {
        log.info("Received message through MessageConverterUserListener [{}]", user);
    }Поскольку у нас несколько контейнеров-слушателей, мы указываем, какую фабрику контейнеров использовать.
Если мы не укажем атрибут containerFactory, то по умолчанию будет использоваться kafkaListenerContainerFactory, которая в нашем случае использует StringSerializer и StringDeserializer.
Заключение
В руководстве мы рассказали о настройке Kafka в приложении Spring Boot, и о том, как создавать и потреблять сообщения с помощью шаблонов и слушателей Kafka. Мы познакомились с примерами работы с различными типами сообщений, маршрутизацией сообщений, фильтрацией сообщений и преобразованием пользовательских форматов данных.
Если вы хотите глубже изучить Apache Kafka, приходите на интенсив Apache Kafka для разработчиков. В нём углублённая теория и практика на Java или Golang с платформой Spring+Docker+Postgres. Вы узнаете типовые шаблоны проектирования, сделаете своё приложение надёжнее, получите опыт разработки нескольких приложений, использующих Kafka.
???? Программа курса на нашем сайте
Курс поможет уменьшить время на рабочие задачи с Кафкой, добавить красивую строчку в резюме и взобраться на следующую ступень карьерной лестницы.
 
          