Kafka
Kafka

Введение

В этой статье мы поработаем с Kafka на практике и построим полноценную Event-Driven микросервисную платформу. Использовать будем Spring Boot и Java.

Изначально я хотел рассказать про гарантии доставки, посмотреть на решение проблем, которые могут возникнуть при работе с Kafka. Но тогда статья бы получилась излишне объёмной, и информации для новичка было бы многовато. Поэтому посмотрим на это в следующий раз. В этой статье мы ограничимся базовыми практическими навыками.

Для того, чтобы вас по ходу статьи ничего не испугало, потребуются навыки работы с:

  1. Java

  2. Spring Boot

  3. Docker

  4. Docker Compose

  5. Postman

Отмечу, что я старался упрощать код в примерах, чтобы сильно не уходить от нашей цели поработать с Kafka на практике. Поэтому мы не будем создавать кастомные исключения, обрабатывать их, делать валидацию и т.п. Так что не спешите закидывать меня тапками за не production-ready код :)

Если вы не знакомы с основами Kafka, то рекомендую к прочтению мою предыдущую статью, в которой я постарался рассказать основные теоретические моменты.

Конфигурация сервисов

Будем работать с интернет-магазином из предыдущей статьи.

Для начала давайте перейдем на сайт start.spring.io для того, чтобы сконфигурировать наши Spring Boot проекты.

Сконфигурируем сервис остатков товаров:

inventory-service
inventory-service

Далее сконфигурируем сервис уведомлений:

notification-service
notification-service

Сконфигурируем сервис аналитики:

analytics-service
analytics-service

И, наконец, сконфигурируем сервис заказов:

order-service
order-service

Некоторые читатели заметят, что в предыдущей статье в примерах не было сервиса заказов. Я его добавил сюда, так как на практике без него одна за другой начали возникать архитектурные компромиссы.

Обговорим зависимости.

  1. Spring for Apache Kafka
    Эта зависимость добавлена во все наши сервисы, так как все они взаимодействуют с Kafka.

  2. Spring Web
    Эта зависимость также добавлена везде, так как это базовая зависимость.

Для удобства советую все сервисы распаковать в одну директорию с названием, например, kafka-article-2.

Обзор конечной архитектуры

В этом разделе обговорим, что конкретно мы будем иметь в итоге. Это поможет понимать некоторые шаги далее по статье.

Архитектура нашей платформы будет следующей:

архитектура платформы
архитектура платформы

Клиент отправляет POST-запрос в order-service. Клиент моментально получит ответ с кодом 201 Created и телом, в котором будет лежать строка с номером заказа.

После этого в дело вступит наш Event-Driven подход. order-service произведёт событие order-placed. Это событие перехватит inventory-service. В зависимости от текущего состояния склада, inventory-service произведёт одно из двух событий: inventory-reserved или inventory-rejected. Эти события перехватят analytics-service и notification-service для выполнения соответствующей логики.

Роли сервисов

Как вы можете видеть, у нас есть три вида сервиса:

  1. Продюсер
    order-service только производит события.

  2. Консьюмер
    notification-service и analytics-service только потребляют события.

  3. Продюсер и консьюмер одновременно
    inventory-service потребляет и производит события.

Настройка Kafka с помощью Docker Compose

Итак. Давайте настроим контейнеры с Kafka с помощью Docker Compose. В нашем примере их будет 3. Мы будем использовать KRaft вместо Zookeeper (об этих подходах к координации кластера мы поговорим в соответствующей статье). Наши Kafka будут и брокерами и контроллерами одновременно. Также будем использовать Kafka-UI, чтобы наглядно видеть то, что происходит внутри нашего кластера.

Создадим в директории kafka-article-2 файл docker-compose.yaml. В него запишем необходимую информацию:

services:
  kafka-0:
    # Будем использовать данный образ
    image: bitnami/kafka:4.0.0
    # Имя контейнера
    container_name: kafka-0
    # hostname контейнера
    hostname: kafka-0
    # Пробрасываем порты, чтоб слать в Kafka сообщения с хоста
    ports:
      - "29092:29092"
    environment:
      # Уникальный ID кластера
      KAFKA_KRAFT_CLUSTER_ID: 0
      # Уникальный ID ноды внутри кластера
      KAFKA_CFG_NODE_ID: 0
      # Нода и контроллер, и брокер
      KAFKA_CFG_PROCESS_ROLES: broker,controller
      # "Слушатели" используются для указания способа подключения к Kafka ноде,
      # То есть Kafka откроет порты и будет слушать на них подключения
      # В нашем случае первое слово - Alias, ://: - на всех сетевых интерфейсах,
      # Ну и в конце, очевидно, порт
      KAFKA_CFG_LISTENERS: INTERNAL://:9091,EXTERNAL://:29092,CONTROLLER://:9093
      # Перечисляем список нод, которые являются контроллерами в формате
      # [NodeID]@[Hostname]:[Port]
      KAFKA_CFG_CONTROLLER_QUORUM_VOTERS: 0@kafka-0:9093,1@kafka-1:9093,2@kafka-2:9093
      # Здесь мы указываем, куда Kafka будет "перенаправлять" клиента после того,
      # Как тот подключится к ней. Контроллер не указываем, так как это нужно
      # Только для внутренней работы Kafka
      KAFKA_CFG_ADVERTISED_LISTENERS: INTERNAL://kafka-0:9091,EXTERNAL://localhost:29092
      # Настраиваем шифрование
      # В нашем случае PLAINTEXT означает его отсутствие
      KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT,CONTROLLER:PLAINTEXT
      # Указываем, что для контроллера используется "Слушатель" CONTROLLER
      KAFKA_CFG_CONTROLLER_LISTENER_NAMES: CONTROLLER
      # Указываем, что для брокеров используется "Слушатель" INTERNAL
      KAFKA_CFG_INTER_BROKER_LISTENER_NAME: INTERNAL
      # Указываем, что для клиентов используется "Слушатель" EXTERNAL
      KAFKA_CLIENT_LISTENER_NAME: EXTERNAL

  kafka-1:
    image: bitnami/kafka:4.0.0
    container_name: kafka-1
    hostname: kafka-1
    ports:
      - "29093:29092"
    environment:
      KAFKA_KRAFT_CLUSTER_ID: 0
      KAFKA_CFG_NODE_ID: 1
      KAFKA_CFG_PROCESS_ROLES: broker,controller
      KAFKA_CFG_LISTENERS: INTERNAL://:9091,EXTERNAL://:29092,CONTROLLER://:9093
      KAFKA_CFG_CONTROLLER_QUORUM_VOTERS: 0@kafka-0:9093,1@kafka-1:9093,2@kafka-2:9093
      KAFKA_CFG_ADVERTISED_LISTENERS: INTERNAL://kafka-1:9091,EXTERNAL://localhost:29093
      KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT,CONTROLLER:PLAINTEXT
      KAFKA_CFG_CONTROLLER_LISTENER_NAMES: CONTROLLER
      KAFKA_CFG_INTER_BROKER_LISTENER_NAME: INTERNAL
      KAFKA_CLIENT_LISTENER_NAME: EXTERNAL

  kafka-2:
    image: bitnami/kafka:4.0.0
    container_name: kafka-2
    hostname: kafka-2
    ports:
      - "29094:29092"
    environment:
      KAFKA_KRAFT_CLUSTER_ID: 0
      KAFKA_CFG_NODE_ID: 2
      KAFKA_CFG_PROCESS_ROLES: broker,controller
      KAFKA_CFG_LISTENERS: INTERNAL://:9091,EXTERNAL://:29092,CONTROLLER://:9093
      KAFKA_CFG_CONTROLLER_QUORUM_VOTERS: 0@kafka-0:9093,1@kafka-1:9093,2@kafka-2:9093
      KAFKA_CFG_ADVERTISED_LISTENERS: INTERNAL://kafka-2:9091,EXTERNAL://localhost:29094
      KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT,CONTROLLER:PLAINTEXT
      KAFKA_CFG_CONTROLLER_LISTENER_NAMES: CONTROLLER
      KAFKA_CFG_INTER_BROKER_LISTENER_NAME: INTERNAL
      KAFKA_CLIENT_LISTENER_NAME: EXTERNAL

  kafka-ui:
    image: provectuslabs/kafka-ui:v0.7.2
    container_name: kafka-ui
    hostname: kafka-ui
    depends_on:
      - kafka-0
      - kafka-1
      - kafka-2
    ports:
      - "8086:8080"
    environment:
      # Название кластера в веб-интерфейсе
      KAFKA_CLUSTERS_0_NAME: local
      # Указываем адреса наших брокеров в кластере
      # так как Kafka-UI и брокеры в одной Docker-сети, то используем порты 9091
      KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka-0:9091,kafka-1:9091,kafka-2:9091
      # Настройка для динамической конфигурации брокеров через UI
      DYNAMIC_CONFIG_ENABLED: true

Работа с Kafka из продюсера (order-service)

Настройка properties для продюсера

Наше приложение должно знать, по какому адресу находятся наши брокеры. Также нужно указать, как наш producer будет сериализовать данные при отправке в Kafka. Напомню, что сериализация - процесс преобразования данных для передачи (например, по сети). Мы будем сериализовать ключ сообщения в строку, а значение — в JSON. Поэтому в файл application.properties впишем эту информацию:

# Core application configuration
spring.application.name=order-service
server.port=8080

# Kafka properties
spring.kafka.bootstrap-servers=localhost:29092,localhost:29093,localhost:29094
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer

Конфигурация топиков из Java-кода

Теперь создадим конфигурационный класс для Kafka. Для этого создайте класс KafkaConfig в пакете config:

@Configuration
public class KafkaConfig {
    @Bean
    public NewTopic orderPlacedTopic() {
        return TopicBuilder.name("order-placed")
            .partitions(3)
            .replicas(3)
            .config(TopicConfig.RETENTION_MS_CONFIG, "86400000")
            .config(TopicConfig.RETENTION_BYTES_CONFIG, "524288000")
            .build();
    }
}

Мы создали топик для заказов.

В топике мы делаем 3 партиции и фактор репликации = 3. Это значит, что ситуация будет примерно следующая:

Kafka-1: partition-0 (leader), partition-1 (follower), partition-2 (follower)
Kafka-2: partition-1 (leader), partition-2 (follower), partition-0 (follower)
Kafka-3: partition-2 (leader), partition-0 (follower), partition-1 (follower)

Также добавили настройки для retention:

  • Сообщения будут храниться 1 день

  • Максимальный размер лога 500 мб

Это я сделал, так как в предыдущей статье обещал рассмотреть задание этих настроек на практике :)

Топики, относящиеся к складу, мы создадим в inventory-service. Настороим их там аналогично.

DTO для отправки в теле запроса

Сделаем специальный DTO-класс, который мы будем передавать в теле POST-запроса к нашему приложению. Для этого создадим record (Этот тип появился в Java 16) для избежания boilerplate кода. Создайте его в пакете dto:

public record OrderRequest(String email, String productName, Integer quantity) {}

Для простоты дадим возможность заказать только один тип продукта. Это сделано, чтобы не усложнять то, что к непосредственной работе с Kafka не относится.

Event-DTO, представляющий собой Kafka-сообщение

В пакете event создадим класс, который будет сериализован в JSON и отправлен в Kafka:

public record OrderPlacedEvent(String orderId, String email, String productName, Integer quantity) {} оно?

Сервис для работы с заказами

В пакете service создадим сервис для работы с заказами:

@Service
public class OrderService {
    private final KafkaTemplate<String, OrderPlacedEvent> kafkaTemplate;

    public OrderService(final KafkaTemplate<String, OrderPlacedEvent> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    public String placeOrder(OrderRequest orderRequest) {
        String orderId = UUID.randomUUID().toString();
        OrderPlacedEvent orderPlacedEvent = new OrderPlacedEvent(
            orderId,
            orderRequest.email(),
            orderRequest.productName(),
            orderRequest.quantity()
        );

        kafkaTemplate.send("order-placed", orderPlacedEvent);

        return orderId;
    }
}

Отправка сообщений в Kafka

Как вы можете видеть, мы отправляем сообщение в Kafka с помощью бина KafkaTemplate, созданного Spring Boot автоматически. Это generic-класс с двумя параметрами:

  1. Первый параметр — тип ключа сообщения (по умолчанию ставим String)

  2. Второй параметр — тип значения (тела) сообщения

Поскольку мы отправляем событие одного типа, мы можем указать его тип вторым параметром. Если нужно будет отправлять события разных типов, следует использовать Object.

Для отправки сообщения нам нужно просто обратиться к этому бину и вызвать метод send, передав название топика и тело сообщения. Несложно, правда? :)


Пока что мы не трогаем ключ, хедеры и таймстамп. О них будет далее.

Лирическое отступление

Важно упомянуть, что по умолчанию при отправке сообщения будет использоваться гарантия at least once. Это значит, что сообщения отправится как минимум 1 раз. При определённых обстоятельствах возможны дубликаты. О гарантиях и работе с дубликатами подробно мы поговорим в следующей статье.

Также отмечу, что я не буду использовать Lombok в своих примерах, дабы не разжигать холиваров в комментариях, так как инструмент достаточно спорный.

Контроллер для маппинга запроса на эндпоинт

В пакете controller создадим контроллер:

@RestController
@RequestMapping("/api/order")
public class OrderController {
    private final OrderService orderService;
    
    public OrderController(OrderService orderService) {
        this.orderService = orderService;
    }
    
    @PostMapping
    @ResponseStatus(HttpStatus.CREATED)
    public String placeOrder(@RequestBody OrderRequest orderRequest) {
        return orderService.placeOrder(orderRequest);
    }
}

На этом настройка order-service завершена.

Работа с Kafka из консьюмера (notification-service)

Теперь перейдём к настройке консьюмера. Скажу сразу, что писать логику отправки сообщений мы не будем, так как статья неплохо так растянется, а вы пришли не за этим :)

Настройка properties для консьюмера

Для начала давайте залезем в application.properties и настроим всё что нужно для работы с Kafka из нашего консьюмера:

# Core application configuration
spring.application.name=notification-service
server.port=8082

# Kafka Properties
spring.kafka.bootstrap-servers=localhost:29092,localhost:29093,localhost:29094
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer
spring.kafka.consumer.group-id=notificationService
spring.kafka.consumer.auto-offset-reset=latest
spring.kafka.consumer.properties.spring.json.trusted.packages=io.mitochondria.inventory.event

Здесь мы используем адреса брокеров, десериализаторы (основываясь на сериализаторах продюсера, сообщения которого ловит наш консьюмер), айди группы, авто оффсет, разрешённые пакеты для десериализации. С первыми тремя строчками конфига всё понятно, а вот остальные требуют объяснения

  1. group-id
    Помните, в предыдущей статье я рассказывал про Consumer Groups? Так вот, эта настройка group-id задаёт принадлежность нашего приложения к той самой группе. Мы запустим несколько инстансов консьюмеров в одной группе и посмотрим распределение партиций ниже.

  2. auto-offset-reset
    Надеюсь, вы не забыли информацию об offset из предыдущей статьи. Так вот. Эта настройка управляет значением offset, с которого консьюмер начинает читать сообщения в партиции, если для него нет сохранённого offset (например, при первом запуске группы консьюмеров). Значений у этой настройки 3:

    1. earliest
      Консьюмер читает с самого раннего offset партиции.

    2. latest
      Консьюмер читает с самого позднего offset партиции.

    3. none
      Консьюмер при отсутствии явного указания offset выбрасывает исключение. Настройка достаточно специфичная и используется в редких сценариях.

  3. trusted.packages
    Видите ли какое дело. По умолчанию, при отправке сообщения в Kafka из Spring Boot, к нему добавляется специальный хедер, хранящий информацию о том, какой конкретный класс был сериализован в JSON. Это нужно для того, чтобы консьюмер понял, в какой класс десериализовывать полученный JSON. Например, при отправке сообщения в топик inventory-rejected, секция хедеров будет выглядеть как-то так:

    {
    	"__TypeId__": "io.mitochondria.inventory.event.InventoryRejectedEvent"
    }

    Отсюда становится понятно, что в консьюмерах нам нужно создать те же dto в том же пакете, что и в продюсере, на события которого реагирует наш консьюмер (в точности). Предупреждаю, что мы будем конфигурировать inventory-service позже, тогда и рассмотрим dto-events, которые будем там использовать.

Сервис, в котором мы будем ловить Kafka-сообщения

Далее давайте напишем логику, выполняемую при приёме сообщения. Для этого создадим сервис в пакете service:

@Service
public class NotificationService {
    private static final Logger logger = LoggerFactory.getLogger(NotificationService.class);

    @KafkaListener(topics = "inventory-reserved")
    public void sendNotificationIfReserved(InventoryReservedEvent inventoryReservedEvent) {
        logger.info("Received inventory reserved event: {}", inventoryReservedEvent);
    }

    @KafkaListener(topics = "inventory-rejected")
    public void sendNotificationIfRejected(InventoryRejectedEvent inventoryRejectedEvent) {
        logger.info("Received inventory rejected event: {}", inventoryRejectedEvent);
    }
}

Аннотация @KafkaListener используется для, как ни странно, прослушивания конкретного топика и выполнения логики, прописанной в аннотированном методе. Каждый такой метод работает в отдельном потоке, что гарантирует нам concurrency в обработке событий.

Для простоты в методах мы просто будем логировать полученные сообщения.

Как вы можете видеть, в приёме сообщений тоже нет ничего сложного. Spring Boot даёт нам очень удобную апишку.

Для нашего сервиса аналитики действия будут аналогичные. Я уверен, что вы сможете настроить его самостоятельно.

Работа с Kafka из консьюмера и продюсера одновременно (inventory-service)

Теперь мы будем настраивать сервис, который одновременно является продюсером и консьюмером.

Перейдём в application.properties файл и запишем туда неоходимую информацию:

# Core application configuration
spring.application.name=inventory-service
server.port=8081

# Kafka properties
spring.kafka.bootstrap-servers=localhost:29092,localhost:29093,localhost:29094
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer
spring.kafka.consumer.group-id=inventoryService
spring.kafka.consumer.auto-offset-reset=latest
spring.kafka.consumer.properties.spring.json.trusted.packages=io.mitochondria.order.event

Надеюсь, вы тут не найдёте ничего сложного. Мы просто-напросто указываем параметры для продюсера и для консьюмера.

Создание топиков для сообщений, связанных с инвентарём

Создадим конфигурационный класс в пакете config:

@Configuration
public class KafkaConfig {
    @Bean
    public NewTopic inventoryReservedTopic() {
        return TopicBuilder.name("inventory-reserved")
            .partitions(3)
            .replicas(3)
            .config(TopicConfig.RETENTION_MS_CONFIG, "86400000")
            .config(TopicConfig.RETENTION_BYTES_CONFIG, "524288000")
            .build();
    }

    @Bean
    public NewTopic inventoryRejectedTopic() {
        return TopicBuilder.name("inventory-rejected")
            .partitions(3)
            .replicas(3)
            .config(TopicConfig.RETENTION_MS_CONFIG, "86400000")
            .config(TopicConfig.RETENTION_BYTES_CONFIG, "524288000")
            .build();
    }
}

Dto-event из order-service

Чтобы принимать сообщения из order-service, нам нужен класс, на который мы будем маппить JSON из тела сообщения. Я ранее говорил, что нам нужен в точности тот же класс, что и в продюсере. В нашем случае продюсер — это наш сервис заказов. Соответственно, создадим такой же record в том же пакете (io.mitochondria.order.event.OrderPlacedEvent).

Репозиторий для работы с остатками

В пакете repository создадим репозиторий:

@Repository
public class InventoryRepository {
    private final Map<String, Integer> inventory = new ConcurrentHashMap<>();

    @PostConstruct
    public void init() {
        inventory.put("Smartphone", 5);
        inventory.put("Tablet", 10);
        inventory.put("Desktop", 6);
    }

    public void deductStock(OrderPlacedEvent orderPlacedEvent) {
        Integer result = inventory.computeIfPresent(orderPlacedEvent.productName(), (name, quantity) -> {
            if (quantity < orderPlacedEvent.quantity()) {
                throw new RuntimeException("Quantity exceeded");
            }

            return quantity - orderPlacedEvent.quantity();
        });

        if (result == null) {
            throw new RuntimeException("Stock exceeded");
        }
    }
}

DTO-events для Kafka-сообщений, связанных с инвентарём

Cоздадим в пакете event два record, которые мы будем передавать в теле Kafka-сообщения. Один — в случае успешной покупки, другой — в случае ошибки:

public record InventoryReservedEvent(String orderID, String email) {}
public record InventoryRejectedEvent(String orderID, String email) {}

Да, они одинаковые. Но всё равно необходимо разделить эти сущности, так как в дальнейшем мы, возможно, захотим изменить одну из них, а вторую — нет.

Сервис для работы с остатками

В пакете service создадим сервис:

@Service
public class InventoryService {
    private final InventoryRepository inventoryRepository;
    private final KafkaTemplate<String, Object> kafkaTemplate;

    public InventoryService(InventoryRepository inventoryRepository, KafkaTemplate<String, Object> kafkaTemplate) {
        this.inventoryRepository = inventoryRepository;
        this.kafkaTemplate = kafkaTemplate;
    }

    @KafkaListener(topics = "order-placed")
    public void reserveInventory(OrderPlacedEvent orderPlacedEvent) {
        try {
            inventoryRepository.deductStock(orderPlacedEvent);

            InventoryReservedEvent inventoryReservedEvent = new InventoryReservedEvent(
                orderPlacedEvent.orderId(),
                orderPlacedEvent.email()
            );

            kafkaTemplate.send("inventory-reserved", inventoryReservedEvent);
        }
        catch (Exception e) {
            InventoryRejectedEvent inventoryRejectedEvent = new InventoryRejectedEvent(
                orderPlacedEvent.orderId(),
                orderPlacedEvent.email()
            );

            kafkaTemplate.send("inventory-rejected", inventoryRejectedEvent);
        }
    }
}

Подготовка к рассылке сообщений

Развёртывание инфраструктуры

Итак. Всю необходимую подготовку наших сервисов мы выполнили. Теперь будем разворачивать нашу инфраструктуру.

Сначала запустим наши контейнеры с помощью команды docker compose up -d в корне нашей директории kafka-article-2.

Далее запустим order-service и inventory-service в одном инстансе.

Запустим notification-service и analytics-service в двух инстансах. Вы можете это сделать, задав VM параметр -Dserver.port при запуске каждого из ин, чтобы не было конфликтов по портам. Как конкретно это сделать мы рассматривать не будем, так как это зависит от того, как вы запускаете приложения.

Работа с Kafka-UI

Давайте перейдём по адресу http://localhost:8086/ в браузере, чтобы зайти в наш KafkaUI. Если вы всё сделали правильно, то у вас интерфейс должен выглядеть примерно так:

KakfaUI
KakfaUI

Я вам советую исследовать этот интерфейс и покликать на всё, что кликается. Работа с этим интерфейсом может прояснить многие моменты.

У вас, наверняка, возникнут вопросы при переходе в раздел Topics. Дело в том, что вы там ожидаете увидеть 3 топика (которые мы создавали). А на деле их там 4. Выглядит это как-то так:

Топики в Kafka-UI
Топики в Kafka-UI

Но на самом деле в этом таинственном топике нет ничего сложного. Он нужен для хранения оффсетов для каждого консьюмера в каждой Consumer Group. В самом деле. Когда мы отключим группу консьюмеров, а затем подключим по новой, то как узнать, с какого места им читать?

То, как распределяются партиции топика в рамках Consumer Group можно увидеть на следующей картинке:

Распределение партиций по консьюмерам в группе
Распределение партиций по консьюмерам в группе

В целом, здесь, наверное, дальнейшие комментации излишни. В KafkaUI это всё достаточно понятно показано.

Начинаем жонглировать сообщениями

Итак. Переходим к самому интересному. К рассылке сообщений.

Переходим в Postman для работы с нашей платформой

Для того, чтобы послать сообщение в Kakfa, откроем Postman.

Давайте попробуем заказать в нашем магазине 100 смартфонов (не спрашивайте, зачем). Для этого отправим PUT-запрос к нашему order-service. В теле запроса будет следующий JSON:

{
  "email": "test@example.com",
  "productName": "Smartphone",
  "quantity": 100
}

К сожалению, наш магазин ещё не успел разрастись. В нём нет 100 смартфонов. Следовательно, заказ не будет сделан. Поэтому цепочка будет следующей: order-service -> (order-placed-event) -> inventory-service -> (inventory-rejected-event) -> (notification-service, analytics-service).

Заходим в логи консьюмера и радуемся результату

Теперь зайдём в логи наших консьюмеров. Так как у нас 2 группы консьюмеров, то в каждой группе только один консьюмер залогирует получение сообщения. Давайте откроем, скажем, консьюмеров из группы notificationService. В логах одного из инстансов мы увидим:

Received inventory rejected event: InventoryRejectedEvent[orderID=956ac0ca-0a30-4d8d-938c-bfd80a634234, email=test@example.com]

Ощущаете это чувство? Вы только что написали свой первый микросервисный проект с Event-Driven подходом :)

Прикрепление ключа к сообщению

Теперь давайте научимся отправлять больше информации в нашем сообщении.

Как нам быть, если мы хотим отправить несколько сообщений в одну партицию?

Если вы помните, в предыдущей статье я писал, что для этого нужно указать для всех таких сообщений один ключ. Это делается несложно. Для этого у KafkaTemplate существует перегрузка метода send.

Давайте используем это в наших order-service и inventory-service.

В классе OrderService:

kafkaTemplate.send("order-placed", orderPlacedEvent.orderId(), orderPlacedEvent);

В классе InventoryService:

kafkaTemplate.send("inventory-reserved", inventoryReservedEvent.orderID(), inventoryReservedEvent);
...
kafkaTemplate.send("inventory-rejected", inventoryRejectedEvent.orderID(), inventoryRejectedEvent);

Как вы можете видеть, мы добавлили в качестве ключа номер нашего заказа. Это нужно для того, чтобы гарантировать, что события, связанные с одним заказом, в рамках одной партиции, будут потребляться последовательно (ведь Kafka гарантирует последовательную обработку в рамках одной партиции).

Для интереса давайте засунем строчку с отправкой сообщения в топик inventory-rejected в цикл for и прокрутим его 2000 раз :)

Но сначала очистим все сообщения в inventory-rejected с помощью интерфейса Kafka-UI. Затем отошлём POST-запрос к order-service с тем же самым JSON в теле.

В Kafka-UI мы наблюдаем ожидаемое поведение:

Все сообщения с одинаковым ключом в одной партиции
Все сообщения с одинаковым ключом в одной партиции

Прикрепление таймстампа к сообщению

Также существует перегрузка для того, чтобы прикрепить ещё и таймстамп:

CompletableFuture<SendResult<K, V>> send(String topic, Integer partition, Long timestamp, K key, V data);

О типе возвращаемого значения мы поговорим ниже.

У вас после просмотра этой сигнатуры, скорее всего, возник вопрос, зачем нужен параметр partition. Он нужен для того, чтоб вы вручную выбрали партицию, куда будет отправлено сообщение. При этом ключ не будет на это влиять. Но если поставить null, то партиция будет определяться ключом.

Давайте отправим сообщение с таймстампом.

Для начала оговорим, что Kafka работает с UNIX Timestamp (число миллисекунд с 1970-01-01 UTC).

Давайте отправим в качестве таймстампа 1 января 2025 года 00:00. Для этого напишем следующий код:

kafkaTemplate.send("inventory-rejected", null, LocalDateTime.of(2025, Month.JANUARY, 1, 0, 0).atZone(ZoneOffset.UTC).toInstant().toEpochMilli(),inventoryRejectedEvent.orderID(), inventoryRejectedEvent);

Теперь снова отправим наш PUT-запрос с 100 смартфонами, перед этим убрав цикл for. Откроем Kafka-UI и увидим следующее:

Видим указанный нами ранее таймстамп
Видим указанный нами ранее таймстамп

У меня Timestamp смещён, так как нахожусь в часовом поясе UTC+3.

Прикрепление хедеров к сообщению

Далее мы научимся отправлять хедеры в нашем сообщении. Это очень полезно при борьбе с дубликатами. Об этом мы поговорим в следующей статье.

Для того, чтобы это сделать, нам потребуется несколько другой способ отправки. В нашем случае это будет выглядеть как-то так:

ProducerRecord<String, Object> producerRecord = new ProducerRecord<>("inventory-rejected", inventoryRejectedEvent.orderID(), inventoryRejectedEvent);
producerRecord.headers().add("someHeader", String.valueOf(new Random().nextLong()).getBytes());
kafkaTemplate.send(producerRecord);

Мы используем специальный тип ProducerRecord. Его использование позволяет нам гибче настраивать сообщения. В его generics параметрах мы указываем семантически то же, что и в KafkaTemplate. В конструктор передаём название топика, ключ, экземпляр класса, который будет сериализован в JSON и передан в Kafka. Конструктор у этого класса перегружен. При желании можете посмотреть на все его варианты.

Мы добавим Header с названием someHeader (оригинально, да?). А в значение просто запишем рандомное число в виде строки, которую переведём в массив байт (это требует сигнатура метода).

Ну и в конце вызовем известный send.

Отсылаем наш избитый пример с 100 смартфонами, открываем Kafka-UI, открываем наше сообщение и смотрим хедеры. Мы видим следующую ситуацию:

{
	"someHeader": "7682405900824975627",
	"__TypeId__": "io.mitochondria.inventory.event.InventoryRejectedEvent"
}

О return-type метода send

Вы можете видеть, что возвращаемое значение у метода sendCompletableFuture. Это потому, что операция отправки сообщения — асинхронная. А этот тип по сути своей — обёртка над асинхронной операцией. При вызове метода основной поток не блокируется. Он моментально возвращет CompletableFuture и продолжает своё выполнение. А сама отправка выполняется в фоновых потоках. Давате попрактикуемся вешать коллбэки на наш результат.

Сложную логику писать не будем. Ограничимся следующим примером:

private static final Logger logger = LoggerFactory.getLogger(InventoryService.class);
...
CompletableFuture<SendResult<String, Object>> future =
                kafkaTemplate.send("inventory-rejected", inventoryRejectedEvent.orderID(), inventoryRejectedEvent);

            future.whenComplete((result, throwable) -> {
                if (throwable != null) {
                    logger.info("Error sending inventory rejected event in thread {}", Thread.currentThread().getName());
                }
                else {
                    logger.info("Success sending inventory rejected event in thread {}", Thread.currentThread().getName());
                }
            });

Если вы это запустите и проверите логи, то скорее всего у вас будет что-то такое:

Success sending inventory rejected event in thread kafka-producer-network-thread | inventory-service-producer-1

Наш коллбэк выполнился в так называемом фоновом Kafka I/O потоке.

Подведение итогов

Если вы дочитали до этого момента и выполнили все действия по ходу статьи, то у вас есть полноценная Event-Driven микросервисная платформа, с чем я вас поздравляю :)

Теперь вы обладаете базовыми навыками работы с Kafka из Spring Boot приложения. В этот раз на этих навыках мы и остановимся.

Для вашего удобства я разместил платформу на GitHub.

Следующая статья будет прямым продолжением этой. Там мы будем использовать платформу, которая получилась в ходе этой статьи. Мы познакомимся с некоторыми более продвинутыми концепциями, глубже рассмотрим процесс передачи сообщения в Kafka, посмотрим на возможные проблемы. Будет много интересного!

Комментарии (2)


  1. kmatveev
    23.10.2025 21:12

    Большой прогресс по сравнению с предыдущей статьёй: на этот раз всё написано самостоятельно. Вопрос с намёком: а что более осмысленное, чем строка "Key", имело бы смысл в этом конкретном сервисе использовать в качестве ключа?


    1. Mitochondria Автор
      23.10.2025 21:12

      Рад слышать, спасибо!

      Я планирую обновить достаточно сильно эту статью. Выкладывал вечером, подуставший :)

      Добавлю новый сервис, исправлю некоторые архитектурные косяки. И вставлю осмысленный ключ в примере.