Эта статья обьясняет, как создать слушатель в Kafka на лету в процессе работы приложения.

Plan:

  1. Создадим шаблонный класс через реализацию интерфейса MessageListener.

  2. Создадим KafkaListenerEndpoint с помощью шаблона.

  3. Зарегестрируем эндпоинт в KafkaListenerEndpointRegistry.

  4. Создадим окружение для тестирования.

  5. Протестируем решение.

  6. Заключение.

1. Создадим шаблонный класс через реализацию интерфейса MessageListener

Создадим класс KafkaTemplateListener который реализует интерфейс MessageListener. Этот шаблон источник логики для будущих динамически созданных слушателей.

public class KafkaTemplateListener implements MessageListener<String, String> { 
    @Override    
    public void onMessage(ConsumerRecord<String, String> record) {
        System.out.println("RECORD PROCESSING: " + record);
    }
}

2. Создадим KafkaListenerEndpoint с помощью реализованного шаблона

В методе createDefaultMethodKafkaListenerEndpoint(String topic) нужно установить настройки, такие как Endpoint Id, Group Id, Topics и т.д.

В методе createKafkaListenerEndpoint(String topic) нужно установить шаблон слушателя и метод, который слушает сообщение из Kafka

@Service
public class KafkaListenerCreator {
    String kafkaGroupId = "kafkaGroupId";
    String kafkaListenerId = "kafkaListenerId-";
    static AtomicLong endpointIdIndex = new AtomicLong(1);

    private KafkaListenerEndpoint createKafkaListenerEndpoint(String topic) {
        MethodKafkaListenerEndpoint<String, String> kafkaListenerEndpoint =
            createDefaultMethodKafkaListenerEndpoint(topic);
        kafkaListenerEndpoint.setBean(new KafkaTemplateListener());
        try {
            kafkaListenerEndpoint.setMethod(KafkaTemplateListener.class.getMethod("onMessage", ConsumerRecord.class));
        } catch (NoSuchMethodException e) {
            throw new RuntimeException("Attempt to call a non-existent method " + e);
        }
        return kafkaListenerEndpoint;
    }

    private MethodKafkaListenerEndpoint<String, String> createDefaultMethodKafkaListenerEndpoint(String topic) {
        MethodKafkaListenerEndpoint<String, String> kafkaListenerEndpoint = new MethodKafkaListenerEndpoint<>();
        kafkaListenerEndpoint.setId(generateListenerId());
        kafkaListenerEndpoint.setGroupId(kafkaGroupId);
        kafkaListenerEndpoint.setAutoStartup(true);
        kafkaListenerEndpoint.setTopics(topic);
        kafkaListenerEndpoint.setMessageHandlerMethodFactory(new DefaultMessageHandlerMethodFactory());
        return kafkaListenerEndpoint;
    }

    private String generateListenerId() {
        return kafkaGeneralListenerEndpointId + endpointIdIndex.getAndIncrement();

    }
}

3. Зарегестрируем эндпоинт в KafkaListenerEndpointRegistry

Логика, которая регистрирует слушатель находиться в теле метода createAndRegisterListener(String topic). Логика находиться в том же классе KafkaListenerCreator.

@Service
public class KafkaListenerCreator {
  //... HERE HAS TO BE VARIABLES FROM PREVIOUS EXAMPLE

  @Autowired
  private KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry;
  @Autowired
  private KafkaListenerContainerFactory kafkaListenerContainerFactory;

  public void createAndRegisterListener(String topic) {
    KafkaListenerEndpoint listener = createKafkaListenerEndpoint(topic);
    kafkaListenerEndpointRegistry.registerListenerContainer(listener, kafkaListenerContainerFactory, true);
  }

  //... HERE HAS TO BE METHODS FROM PREVIOUS EXAMPLE

}

4. Создадим окружение для тестирования

Сначала создадим REST метод для для создания слушателя Kafka.

Я создал класс KafkaController и метод create(String topic). Этот метод может быть вызван с помощью POST HTTP запроса.

@RestController
public class KafkaController {
    @Autowired
    KafkaListenerCreator kafkaListenerCreator;

    @PostMapping(path = "/create")
    @ResponseStatus(HttpStatus.OK)
    public void create(@RequestParam String topic) {
        kafkaListenerCreator.createAndRegisterListener(topic);
    }
}

Далее создадим метод для отправки сообщения в ново созданный слушатель.

Метод send(@RequestParam String topic, @RequestParam String message) имеет два параметра. Где “topic” это имя топика у слушателя Kafka, а “message” это текст сообщения, которое отправляется через KafkaTemplate.

@RestController
public class KafkaController {
    //... HERE HAS TO BE VARIABLES FROM PREVIOUS EXAMPLE
    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    @PostMapping(path = "/send")
    @ResponseStatus(HttpStatus.OK)
    public void send(@RequestParam String topic, @RequestParam String message) {
        kafkaTemplate.send(topic, message);
    }
    //... HERE HAS TO BE METHODS FROM PREVIOUS EXAMPLE
}

5. Протестируем решение

  1. вызвать http://localhost:8080/create?topic=myTopic1

  2. вызвать http://localhost:8080/send?topic=myTopic1&message=myTxt1

  3. ожидаемый лог: “ RECORD PROCESSING: myTxt1”

6. Заключение

Эта статья предоставляет быстрое решение для проблемы динамического создания слушателей в Kafka.

Дополнительные ресурсы

https://medium.com/bliblidotcom-techblog/dynamic-spring-boot-kafka-consumer-af8740f2c703?source=post_page-----4f8f359d715e--------------------------------

https://github.com/spring-projects/spring-kafka/issues/460?source=post_page-----4f8f359d715e--------------------------------

Комментарии (2)


  1. aleksandy
    05.09.2023 09:35
    +2

    Всё это, конечно, занимательно, но почему ни в тэгах, ни в заголовке ничего не сказано, что всё написанное относится исключительно к спринговой обвязке к кафке?


  1. mikegordan
    05.09.2023 09:35
    +1

    Конечно такое себе..

    Я ожидал такой юз кейс:

    Допустим Кафка и Микросервис развернут в кибернетусе. Мы увеличиваем у очереди партиции, как сделать так чтобы Кубер это понял и поднял еще инстансы под количества партиций автоматически.