Kafka — это универсальный и мощный инструмент для построения конвейеров данных в реальном времени и событийно-ориентированных приложений. В этой статье мы разберемся, как интегрировать Kafka с экосистемой Spring Boot.
Ядро архитектуры Kafka
В архитектуре Kafka сообщения являются сердцем системы. Производители создают и отправляют сообщения в определенные темы, которые выступают в роли категорий. Эти темы делятся на разделы для обеспечения эффективной параллельной обработки.
Потребители подписываются на темы и получают сообщения из разделов. Каждый раздел одновременно закреплен только за одним потребителем, что обеспечивает распределение нагрузки. Потребители обрабатывают сообщения в зависимости от своих потребностей, будь то аналитика, хранение данных или другие приложения.
Такая архитектура позволяет Kafka эффективно обрабатывать огромные потоки данных, обеспечивая отказоустойчивость и масштабируемость. Это надежная основа для конвейеров данных в реальном времени, событийно-ориентированных приложений и т.д.
Теперь, когда мы разобрались с принципами работы Kafka, давайте погрузимся в код.
Настройка Kafka в Spring Boot: Реализация кода
Прежде чем приступить к работе, необходимо, чтобы сервер Kafka работал в вашей локальной среде. Если вы еще не настроили Kafka в своей системе, то за подробными инструкциями можно обратиться к руководству по быстрому запуску Kafka.
Нам необходимо добавить зависимость spring-kafka maven в pom.xml
.
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
Конфигурация производителя
Чтобы начать создавать сообщения, мы сначала настраиваем production factory. Она служит руководством для формирования экземпляров Kafka Producer.
Далее мы используем шаблон KafkaTemplate, который оборачивается вокруг экземпляра Producer и предлагает простые методы для отправки сообщений в определенные темы Kafka.
Экземпляры Producer разработаны как потокобезопасные, поэтому использование одного экземпляра во всем контексте приложения может повысить производительность. Это также относится к экземплярам KafkaTemplate.
@Configuration
public class KafkaProducerConfig {
@Bean
public ProducerFactory<String, String> producerFactory() {
Map<String, Object> configProps = new HashMap<>();
configProps.put(
ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
bootstrapAddress);
configProps.put(
ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
StringSerializer.class);
configProps.put(
ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
StringSerializer.class);
return new DefaultKafkaProducerFactory<>(configProps);
}
@Bean
public KafkaTemplate<String, String> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
}
В приведенном выше фрагменте кода мы настраиваем производителя с помощью свойств ProducerConfig. Рассмотрим основные свойства:
BOOTSTRAP_SERVERS_CONFIGВ
: в этом свойстве указываются адреса брокеров Kafka, которые представляют собой список пар хост-порт, разделенных запятыми.KEY_SERIALIZER_CLASS_CONFIGandVALUE_SERIALIZER_CLASS_CONFIG
: Эти свойства определяют, как будут сериализованы ключ и значение сообщения перед отправкой в Kafka. В данном примере мы используем StringSerializer для сериализации как ключа, так и значения.
Поэтому в данном случае в файле свойств должно присутствовать значение 'bootstrap-server'.
spring.kafka.bootstrap-servers=localhost:9092
Все службы, используемые в данной статье, предполагают работу на порту по умолчанию.
Создание тем Kafka
Мы будем отправлять сообщение в тему. Поэтому перед отправкой сообщений необходимо создать тему.
@Configuration
public class KafkaTopic {
@Bean
public NewTopic topic1() {
return TopicBuilder.name("topic-1").build();
}
@Bean
public NewTopic topic2() {
return TopicBuilder.name("topic-2").partitions(3).build();
}
}
Bean KafkaAdmin
отвечает за создание новых тем в нашем брокере. В Spring Boot KafkaAdmin
регистрируется автоматически.
Здесь мы создали topic-1 с 1 разделом (по умолчанию) и topic-2 с 3 разделами. TopicBuilder предоставляет различные методы для создания тем.
Отправка сообщений
В KafkaTemplate
имеются различные методы для отправки сообщений в темы:
@Component
@Slf4j
public class KafkaSender {
@Autowired
private KafkaTemplateString, String> kafkaTemplate;
public void sendMessage(String message, String topicName) {
log.info("Sending : {}", message);
log.info("--------------------------------");
kafkaTemplate.send(topicName, message);
}
}
Для публикации сообщения нам достаточно вызвать метод send(), указав в качестве параметров сообщение и имя темы.
Настройка потребителя
KafkaMessageListenerContainerFactory
получает все сообщения от всех тем в одном потоке. Для этого нам также необходимо настроить consumerFacotry
.
@Configuration
@EnableKafka
public class KafkaConsumer {
@Value("${spring.kafka.bootstrap-servers}")
private String bootstrapServers;
@Bean
public ConsumerFactoryString, String> consumerFactory() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
bootstrapServers);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
return new DefaultKafkaConsumerFactory<>(props);
}
}
Далее нам необходимо потреблять сообщения с помощью аннотации KafkaListener
. Для этого мы используем аннотацию EnableKafka
в конфигурации потребителя. Она указывает Spring на необходимость поиска аннотации KafkaListener
в ваших beans и конфигурирования необходимой инфраструктуры для обработки сообщений Kafka.
@Component
@Slf4j
public class KafkaListenerExample {
@KafkaListener(topics = "topic-1", groupId = "group1")
void listener(String data) {
log.info("Received message [{}] in group1", data);
}
GroupId — это строка, однозначно идентифицирующая группу потребительских процессов, к которой принадлежит данный потребитель. Мы можем указать несколько тем для прослушивания в рамках одной группы потребителей. Точно так же несколько методов могут прослушивать одну и ту же тему.
@KafkaListener(topics = "topic-1,topic-2", groupId = "group1")
void listener(@Payload String data,
@Header(KafkaHeaders.RECEIVED_PARTITION) int partition,
@Header(KafkaHeaders.OFFSET) int offset) {
log.info("Received message [{}] from group1, partition-{} with offset-{}",
data,
partition,
offset);
}
Мы также можем получить некоторые полезные метаданные о потребляемом сообщении с помощью аннотации Header()
.
Потребление сообщений из определенного раздела с начальным смещением
В некоторых сценариях может потребоваться потреблять сообщения из определенного раздела темы Kafka, начиная с определенного смещения. Это может быть полезно, когда требуется повторная обработка определенных сообщений или тонкий контроль над тем, с чего начать их потребление.
@KafkaListener(
groupId = "group2",
topicPartitions = @TopicPartition(topic = "topic-2",
partitionOffsets = {
@PartitionOffset(partition = "0", initialOffset = "0"),
@PartitionOffset(partition = "3", initialOffset = "0")}))
public void listenToPartition(
@Payload String message,
@Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition) {
log.info("Received Message [{}] from partition-{}",
message,
partition);
}
Устанавливая значение initialOffset
равным "0", мы даем Kafka указание начинать потребление сообщений с начала раздела. Если вы хотите просто указать раздел без initialOffset, просто напишите следующее:
@KafkaListener(groupId = "group2", topicPartitions
= @TopicPartition(topic = "topicName", partitions = { "0", "3" }))
KafkaListner на уровне классов
Аннотация на уровне класса подходит для случаев, когда необходимо сгруппировать логику обработки связанных сообщений. Сообщения из этих тем будут распределяться по методам внутри класса в зависимости от их параметров.
@Component
@Slf4j
@KafkaListener(id = "class-level", topics = "multi-type")
class KafkaClassListener {
@KafkaHandler
void listenString(String message) {
log.info("KafkaHandler [String] {}", message);
}
@KafkaHandler(isDefault = true)
void listenDefault(Object object) {
log.info("KafkaHandler [Default] {}", object);
}
}
Таким образом, мы можем сгруппировать методы, которые будут потреблять данные из определенных тем. Здесь мы можем перехватывать различные типы данных с помощью методов, аннотированных KafkaHandler
. Параметры метода будут определять способ получения данных, и если ни один из типов данных не совпадает, то будет применен метод по умолчанию.
Теперь, когда мы рассмотрели основы работы производителей и слушателей со строковыми сообщениями, давайте изучим различные сценарии и варианты их использования.
Использование шаблона RoutingKafkaTemplate
Мы можем использовать RoutingKafkaTemplate
, когда есть несколько производителей с различными конфигурациями и мы хотим выбрать производителя на основе имени темы во время выполнения.
@Bean
public RoutingKafkaTemplate routingTemplate(GenericApplicationContext context) {
// ProducerFactory with Bytes serializer
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
DefaultKafkaProducerFactory<Object, Object> bytesPF = new DefaultKafkaProducerFactory<>(props);
context.registerBean(DefaultKafkaProducerFactory.class, "bytesPF", bytesPF);
// ProducerFactory with String serializer
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
DefaultKafkaProducerFactory<Object, Object> stringPF = new DefaultKafkaProducerFactory<>(props);
Map<Pattern, ProducerFactory<Object, Object>> map = new LinkedHashMap<>();
map.put(Pattern.compile(".*-bytes"), bytesPF);
map.put(Pattern.compile("strings-.*"), stringPF);
return new RoutingKafkaTemplate(map);
}
RoutingKafkaTemplate
направляет сообщения первому экземпляру фабрики, который соответствует заданному имени темы из карты шаблонов regex и экземпляров ProducerFactory
. Шаблон strings-.*
должен быть первым, если есть два шаблона, str-.*
и strings-.*
, так как в противном случае шаблон str-.*
будет "перекрывать" его.
В приведенном примере мы создали два шаблона: .*-bytes
и strings-.*
Сериализация сообщений зависит от имени темы во время выполнения. Имена тем, заканчивающиеся на '-bytes
', будут использовать байтовый сериализатор, а начинающиеся на strings-.*
- StringSerializer.
Фильтрация сообщений
Все сообщения, удовлетворяющие критериям фильтра, будут отброшены еще до того, как они попадут к слушателю. Здесь отбрасываются сообщения, содержащие слово "ignored".
@Bean
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.setRecordFilterStrategy(record -> record.value().contains("ignored"));
return factory;
}
Слушатель инкапсулируется в FilteringMessageListenerAdapter
. Этот адаптер опирается на реализацию RecordFilterStrategy
, в которой мы определяем метод фильтрации. Для вызова фильтра можно просто добавить одну строку в фабрику текущего потребителя.
Пользовательские сообщения
Теперь рассмотрим, как отправить или получить Java-объект. В нашем примере мы будем отправлять и получать объекты User
.
@Data
@AllArgsConstructor
@NoArgsConstructor
public class User {
String msg;
}
Конфигурация производителя и потребителя
Для конфигурации значений производителя мы будем использовать JSON Serializer:
@Bean
public ProducerFactory<String, User> userProducerFactory() {
Map<String, Object> configProps = new HashMap<>();
configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
return new DefaultKafkaProducerFactory<>(configProps);
}
@Bean
public KafkaTemplate<String, User> userKafkaTemplate() {
return new KafkaTemplate<>(userProducerFactory());
}
А для потребителей это будет JSON Deserializer:
public ConsumerFactory<String, User> userConsumerFactory() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
return new DefaultKafkaConsumerFactory<>(props, new StringDeserializer(), new JsonDeserializer>(User.class));
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, User> userKafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, User> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(userConsumerFactory());
return factory;
}
Сериализатор и десериализатор JSON в spring-kafka используют библиотеку Jackson, которая отвечает за преобразование Java-объектов в байты и наоборот.
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>2.12.7.1</version>
</dependency>
Это необязательная зависимость, и если вы хотите ее использовать, то используйте ту же версию, что и spring-kafka.
Отправка Java-объектов
Давайте отправим объект User с помощью созданного нами шаблона userKafkaTemplate().
@Component
@Slf4j
public class KafkaSender {
@Autowired
private KafkaTemplate<String, User> userKafkaTemplate;
void sendCustomMessage(User user, String topicName) {
log.info("Sending Json Serializer : {}", user);
log.info("--------------------------------");
userKafkaTemplate.send(topicName, user);
}
Получение Java-объектов
@Component
@Slf4j
public class KafkaListenerExample {
@KafkaListener(topics = "topic-3", groupId = "user-group",
containerFactory = "userKafkaListenerContainerFactory")
void listenerWithMessageConverter(User user) {
log.info("Received message through MessageConverterUserListener [{}]", user);
}
Поскольку у нас несколько контейнеров-слушателей, мы указываем, какую фабрику контейнеров использовать.
Если мы не укажем атрибут containerFactory
, то по умолчанию будет использоваться kafkaListenerContainerFactory
, которая в нашем случае использует StringSerializer
и StringDeserializer
.
Заключение
В руководстве мы рассказали о настройке Kafka в приложении Spring Boot, и о том, как создавать и потреблять сообщения с помощью шаблонов и слушателей Kafka. Мы познакомились с примерами работы с различными типами сообщений, маршрутизацией сообщений, фильтрацией сообщений и преобразованием пользовательских форматов данных.
Если вы хотите глубже изучить Apache Kafka, приходите на интенсив Apache Kafka для разработчиков. В нём углублённая теория и практика на Java или Golang с платформой Spring+Docker+Postgres. Вы узнаете типовые шаблоны проектирования, сделаете своё приложение надёжнее, получите опыт разработки нескольких приложений, использующих Kafka.
???? Программа курса на нашем сайте
Курс поможет уменьшить время на рабочие задачи с Кафкой, добавить красивую строчку в резюме и взобраться на следующую ступень карьерной лестницы.