Эта статья обьясняет, как создать слушатель в Kafka на лету в процессе работы приложения.
Plan:
Создадим шаблонный класс через реализацию интерфейса MessageListener.
Создадим KafkaListenerEndpoint с помощью шаблона.
Зарегестрируем эндпоинт в KafkaListenerEndpointRegistry.
Создадим окружение для тестирования.
Протестируем решение.
Заключение.
1. Создадим шаблонный класс через реализацию интерфейса MessageListener
Создадим класс KafkaTemplateListener который реализует интерфейс MessageListener. Этот шаблон источник логики для будущих динамически созданных слушателей.
public class KafkaTemplateListener implements MessageListener<String, String> {
@Override
public void onMessage(ConsumerRecord<String, String> record) {
System.out.println("RECORD PROCESSING: " + record);
}
}
2. Создадим KafkaListenerEndpoint с помощью реализованного шаблона
В методе createDefaultMethodKafkaListenerEndpoint(String topic) нужно установить настройки, такие как Endpoint Id, Group Id, Topics и т.д.
В методе createKafkaListenerEndpoint(String topic) нужно установить шаблон слушателя и метод, который слушает сообщение из Kafka
@Service
public class KafkaListenerCreator {
String kafkaGroupId = "kafkaGroupId";
String kafkaListenerId = "kafkaListenerId-";
static AtomicLong endpointIdIndex = new AtomicLong(1);
private KafkaListenerEndpoint createKafkaListenerEndpoint(String topic) {
MethodKafkaListenerEndpoint<String, String> kafkaListenerEndpoint =
createDefaultMethodKafkaListenerEndpoint(topic);
kafkaListenerEndpoint.setBean(new KafkaTemplateListener());
try {
kafkaListenerEndpoint.setMethod(KafkaTemplateListener.class.getMethod("onMessage", ConsumerRecord.class));
} catch (NoSuchMethodException e) {
throw new RuntimeException("Attempt to call a non-existent method " + e);
}
return kafkaListenerEndpoint;
}
private MethodKafkaListenerEndpoint<String, String> createDefaultMethodKafkaListenerEndpoint(String topic) {
MethodKafkaListenerEndpoint<String, String> kafkaListenerEndpoint = new MethodKafkaListenerEndpoint<>();
kafkaListenerEndpoint.setId(generateListenerId());
kafkaListenerEndpoint.setGroupId(kafkaGroupId);
kafkaListenerEndpoint.setAutoStartup(true);
kafkaListenerEndpoint.setTopics(topic);
kafkaListenerEndpoint.setMessageHandlerMethodFactory(new DefaultMessageHandlerMethodFactory());
return kafkaListenerEndpoint;
}
private String generateListenerId() {
return kafkaGeneralListenerEndpointId + endpointIdIndex.getAndIncrement();
}
}
3. Зарегестрируем эндпоинт в KafkaListenerEndpointRegistry
Логика, которая регистрирует слушатель находиться в теле метода createAndRegisterListener(String topic). Логика находиться в том же классе KafkaListenerCreator.
@Service
public class KafkaListenerCreator {
//... HERE HAS TO BE VARIABLES FROM PREVIOUS EXAMPLE
@Autowired
private KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry;
@Autowired
private KafkaListenerContainerFactory kafkaListenerContainerFactory;
public void createAndRegisterListener(String topic) {
KafkaListenerEndpoint listener = createKafkaListenerEndpoint(topic);
kafkaListenerEndpointRegistry.registerListenerContainer(listener, kafkaListenerContainerFactory, true);
}
//... HERE HAS TO BE METHODS FROM PREVIOUS EXAMPLE
}
4. Создадим окружение для тестирования
Сначала создадим REST метод для для создания слушателя Kafka.
Я создал класс KafkaController и метод create(String topic). Этот метод может быть вызван с помощью POST HTTP запроса.
@RestController
public class KafkaController {
@Autowired
KafkaListenerCreator kafkaListenerCreator;
@PostMapping(path = "/create")
@ResponseStatus(HttpStatus.OK)
public void create(@RequestParam String topic) {
kafkaListenerCreator.createAndRegisterListener(topic);
}
}
Далее создадим метод для отправки сообщения в ново созданный слушатель.
Метод send(@RequestParam String topic, @RequestParam String message) имеет два параметра. Где “topic” это имя топика у слушателя Kafka, а “message” это текст сообщения, которое отправляется через KafkaTemplate.
@RestController
public class KafkaController {
//... HERE HAS TO BE VARIABLES FROM PREVIOUS EXAMPLE
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
@PostMapping(path = "/send")
@ResponseStatus(HttpStatus.OK)
public void send(@RequestParam String topic, @RequestParam String message) {
kafkaTemplate.send(topic, message);
}
//... HERE HAS TO BE METHODS FROM PREVIOUS EXAMPLE
}
5. Протестируем решение
вызвать http://localhost:8080/create?topic=myTopic1
вызвать http://localhost:8080/send?topic=myTopic1&message=myTxt1
ожидаемый лог: “ RECORD PROCESSING: myTxt1”
6. Заключение
Эта статья предоставляет быстрое решение для проблемы динамического создания слушателей в Kafka.
Дополнительные ресурсы
Комментарии (2)
mikegordan
05.09.2023 09:35+1Конечно такое себе..
Я ожидал такой юз кейс:
Допустим Кафка и Микросервис развернут в кибернетусе. Мы увеличиваем у очереди партиции, как сделать так чтобы Кубер это понял и поднял еще инстансы под количества партиций автоматически.
aleksandy
Всё это, конечно, занимательно, но почему ни в тэгах, ни в заголовке ничего не сказано, что всё написанное относится исключительно к спринговой обвязке к кафке?