В этой статье разберемся, как реализовать обмен сообщениями между Java-микросервисами на Spring с помощью Kafka.

1. Архитектура

У нас будет Producer-микросервис ("писатель"), который получает заказы на еду (Food Order) и передает их через Kafka в Consumer-микросервис ("читатель") для сохранения в базу данных.

2. Пара слов о Kafka

Кластер Kafka обладает высокой масштабируемостью и отказоустойчивостью: при поломке одного из узлов, другие узлы берут на себя его работу, обеспечивая непрерывность работы без потери данных.

Чтение и запись данных в Kafka выполняется в виде событий, содержащих информацию в различном формате, например, в виде строки, массива или JSON-объекта. 

Producer (производитель, издатель) публикует (записывает) события в Kafka, а Consumer (потребитель, подписчик) подписывается на эти события и обрабатывает их.

3. Топики

События группируются в топики (topic). Топик похож на папку, а события — на файлы в этой папке. У топика может быть ноль, один или много издателей и подписчиков.

События можно прочитать столько раз, сколько необходимо. В этом отличие Kafka от традиционных систем обмена сообщениями: после чтения события не удаляются. Можно настроить, как долго Kafka хранит события.

4. Разделы

Топики поделены на разделы (partition). Публикация события в топике фактически означает добавление его к одному из разделов. События с одинаковыми ключами записываются в один раздел. В рамках раздела Kafka гарантирует порядок событий.

Для отказоустойчивости и высокой доступности топик может быть реплицирован, в том числе между различными, географически удаленными, датацентрами. То есть всегда будет несколько брокеров с копиями данных на случай, если что-то пойдет не так.

5. Создание проектов

Перейдите на start.spring.io и создайте проекты с зависимостями, показанными на рисунках ниже.

Producer-микросервис:

Consumer-микросервис:

6. Запуск Kafka в докере

В корне одного из проектов, неважно каком, создайте файл docker-compose.yml, содержащий параметры запуска Kafka, Kafdrop и Zookeeper в докере.

version: "3.7"

networks:
  kafka-net:
    name: kafka-net
    driver: bridge

services:
  zookeeper:
    image: zookeeper:3.7.0
    container_name: zookeeper
    restart: "no"
    networks:
      - kafka-net
    ports:
      - "2181:2181"

  kafka:
    image: obsidiandynamics/kafka
    container_name: kafka
    restart: "no"
    networks:
      - kafka-net
    ports:
      - "9092:9092"
    environment:
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: DOCKER_INTERNAL:PLAINTEXT,DOCKER_EXTERNAL:PLAINTEXT
      KAFKA_LISTENERS: DOCKER_INTERNAL://:29092,DOCKER_EXTERNAL://:9092
      KAFKA_ADVERTISED_LISTENERS: DOCKER_INTERNAL://kafka:29092,DOCKER_EXTERNAL://${DOCKER_HOST_IP:-127.0.0.1}:9092
      KAFKA_INTER_BROKER_LISTENER_NAME: DOCKER_INTERNAL
      KAFKA_ZOOKEEPER_CONNECT: "zookeeper:2181"
      KAFKA_BROKER_ID: 1
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
    depends_on:
      - zookeeper

  kafdrop:
    image: obsidiandynamics/kafdrop
    container_name: kafdrop
    restart: "no"
    networks:
      - kafka-net
    ports:
      - "9000:9000"
    environment:
      KAFKA_BROKERCONNECT: "kafka:29092"
    depends_on:
      - "kafka"

Далее, находясь в папке с docker-compose.yml выполните docker-compose up. После запуска контейнеров откройте Kafdrop (веб-интерфейс для управления Kafka) по адресу http://localhost:9000.

В Kafdrop можно смотреть топики, создавать их, удалять и делать многое другое.

7. Producer-микросервис

Архитектура:

Этапы создания Producer-микросервиса:

  • создаем конфигурационные бины;

  • создаем топик для заказов;

  • создаем контроллер FoodOrderController, сервис FoodOrderService и Producer;

  • преобразуем заказы FoodOrder в текстовый вид для отправки брокеру.

Переменные окружения и порт для нашего API (application.yml):

server:
  port: 8080

topic:
  name: t.food.order

Config отвечает за создание топика и бина KafkaTemplate, используемого для отправки сообщения.

@Configuration
public class Config {

    private final KafkaProperties kafkaProperties;

    @Autowired
    public Config(KafkaProperties kafkaProperties) {
        this.kafkaProperties = kafkaProperties;
    }

    @Bean
    public ProducerFactory<String, String> producerFactory() {
        // get configs on application.properties/yml
        Map<String, Object> properties = kafkaProperties.buildProducerProperties();
        return new DefaultKafkaProducerFactory<>(properties);
    }

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }

    @Bean
    public NewTopic topic() {
        return TopicBuilder
                .name("t.food.order")
                .partitions(1)
                .replicas(1)
                .build();
    }

}

Класс модели FoodOrder:

@Data
@Value
public class FoodOrder {
    String item;
    Double amount;
}

FoodOrderController отвечает за получение заказа FoodOrder и передачу его на уровень сервиса.

@Slf4j
@RestController
@RequestMapping("/order")
public class FoodOrderController {

    private final FoodOrderService foodOrderService;

    @Autowired
    public FoodOrderController(FoodOrderService foodOrderService) {
        this.foodOrderService = foodOrderService;
    }

    @PostMapping
    public String createFoodOrder(@RequestBody FoodOrder foodOrder) throws JsonProcessingException {
        log.info("create food order request received");
        return foodOrderService.createFoodOrder(foodOrder);
    }
}

FoodOrderService — получение заказа FoodOrder и передачу его Producer.

@Slf4j
@Service
public class FoodOrderService {

    private final Producer producer;

    @Autowired
    public FoodOrderService(Producer producer) {
        this.producer = producer;
    }

    public String createFoodOrder(FoodOrder foodOrder) throws JsonProcessingException {
        return producer.sendMessage(foodOrder);
    }
}

Producer получает заказ FoodOrder и публикует его в Kafka в виде сообщения.

В строке 18 мы конвертируем объект FoodOrder в JSON-строку для его передачи в виде строки в Consumer-микросервис.

В строке 19 фактически отправляем сообщение, передавая топик для публикации (переменная окружения в строке 6) и заказ в виде сообщения.

@Slf4j
@Component
public class Producer {

    @Value("${topic.name}")
    private String orderTopic;

    private final ObjectMapper objectMapper;
    private final KafkaTemplate<String, String> kafkaTemplate;

    @Autowired
    public Producer(KafkaTemplate<String, String> kafkaTemplate, ObjectMapper objectMapper) {
        this.kafkaTemplate = kafkaTemplate;
        this.objectMapper = objectMapper;
    }

    public String sendMessage(FoodOrder foodOrder) throws JsonProcessingException {
        String orderAsMessage = objectMapper.writeValueAsString(foodOrder);
        kafkaTemplate.send(orderTopic, orderAsMessage);

        log.info("food order produced {}", orderAsMessage);

        return "message sent";
    }
}

При запуске приложения мы должны увидеть топик, созданный в Kafdrop. А при отправке заказа FoodOrder — информацию в логе, что сообщение отправлено.

Теперь в Kafdrop в разделе Topics можем посмотреть созданный топик t.food.order и увидеть наше сообщение.

8. Consumer-микросервис

Архитектура:

Этапы создания Consumer-микросервиса:

  • конфигурируем group-id и бины;

  • настраиваем доступ к базе данных;

  • создаем Consumer и FoodOrderService;

  • создаем репозиторий FoodOrderRepository.

Начнем с настройки порта для запуска нашего API, топика, который будем слушать, group-id для Consumer-микросервиса и конфигурации базы данных.

server:
  port: 8081

topic:
  name: t.food.order

spring:
  kafka:
    consumer:
      group-id: "default"

  h2:
    console:
      enabled: true
      path: /h2-console
  datasource:
    url: jdbc:h2:mem:testdb
    username: sa
    password: password

Config отвечает за настройку бина ModelMapper — библиотеки для маппинга одних объектов на другие. Например, для DTO, используемого далее.

@Configuration
public class Config {

    @Bean
    public ModelMapper modelMapper() {
        return new ModelMapper();
    }

}

Классы модели:

@Data
@Value
public class FoodOrderDto {
    String item;
    Double amount;
}
@Data
@Entity
@NoArgsConstructor
@AllArgsConstructor
public class FoodOrder {

    @Id @GeneratedValue(strategy = GenerationType.IDENTITY)
    private Long id;
    private String item;
    private Double amount;
}

Consumer отвечает за прослушивание топика с заказами и получение сообщений. Полученные сообщения мы преобразуем в FoodOrderDto, не содержащего ничего, связанного с персистентностью, например, ID.

@Slf4j
@Component
public class Consumer {

    private static final String orderTopic = "${topic.name}";

    private final ObjectMapper objectMapper;
    private final FoodOrderService foodOrderService;

    @Autowired
    public Consumer(ObjectMapper objectMapper, FoodOrderService foodOrderService) {
        this.objectMapper = objectMapper;
        this.foodOrderService = foodOrderService;
    }

    @KafkaListener(topics = orderTopic)
    public void consumeMessage(String message) throws JsonProcessingException {
        log.info("message consumed {}", message);

        FoodOrderDto foodOrderDto = objectMapper.readValue(message, FoodOrderDto.class);
        foodOrderService.persistFoodOrder(foodOrderDto);
    }

}

FoodOrderService — преобразование полученного DTO в объект FoodOrder и сохранение его в БД.

@Slf4j
@Service
public class FoodOrderService {

    private final FoodOrderRepository foodOrderRepository;
    private final ModelMapper modelMapper;

    @Autowired
    public FoodOrderService(FoodOrderRepository foodOrderRepository, ModelMapper modelMapper) {
        this.foodOrderRepository = foodOrderRepository;
        this.modelMapper = modelMapper;
    }

    public void persistFoodOrder(FoodOrderDto foodOrderDto) {
        FoodOrder foodOrder = modelMapper.map(foodOrderDto, FoodOrder.class);
        FoodOrder persistedFoodOrder = foodOrderRepository.save(foodOrder);

        log.info("food order persisted {}", persistedFoodOrder);
    }

}

Код FoodOrderRepository:

@Repository
public interface FoodOrderRepository extends JpaRepository<FoodOrder, Long> {
}

Теперь при запуске Consumer-микросервиса отправленные ранее сообщения будут прочитаны из соответствующего топика.

Здесь отмечу одну важную деталь: если мы перейдем в Kafdrop и проверим сообщение, которое только что получили, оно будет доступно. Но, например, в RabbitMQ мы бы его не увидели.

9. Дополнительный функционал

Мы можем отправлять периодические сообщения, включив функционал запуска задач по расписанию.

Для этого добавляем аннотацию @EnableScheduling к классу конфигурации Producer-микросервиса.

@EnableScheduling
@Configuration
public class Config {
    
    ...
    
}

Будем отправлять сообщения с фиксированным интервалом в 1000 миллисекунд.

@Slf4j
@Component
public class Scheduler {

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;
    private Integer count = 0;

    @Scheduled(fixedRate = 1000)
    public void sendMessage() {
        count++;
        kafkaTemplate.send("t.scheduled", "message " + count);
        log.info("sent message count {}", count);
    }

}

Топик будет создан автоматически, но можно определить бин также, как делали раньше.

Получим следующий результат:

10. Заключение

Основная идея статьи была познакомить вас с использованием Kafka совместно с Java и Spring для реализации на ее основе более сложных решений.

Исходный код из статьи доступен на GitHub здесь.

Ссылки

  1. Документация Apache Kafka

  2. Kafka The Definitive Guide, O’Reilly

  3. Apache Kafka, Matthias J. Sax


Приглашаем всех желающих на открытое занятие «Разработка консольных приложений на Spring и Picocli». На данном занятии мы покажем, как строить Command Line Interface и утилиты командной строки на Picocli, как альтернативу Spring Shell. Также будут рассмотрены некоторые возможности Java для создания таких консольных утилит. Регистрация — по ссылке.

Комментарии (2)


  1. Lexer11l
    27.04.2022 20:46

    Хороший краткий гайд, но подобное уже было неоднократно. Ожидал увидеть более сложную реализацию. Например реализацию распределенных транзакций упрощенно, реализацию взаимодействия запрос-ответ и обработки сбоев


    1. Lexer11l
      28.04.2022 08:04

      А, ок. Не обратил внимания, что это перевод