В этой статье описывается способ отправки сообщения в несколько слушателей Kafka одновременно. Несколько слушателей будут получать одно и то же сообщение, от одного и того же отправителя сообщений, в данной реализации решения.
Логика реализована с использованием Java, Spring и Kafka.
План
Создать слушатели Kafka
Создать отправителя сообщений Kafka
Создать решение для тестирования
Провести тестирование
Создать список определений для используемых в решении механизмов
1. Создать слушатели Kafka
Создадим 2 слушателя через аннотацию KafkaListener. У этих слушателей должно быть одно и то же название топика(topic) и разные название группы(group).
@Service
public class KafkaTester {
//Listener 1
@KafkaListener(id = "id1",
groupId = "group-one",
topics = "topic-one")
public void listenServiceCall(@Payload String message) {
//Logging
System.out.println("GROUP ONE MESSAGE " + message);
}
//Listener 2
@KafkaListener(id = "id2",
groupId = "group-two",
topics = "topic-one")
public void listenServiceCall2(@Payload String message) {
//Logging
System.out.println("GROUP TWO MESSAGE " + message);
}
}
Логирование сообщения в слушателях необходимо для проведения тестирования решения.
2. Создать отправителя сообщений Kafka
Создадим отправителя сообщений с применением класса KafkaTemplate.
@Service
public class KafkaTester {
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
//... LISTENERS FROM PREVIOUS EXAMPLE
}
Объект KafkaTemplate внедряется в сервис KafkTester через аннотацию Autowired.
3. Создать решение для тестирования
Создадим метод, который будет включаться, в определенные нами промежутки времени. Назовем метод scheduledSend(). Далее, напишем логику отправки сообщения через Kafka, в рамках метода scheduledSend().
Для реализации метода, который будет периодически включаться(scheduledSend()), используется аннотация Schedulled. Период для включения метода установлен в 10 секунд.
@Service
public class KafkaScheduler {
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
//10 seconds period
@Scheduled(cron = "*/10 * * * * *")
public void send() {
kafkaTemplate.send("topic-one", "kafkaMessage " + new Date());
System.out.println("MESSAGE WAS SENT");
}
@KafkaListener(id = "id1",
groupId = "group-one",
topics = "topic-one")
public void listenServiceCall(@Payload String message) {
System.out.println("GROUP ONE MESSAGE " + message);
}
@KafkaListener(id = "id2",
groupId = "group-two",
topics = "topic-one")
public void listenServiceCall2(@Payload String message) {
System.out.println("GROUP TWO MESSAGE " + message);
}
}
В данном коде видно, что необходимо отправлять сообщение с указанием такого же названия топика(topic), как у слушателей. В данном случае, слушатели примут одно и то же сообщение.
4. Провести тестирование
Запустить приложение с использованием кодом класса KafkaScheduler.
Получить лог программы с одним и тем же временем в сообщениях, с точностью до секунды.
SENT MESSAGE
GROUP ONE MESSAGE kafkaMessage Tue Sep 12 15:17:40 EDT 2023
GROUP TWO MESSAGE kafkaMessage Tue Sep 12 15:17:40 EDT 2023
SENT MESSAGE
GROUP ONE MESSAGE kafkaMessage Tue Sep 12 15:17:50 EDT 2023
GROUP TWO MESSAGE kafkaMessage Tue Sep 12 15:17:50 EDT 2023
5. Список определений
Название топика(topic) - метка для группировки сообщений.
Идентификатор группы(groupId) - метка для группировки слушателей.
klis
Это какой-то нестандартный сценарий? Или решение какой-то проблемы? Зачем это все?
А это вообще низкопробное шарлатанство - вы же время зашили текстом в отправляемом сообщении))) С чего бы вдруг оно должно отличаться?
Что происходит вообще на хабре? Какой-то очередной поток какого-то очередного курса закончился?
martyncev
Ну так сессия близится.. нужны статья на профильных ресурсах)