kafka
kafka

Рад вас видеть, программисты-любители распределённых систем!

Мы продолжаем работу с Kafka на практике. В этот раз мы остановимся на обсуждении гарантий доставки на практике и настройке идемпотентности.

Будем использовать микросервисную платформу, полученную в предыдущей статье.

В этой статье мы:

  1. Перейдём с In-Memory хранилища в inventory-service на PostgreSQL

  2. Обсудим гарантии доставки на практике

  3. Разберём Outbox-Pattern

  4. Настроим идемпотентность продюсера и консьюмера

Будет интересно!

Переход inventory-service на PostgreSQL

Сначала мы перейдём с In-Memory хранилища на настоящую базу данных. Сделаем это для того, чтобы адекватно работать с сегодняшними задачами и паттернами, так как пока писал статью, понял, что In-Memory уже не подойдёт под наши задачи.

Использовать будем PostgreSQL. Так что добавьте в docker-compose.yaml следующее:

postgres-inventory:
    # Будем использовать PostgreSQL
    image: postgres:14
    # Имя контейнера
    container_name: postgres-inventory
    # hostname контейнера
    hostname: postgres-inventory
    # Пробрасываем порты
    ports:
      - "5432:5432"
    environment:
      # Юзер
      POSTGRES_USER: postgres
      # Пароль
      POSTGRES_PASSWORD: postgres
      # Стартовая база данных
      POSTGRES_DB: inventory_db

Заметьте, что я ни в брокерах, ни в бд не прокидывал volumes для простоты. При желании вы можете это сделать.

Также нужно прыгнуть в inventory-service, и в pom-файл добавить зависимости:

<dependency>
  <groupId>org.postgresql</groupId>
  <artifactId>postgresql</artifactId>
  <scope>runtime</scope>
</dependency>
<dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-data-jpa</artifactId>
</dependency>

Далее добавим следующие строки в application.properties:

# Database connection properties
spring.datasource.url=jdbc:postgresql://localhost:5432/inventory_db
spring.datasource.username=postgres
spring.datasource.password=postgres

Теперь создадим таблицу для наших остатков товаров:

CREATE TABLE inventory (
    id BIGSERIAL PRIMARY KEY,
    product_name VARCHAR(256) NOT NULL UNIQUE,
    quantity INT NOT NULL CHECK (quantity >= 0)
);

Тут же добавим товары, которые были ранее в нашей Map:

INSERT INTO inventory (product_name, quantity) VALUES
('Smartphone', 5),
('Tablet', 10),
('Desktop', 6);

В пакете model добавим сущность в виде гномика, которая будет маппиться на таблицу.

@Entity
@Table(name = "inventory")
public class Inventory {
    @Id
    @Column(name = "id")
    @GeneratedValue(strategy = GenerationType.IDENTITY)
    private Long id;

    @Column(name = "product_name", nullable = false, unique = true, length = 256)
    private String productName;

    @Column(name = "quantity", nullable = false)
    private Integer quantity;

    public Inventory(String productName, Integer quantity) {
        this.productName = productName;
        this.quantity = quantity;
    }

    // empty constructor, getters, setters
}

Изменим код репозитоия:

public interface InventoryRepository extends JpaRepository<Inventory, Long> {
    @Modifying
    @Query("UPDATE Inventory i SET i.quantity = i.quantity - :quantity WHERE i.productName = :productName AND i.quantity >= :quantity")
    int deductStock(@Param("productName") String productName, @Param("quantity") Integer quantity);
}

Внесём изменения в сервис:

@KafkaListener(topics = "order-placed")
    public void reserveInventory(OrderPlacedEvent orderPlacedEvent) {
        int count = inventoryRepository.deductStock(orderPlacedEvent.productName(), orderPlacedEvent.quantity());

        if (count > 0) {
            InventoryReservedEvent inventoryReservedEvent = new InventoryReservedEvent(
                orderPlacedEvent.orderId(),
                orderPlacedEvent.email()
            );

            kafkaTemplate.send("inventory-reserved", inventoryReservedEvent.orderID(), inventoryReservedEvent);
        }
        else {
            InventoryRejectedEvent inventoryRejectedEvent = new InventoryRejectedEvent(
                orderPlacedEvent.orderId(),
                orderPlacedEvent.email()
            );

            CompletableFuture<SendResult<String, Object>> future =
                kafkaTemplate.send("inventory-rejected", inventoryRejectedEvent.orderID(), inventoryRejectedEvent);

            future.whenComplete((result, throwable) -> {
                if (throwable != null) {
                    logger.info("Error sending inventory rejected event in thread {}", Thread.currentThread().getName());
                }
                else {
                    logger.info("Success sending inventory rejected event in thread {}", Thread.currentThread().getName());
                }
            });
        }
    }

Гарантии доставки на практике

В теоретической статье мы говорили о гарантиях доставки, но не связывали эти знания с практикой. Сейчас мы это исправим.

Параметр acks

Надеюсь, вы помните такой параметр как acks. Он отвечает за количество подтверждений, необходимых продюсеру для продолжения работы.

Чтобы задать этому параметру значение из продюсера на Spring Boot, необходимо в его application.properties файл добавить следующую строку:

spring.kafka.producer.acks=0 # или 1, или all

Вот, собсвенно, и всё. Этим мы добиваемся гарантий at-most-once и at-least-once.

Про гарантию exactly-once

Вам, скорее всего, доводилось слышать словосочетание exactly-once delivery. Трактовать его буквально не стоит: в контексте распределённых систем отправка сообщения продюсером и обработка консьюмером ровно один раз невозможна даже теоретически.

Эта проблема известна как "Two Generals' Problem" (Проблема двух генералов). Она утверждает, что две стороны не могут гарантированно договориться о согласованном состоянии через ненадёжный канал связи.

Поэтому, когда говорят exactly-once delivery, на практике подразумевают at-least-once delivery + идемпотентность на обеих сторонах. Именно этим в данной статье мы и и будем заниматься.

Идемпотентность продюсера

Ситуация с дубликатами в партиции

Для начала мы рассмотрим ситуацию:

  1. Продюсер отправляет сообщение в Kafka

  2. Брокер получает его и отправляет ack продюсеру

  3. Из-за сетевых проблем ack уходит в небытие

  4. Продюсер отправляет сообщение заново

У нас появились дубликаты в одной партиции. Неприятно, да?

Вот поэтому и была придумана идемпотентность продюсера. Она гарантирует, что дубликаты просто-напросто не запишутся в партицию снова.

Включение идемпотентности

Идемпотентность включается с помощью одной строки в application.properties продюсера:

spring.kafka.producer.properties.enable.idempotence=true

Как это работает внутри

После указания значения true, брокер описывает каждое соединение продюсера парой (ProducerId, ProducerEpoch).

  • ProducerId уникально идентифицирует конкретного продюсера

  • ProducerEpoch инкрементируется при каждом “возрождении” продюсера и различает его подключения.

Также продюсер начинает нумеровать каждое сообщение. Это называется sequence number.

Всю эту информацию отслеживает брокер для того, чтобы обеспечить идемпотентность.

Можно подумать, что для идемпотентности достаточно пары (ProducerId, sequence number). Однако необходимо понимать важность ProducerEpoch: он защищает от "зомби-продюсеров".

Пример с "зомби-продюсером"

Представим ситуацию:

  1. Продюсер работает, всё хорошо

  2. Внезапно полетел

  3. Мы его перезапускаем

Казалось бы, что может пойти не так? Ответ: примерно всё.

При перезапуске старый продюсер может не "умереть" сразу. Более того, он может внезапно "ожить" и начать отправлять сообщения. Тогда у нас в некоторый отрезок времени могут активно работать два продюсера. Соответственно, в нашей системе будет хаос, так как новый продюсер начнёт с sequence number = 1, а старый продюсер уже отослал сообщения с sequence number от 1 до какого-то числа n. Соответственно, новый продюсер продолжительное время вообще не сможет ничего отправлять.

ProducerEpoch помогает отсекать сообщения старых продюсеров и определять подключение нового продюсера.

Параметр transactional.id

Брокер различает продюсеров по значению transactional.id (в Spring — свойство spring.kafka.producer.properties.transactional.id). Именно через него он понимает, что это тот же продюсер после перезапуска. Если его не указать, то при перезапуске того же сервиса-продюсера, Kafka может дать ему другой ProducerId. Это крайне нежелательная ситуация, так как начнут появляться дубликаты.

Spring Boot Kafka — по сути обёртка над нативным KafkaProducer, который создаётся на лету. Жизненный цикл продюсера управляется самим Spring Boot.
Если задан transactional.id, при запуске приложения Spring вызывает у нативного продюсера метод initTransactions(), что заставляет брокер выдать нашему продюсеру ProducerId и ProducerEpoch. Если же transactional.id не задан, то брокер выдаст случайный ProducerId и ProducerEpoch = 0.

Важные параметры, влияющие на идемпотентность

По умолчанию, начиная со Spring Boot 2.5, параметр enable.idempotence автоматически устанавливается в true при использовании transactional.id.

Также существуют некоторые параметры, изменение которых может отключить идемпотентность. Рассмотрим их подробнее:

  1. spring.kafka.producer.acks
    Для того, чтобы идемпотентность работала, необходимо установить acks=all. Это нужно для того, чтобы брокер смог подтвердить сохранение сообщения на всех репликах. Иначе идемпотентность теряет смысл.

  2. spring.kafka.producer.retries
    Этот параметр отвечает за то, сколько раз продюсер будет пытаться повторно отправить сообщение, если не получает ack. Значение должно быть строго больше нуля. Это необходимо, чтобы Kafka могла повторно отправить сообщение, если брокер не прислал подтверждение (ack).

  3. spring.kafka.producer.properties.max.in.flight.requests.per.connection
    Этот параметр отвечает за то, сколько сообщений может быть максимально отправлено продюсером до получения ack по одному из них. Значение должно быть не больше 5. Если увеличить, в партициях может возникнуть серьёзный беспорядок.

Настройка идемпотентности в наших сервисах

Давайте настроим идемпотентность у наших order-service и inventory-service.

Прыгаем в application.properties файл order-service и добавляем следующие строки:

spring.kafka.producer.acks=all
spring.kafka.producer.retries=2147483647
spring.kafka.producer.properties.max.in.flight.requests.per.connection=5
spring.kafka.producer.transaction-id-prefix=order-service-
spring.kafka.producer.properties.enable.idempotence=true

Заметим, что вместо transactional.id целиком, мы используем указание префикса. Это нужно для того, чтобы у разных инстансов одного сервиса был указан непосредственно этот самый префикс, а суффикс укажет Spring Boot автоматически. При этом этот суффикс при перезапусках одного инстанса не меняется.

Аналогично с inventory-service:

spring.kafka.producer.acks=all
spring.kafka.producer.retries=2147483647
spring.kafka.producer.properties.max.in.flight.requests.per.connection=5
spring.kafka.producer.transaction-id-prefix=inventory-service-
spring.kafka.producer.properties.enable.idempotence=true

Идемпотентность консьюмера (inventory-service)

Может произойти такая ситуация, что консьюмер обработает сообщение дважды.

Ситуация с двойной обработкой

Рассмотрим на примере нашей платформы. Интереснее всего будет выбрать для этих целей inventory-service.

Представим, что inventory-service прочитал сообщение, успел записать данные в базу, отправить своё сообщение в топик, но упал перед тем, как сохранить (закоммитить) offset. То есть он при перезапуске снова прочитает то же самое сообщение. Как следствие, отправит снова это же сообщение в свой топик, даже если этот самый продюсер идемпотентный (он выдаст сообщению новый номер, так как фактически сообщение будет другим, просто с теми же самыми данными).

Таблица обработанных сообщений

Для таких ситуаций достаточно часто делают следующий трюк: создают таблицу в базе данных примерно следующего вида:

CREATE TABLE processed_order_id (
    order_id VARCHAR(36) PRIMARY KEY,
    processed_at TIMESTAMP DEFAULT NOW()
);

Прелесть этого в том, что на столбце, который primary key, по умолчанию стоит ограничение целостности unique. А это ограничение достигается с помощью уникального B-Tree индекса. Для нас это значит, что нельзя вставить конкретное значение более одного раза. То, что нужно.

Также немаловажно отметить, что поиск будет происходить за O(logn). В высоконагруженных системах это может стать камнем преткновения и стоит лучше использовать что-то типа Redis, где поиск за O(1). Но там своё веселье с консистентностью данных.

Давайте создадим данную выше таблицу. Также создадим сущность для маппинга на эту таблицу:

@Entity
@Table(name = "processed_order_id")
public class ProcessedOrderId {
    @Id
    @Column(name = "order_id", length = 36)
    private String orderId;

    @Column(name = "processed_at")
    private LocalDateTime processedAt;

    public ProcessedOrderId(String orderId) {
        this.orderId = orderId;
        this.processedAt = LocalDateTime.now();
    }
  
    // empty constructor, getters, setters
}

Создадим простейший репозиторий:

public interface ProcessedOrderIdRepository extends JpaRepository<ProcessedOrderId, String> {
    
}

Дедубликация

Будем использовать всё это для защиты от обработки дубликатов:

@KafkaListener(topics = "order-placed")
    public void reserveInventory(OrderPlacedEvent orderPlacedEvent) {
        try {
            processedOrderIdRepository.save(new ProcessedOrderId(
                orderPlacedEvent.orderId()
            ));
        } catch (DataIntegrityViolationException e) {
            logger.info("Order {} already processed", orderPlacedEvent.orderId());
            return;
        }

        int count = inventoryRepository.deductStock(orderPlacedEvent.productName(), orderPlacedEvent.quantity());

        if (count > 0) {
            InventoryReservedEvent inventoryReservedEvent = new InventoryReservedEvent(
                orderPlacedEvent.orderId(),
                orderPlacedEvent.email()
            );

            kafkaTemplate.send("inventory-reserved", inventoryReservedEvent.orderID(), inventoryReservedEvent);
        }
        else {
            InventoryRejectedEvent inventoryRejectedEvent = new InventoryRejectedEvent(
                orderPlacedEvent.orderId(),
                orderPlacedEvent.email()
            );

            CompletableFuture<SendResult<String, Object>> future =
                kafkaTemplate.send("inventory-rejected", inventoryRejectedEvent.orderID(), inventoryRejectedEvent);

            future.whenComplete((result, throwable) -> {
                if (throwable != null) {
                    logger.info("Error sending inventory rejected event in thread {}", Thread.currentThread().getName());
                }
                else {
                    logger.info("Success sending inventory rejected event in thread {}", Thread.currentThread().getName());
                }
            });
        }
    }

Мы произвели так называемую дедубликацию в нашем консьюмере.

Однако мы сталкиваемся с критическими проблемами при падении консьюмера. Если, скажем, консьюмер упадёт сразу после пометки сообщения как прочитанного, то при перезапуске мы его не обработаем.

Вы можете предложить поставить код для пометки сообщения как обработанного в конец, а вместо try-catch конструкции использовать поиск по айдишнику сообщения, чтобы проверить, обрабатывали ли мы его ранее. Если вы думаете, что вам это поможет, то, к сожалению, это не так. Вы упрётесь в новую проблему (подумайте, какую). Вообще здесь существует множество способов переставлять инструкции, как-то изменять код. Но здесь есть фундаментальная ошибка – в методе мы работаем с бд (выполняем бизнес-логику) и с Kafka. В таком методе, как бы вы его не пытались улучшить, всегда будут возникать проблемы.

Именно для такого случая существует Outbox-Pattern.

Outbox-Pattern

Суть паттерна — разбиение сохранения в базу и в брокер на два шага. Сначала мы выполняем действия с базой данных: фиксируем сообщение как обработанное, выполняем бизнес-логику, сохраняем данные о сообщении в специальную таблицу outbox. Затем в отдельном методе периодически опрашиваем базу и если находим неотправленные сообщения, то отправляем их в Kafka.

Сначала создадим необходимую таблицу:

CREATE TABLE outbox_event (
    key VARCHAR(36) PRIMARY KEY,
    topic VARCHAR(100) NOT NULL,
    payload JSONB NOT NULL,
    created_at TIMESTAMP DEFAULT NOW(),
    sent BOOLEAN DEFAULT FALSE
);

CREATE INDEX idx_outbox_event_sent ON outbox_event(sent, created_at);

Я надеюсь, что сама таблица вас не смутила. Но по индексу могут быть вопросы. Давайте обговорим основные моменты.

Индекс на столбец sent достаточно очевиден. Мы хотим быстро находить неотправленные сообщения. Но мы не делаем индекс только на sent. Также ещё и на created_at. Это нужно для того, чтобы отправлять сначала самые старые сообщения. Это необходимо для соблюдения порядка.

Давайте создавать сущность:

@Entity
@Table(name = "outbox_event")
public class OutboxEvent {
    @Id
    @Column(name = "key", length = 36)
    private String key;
    @Column(name = "topic", nullable = false)
    private String topic;
    @Column(name = "payload", nullable = false)
    private String payload;
    @Column(name = "created_at")
    private LocalDateTime createdAt;
    @Column(name = "sent")
    private Boolean sent;

    public OutboxEvent(String key, String topic, String payload) {
        this.key = key;
        this.topic = topic;
        this.payload = payload;
        this.createdAt = LocalDateTime.now();
        this.sent = false;
    }

    // empty constructor, getters, setters this.sent = sent;
}

Ну и репозиторий, конечно:

public interface OutboxEventRepository extends JpaRepository<OutboxEvent, String> {
    
}

Начинаем переписывать логику нашего сервиса:

@Transactional
@KafkaListener(topics = "order-placed")
public void reserveInventory(OrderPlacedEvent orderPlacedEvent) {
    try {
        processedOrderIdRepository.save(new ProcessedOrderId(
            orderPlacedEvent.orderId()
        ));
    } catch (DataIntegrityViolationException e) {
        logger.info("Order {} already processed", orderPlacedEvent.orderId());
        return;
    }

    int count = inventoryRepository.deductStock(orderPlacedEvent.productName(), orderPlacedEvent.quantity());
    String topic = (count > 0) ? "inventory-reserved" : "inventory-rejected";
    Object event = (count > 0)
        ? new InventoryReservedEvent(orderPlacedEvent.orderId(), orderPlacedEvent.email())
        : new InventoryRejectedEvent(orderPlacedEvent.orderId(), orderPlacedEvent.email());
    String json;

    try {
        json = objectMapper.writeValueAsString(event);
    } catch (Exception e) {
        throw new RuntimeException("Serialization failed for order: " + orderPlacedEvent.orderId(), e);
    }

    OutboxEvent outboxEvent = new OutboxEvent(
        orderPlacedEvent.orderId(),
        topic,
        json
    );

    outboxEventRepository.save(outboxEvent);
}

Примерно как-то так. Так как работа происходит только с базой данных, мы можем повесить аннотацию Transactional. Этим мы гарантируем атомарность метода относительно базы данных.

Теперь нам нужно в отдельном потоке запустить метод, который бы периодически запрашивал данные о сообщениях и отправлял в Kafka те сообщения, которые ещё не были отправлены.

Для начала добавим новый метод в наш репозиторий:

public interface OutboxEventRepository extends JpaRepository<OutboxEvent, String> {
    List<OutboxEvent> findBySentFalseOrderByCreatedAtAsc(Pageable pageable);
}

В качестве параметра метода добавим параметр Pageable для того, чтобы выбирать не всё сразу, а ограничивать выборку. Также сортируем по возрастанию, несмотря на наличие индекса. Делаем это "на всякий случай", ибо SQL строго не гарантирует, что даже с индексом данные будут выведены в ожидаемом порядке без явного указания ORDER BY.

Теперь в пакете scheduler создадим класс:

@Component
public class InventoryRelayerScheduler {
    private final Logger logger = LoggerFactory.getLogger(InventoryRelayerScheduler.class);
    private final OutboxEventRepository outboxEventRepository;
    private final KafkaTemplate<String, String> kafkaTemplate;

    public InventoryRelayerScheduler(
        OutboxEventRepository outboxEventRepository,
        KafkaTemplate<String, String> kafkaTemplate
    ) {
        this.outboxEventRepository = outboxEventRepository;
        this.kafkaTemplate = kafkaTemplate;
    }

    @Scheduled(fixedDelay = 5000)
    public void relayOutboxEvents() {
        List<OutboxEvent> outboxEvents = outboxEventRepository.findBySentFalseOrderByCreatedAtAsc(PageRequest.of(0, 100));

        for (OutboxEvent outboxEvent : outboxEvents) {
            try {
                kafkaTemplate.send(outboxEvent.getTopic(), outboxEvent.getKey(), outboxEvent.getPayload()).get();
                outboxEvent.setSent(true);
                outboxEventRepository.save(outboxEvent);
            }
            catch (Exception e) {
                logger.error("Failed to relay outbox event: {}", outboxEvent.getKey(), e);
            }
        }
    }
}

Особо пытливый читатель может смутиться коду внутри релеера. Действительно, ранее я говорил, что работать в одном методе с бд и с Kafka – антипаттерн. Но там я отмечал, что плохо работать с бд, когда эта работа связана с бизнес-логикой. Здесь работы с бизнес-логикой нет. Это чисто вспомогательный метод. Здесь всё хорошо. Да, атомарности нет. Но здесь это не критично. Мы добиваемся гарантии at-least-once. То есть сообщение точно будет отправлено в Kafka. Это куда лучше, чем было до применения паттерна, так как нам необходимо просто сделать консьюмера, который принимает сообщения от inventory-service, идемпотентным (если это необходимо, конечно).

Также вы можете заметить, что в коде релеера вызывается метод get() у объекта, который вернул метод send(). Это очень важно. Метод send() работает асинхронно и сразу возвращает CompletableFuture, не дожидаясь реальной отправки в Kafka. Если мы не вызовем get(), то пометим сообщение как отправленное до того, как оно действительно попадет в брокер. Если после этого отправка упадет с ошибкой, мы потеряем сообщение — в базе оно уже помечено как отправленное, и релеер его больше не подхватит.

Также добавим аннотацию EnableScheduling в главный класс приложения:

@SpringBootApplication
@EnableScheduling
public class InventoryServiceApplication {
	public static void main(String[] args) {
		SpringApplication.run(InventoryServiceApplication.class, args);
	}
}

Вот и вся реализация паттерна. Возьмите себе на вооружение. Полезная штука в Event-Driven Architecture.

Идемпотентность консьюмера (notification-service)

В данном консьюмере нам необходимо настроить только обработку дубликатов, так как необходимости в более сложных телодвижениях (таких, как outbox-pattern) нет. Для достижения этой цели мы поднимем новый контейнер с PostgreSQL для notifiaction-service и произведём действия, очень похожие на те, что мы производили в inventory-service. Уверен, что вы справитесь самостоятельно. В любом случае в конце я прикреплю ссылку на GitHub репозиторий с кодом. Поэтому, если что, сможете свериться.

А что по analytics-service?

analytics-service мы трогать вообще не будем. Это именно тот случай, когда идемпотентная обработка не нужна. Небольшое количество дубликатов никак не исказят общую картину. Добавление идемпотентности для этого сервиса нецелесообразно.

Почему не было рассказано про коммиты оффсетов?

Наш код в некоторых местах недостаточно надёжный из-за того, что мы используем так называемый автокоммит. Если вы не знаете, что это, не волнуйтесь. Мы обязательно разберём это в следующей статье.

Я изначально хотел рассказать об этом здесь, но эта тема объёмная и сложная. Мы и так разобрались с далеко не самыми простыми вещами в этой статье. Добавление сюда рассказа про коммиты оффсетов — перебор.

Подведение итогов

Если вы дочитали до этого момента и осознали всю информацию выше, то поздравляю вас, — вы стали на несколько порядков лучше в области распределённых систем :)

Теперь вы знаете, что со 100% вероятностью добиться гарантии exactly-once невозможно. Также теперь вы знаете, как настроить идемпотентность, как реализовать очень полезный outbox-pattern.

Репозиторий с кодом, полученным в ходе статьи, разместил на GitHub.

В следующей статье мы подробно посмотрим на коммиты оффсетов.

Комментарии (0)