Введение
Рад всех видеть! И с наступающим!
Сегодня мы подготовим наши Kafka-консьюмеры к Новому году.
В этой статье мы подробно поработаем с оффсетами. Эта тема объёмная и достаточно сложная, но для постоения надёжных систем её необходимо понимать.
Мы будем использовать платформу, которую получили в предыдущей статье.
Давайте начинать.
Как консьюмер получает сообщения?
Для начала полезно понимать, как вообще происходит получение сообщений консьюмером:
Spring Boot вызывает у нативного Kafka-консьюмера, которого он создал на лету в момент запуска приложения, метод
poll(). Этот метод вытягивает из Kafka батч сообщений.Spring Boot перебирает этот батч и вызывает для каждого сообщения наш
KafkaListenerметод.
Это нам пригодится для понимания темы оффсетов.
Способы коммита
Существует 2 способа коммитить оффсет — автоматически и вручную.
-
Автокоммит
Как вы, скорее всего, догадались, коммит происходит автоматически. Это дефолтный способ.
-
Ручной коммит
Этот способ коммита позволяет нам чётко контролировать обработанные сообщения.
Автокоммит
Принцип его работы достаточно простой. Есть поток, в котором запущен опрос Kafka (получение батчей). Также есть поток, который коммитит оффсет согласно некоторому фиксированному интервалу.
Для лучшего понимания посмотрим на эту историю на примере. Условимся, что интервал равен 5 миллисекундам:
Консьюмер получает с помощью
poll()батч сообщений.Прошло 5 миллисекунд с последнего коммита
Коммитится оффсет со значением оффсета последнего сообщения из батча + 1
Это довольно опасный способ.
Давайте представим ситуацию, демонстрирующую опасность этого способа:
poll()вернул батч из 100 сообщений.Было обработано 70 сообщений, после чего произошёл автокоммит.
Если приложение упадёт, скажем, на 85 сообщении, то мы потеряем сообщения с 85 до 100. Это произошло потому, что коммит уже произошёл и при перезапуске мы начнём читать с оффсета 101.
Вообще, использование автокоммита в большинстве случаев нецелесообразно.
Наверняка у вас возникнет вопрос: "Как можно было сделать такой ненадёжный способ по дефолту и зачем это чудо существует?".
Всё дело в истоках Kafka. Она была создана в LinkedIn для логов и метрик. Соответственно, потеря нескольких сообщений была некритичной. Вот создатели и решили использовать такой способ для простоты. В новых версиях он остался по дефолту из-за обратной совместимости. Разработчики сами многократно признавали, что зря сделали его дефолтом. Но имеем, что имеем. Главное, что это не единственный способ :)
В Spring Boot управление всей этой историей происходит как-то так:
spring.kafka.consumer.enable-auto-commit # автокоммит (включён или выключен)
spring.kafka.consumer.auto-commit-interval # таймер автокоммита (время в миллисекундах)
Короче говоря, автокоммит — некое историческое наследие, опасное в production. Он полагается на таймер, независимо от того, обработали ли вы сообщения. Хотя справедливости ради стоит сказать, что, например, в сервисах по сбору метрик можно его и оставить.
Ручной коммит
Этот способ уже более надёжный, в отличие от прошлого.
Стоит сразу отметить, что мы можем выбрать один из четырёх вариантов ручного коммита. При этом два из них не совсем ручные. Давайте рассмотрим их по порядку:
-
BATCH
Работает это как-то так:poll() -> [батч сообщений] -> обработали ВСЕ -> Spring автоматически коммититКак вы видите, штука на первый взгляд удобная.
Очевидный недостаток этого метода состоит в том, что если мы упадём ближе к концу батча, то придётся перечитывать весь батч полностью. Как следствие, будет обработано много дубликатов.
Казалось бы, здесь больше не о чем говорить. Но есть один критически важный момент. Для того, чтобы его обсудить, вспомним код нашего KafkaListener'а:@Transactional @KafkaListener(topics = "order-placed") public void reserveInventory(OrderPlacedEvent orderPlacedEvent) { try { processedOrderIdRepository.save(new ProcessedOrderId( orderPlacedEvent.orderId() )); } catch (DataIntegrityViolationException e) { logger.info("Order {} already processed", orderPlacedEvent.orderId()); return; } int count = inventoryRepository.deductStock(orderPlacedEvent.productName(), orderPlacedEvent.quantity()); String topic = (count > 0) ? "inventory-reserved" : "inventory-rejected"; Object event = (count > 0) ? new InventoryReservedEvent(orderPlacedEvent.orderId(), orderPlacedEvent.email()) : new InventoryRejectedEvent(orderPlacedEvent.orderId(), orderPlacedEvent.email()); String json; try { json = objectMapper.writeValueAsString(event); } catch (Exception e) { throw new RuntimeException("Serialization failed for order: " + orderPlacedEvent.orderId(), e); } OutboxEvent outboxEvent = new OutboxEvent( orderPlacedEvent.orderId(), topic, json ); outboxEventRepository.save(outboxEvent); }Всё дело в транзакционности. Spring Boot сначала делает коммит, а только потом закрывает транзакцию бд. Вообще сочетания аннотаций
KafkaListenerиTransactionalдовольно опасное.Давайте представим ситуацию:
Мы обработали почти все сообщения в батче (за исключением последнего). Затем начинается обработка последнего сообщения. Всё прошло хорошо. После выхода из метода Spring Boot сделает коммит оффсета. Но затем при коммите транзакции бд происходит исключение. Соответственно, всё откатится. Но сообщение считается обработанным.
Вам может показаться, что эта проблема высосана из пальца, и что если мы уж вышли из метода, то никаких проблем далее возникнуть не может. Но нет. Исключение может вылететь в момент непосредственного коммита. Например, при коммите база данных может обнаружить deadlock. Также возможны сценарии переполненного диска и возникновения
OutOfMemoryError. Короче говоря, играться с этим опасно.Давайте попробуем решить эту проблему. Для начала проанализируем, откуда вообще растут ноги у этой проблемы.
Корень проблемы — порядок коммита оффсета и конца транзакции. Так давайте попробуем этот порядок поменять. Для этого предлагаю вынести всю логику из нашего listener'а в отдельный транзакционный метод. Это нам позволит вызывать этот самый метод из listener'а. Преимущество в том, что мы выйдем из метода только тогда, когда транзакция будет успешно закрыта коммитом. То есть порядок теперь — конец транзакции (коммит) -> коммит оффсета.
Для этого нам придётся создать ещё один бин. Это делается потому, что нельзя вызывать транзакционный метод изнутри того же класса. Это связано с тем, что Spring Boot создаёт прокси для классов с транзакционными методами. Но если мы попытаемся вызвать такой метод из метода того же класса, то будет вызван метод этого класса, а не бина вокруг него, так как метод вызывается на экземпляре this.
Так вот. Давайте же создадим в пакете
serviceновый класс:@Service public class InventoryProcessor { private final static Logger logger = LoggerFactory.getLogger(InventoryProcessor.class); private final InventoryRepository inventoryRepository; private final ProcessedOrderIdRepository processedOrderIdRepository; private final OutboxEventRepository outboxEventRepository; private final ObjectMapper objectMapper = new ObjectMapper(); public InventoryProcessor( InventoryRepository inventoryRepository, ProcessedOrderIdRepository processedOrderIdRepository, OutboxEventRepository outboxEventRepository ) { this.inventoryRepository = inventoryRepository; this.processedOrderIdRepository = processedOrderIdRepository; this.outboxEventRepository = outboxEventRepository; } @Transactional public void processOrder(OrderPlacedEvent orderPlacedEvent) { try { processedOrderIdRepository.save(new ProcessedOrderId( orderPlacedEvent.orderId() )); } catch (DataIntegrityViolationException e) { logger.info("Order {} already processed", orderPlacedEvent.orderId()); return; } int count = inventoryRepository.deductStock(orderPlacedEvent.productName(), orderPlacedEvent.quantity()); String topic = (count > 0) ? "inventory-reserved" : "inventory-rejected"; Object event = (count > 0) ? new InventoryReservedEvent(orderPlacedEvent.orderId(), orderPlacedEvent.email()) : new InventoryRejectedEvent(orderPlacedEvent.orderId(), orderPlacedEvent.email()); String json; try { json = objectMapper.writeValueAsString(event); } catch (Exception e) { throw new RuntimeException("Serialization failed for order: " + orderPlacedEvent.orderId(), e); } OutboxEvent outboxEvent = new OutboxEvent( orderPlacedEvent.orderId(), topic, json ); outboxEventRepository.save(outboxEvent); } }Здесь ничего особенного. Просто вынесли наш метод из основного сервиса сюда.
Также изменим код в нашем основном сервисном классе:
@Service public class InventoryService { private final InventoryProcessor inventoryProcessor; public InventoryService(InventoryProcessor inventoryProcessor) { this.inventoryProcessor = inventoryProcessor; } @KafkaListener(topics = "order-placed") public void reserveInventory(OrderPlacedEvent orderPlacedEvent) { inventoryProcessor.processOrder(orderPlacedEvent); } }Вот теперь порядок. Такой подход позволяет нам коммитить только тогда, когда произошёл коммит транзакции в бд. Если метод выбросит исключение, Spring Boot коммит не сделает.
Вам, возможно, покажется этот способ костыльным. Я в некоторой степени соглашусь. Давайте рассмотрим альтернативный способ, для которого не нужно плодить лишние сущности. Тут нам понадобится бин
TransactionalTemplate. Давайте перепишем код нашего сервиса:@Service public class InventoryService { private static final Logger logger = LoggerFactory.getLogger(InventoryService.class); private final InventoryRepository inventoryRepository; private final ProcessedOrderIdRepository processedOrderIdRepository; private final OutboxEventRepository outboxEventRepository; private final TransactionTemplate transactionTemplate; private final ObjectMapper objectMapper; public InventoryService( InventoryRepository inventoryRepository, ProcessedOrderIdRepository processedOrderIdRepository, OutboxEventRepository outboxEventRepository, TransactionTemplate transactionTemplate, ObjectMapper objectMapper ) { this.inventoryRepository = inventoryRepository; this.processedOrderIdRepository = processedOrderIdRepository; this.outboxEventRepository = outboxEventRepository; this.transactionTemplate = transactionTemplate; this.objectMapper = objectMapper; } @KafkaListener(topics = "order-placed") public void reserveInventory(OrderPlacedEvent orderPlacedEvent) { transactionTemplate.executeWithoutResult(status -> { processOrderInTransaction(orderPlacedEvent); }); // Чётко контролируем то, что транзакция бд будет совершена до выхода из метода } private void processOrderInTransaction(OrderPlacedEvent orderPlacedEvent) { try { processedOrderIdRepository.save(new ProcessedOrderId( orderPlacedEvent.orderId() )); } catch (DataIntegrityViolationException e) { logger.info("Order {} already processed", orderPlacedEvent.orderId()); return; } int count = inventoryRepository.deductStock(orderPlacedEvent.productName(), orderPlacedEvent.quantity()); String topic = (count > 0) ? "inventory-reserved" : "inventory-rejected"; Object event = (count > 0) ? new InventoryReservedEvent(orderPlacedEvent.orderId(), orderPlacedEvent.email()) : new InventoryRejectedEvent(orderPlacedEvent.orderId(), orderPlacedEvent.email()); String json; try { json = objectMapper.writeValueAsString(event); } catch (Exception e) { throw new RuntimeException("Serialization failed for order: " + orderPlacedEvent.orderId(), e); } OutboxEvent outboxEvent = new OutboxEvent( orderPlacedEvent.orderId(), topic, json ); outboxEventRepository.save(outboxEvent); } }Этот способ и оставим, так как он чище.
Также вы можете заметить (или не заметить), что в коде теперь инжектится через конструктор бин
ObjectMapper. Я, честно говоря, сам не понимаю, почему я его создавал черезnewдля каждого класса. Но ничего, бывает. Дело в том, что экземпляр такого класса достаточно тяжёлый, и создавать его для каждого бина нецелесообразно. Рекомендую и вам везде инжектить этот класс вместо создания новых экземпляров. -
RECORD
poll() -> [батч сообщений] -> обработали 1 -> Spring автоматически коммитит -> обработали 2 -> ...Тут тоже всё за вас делает Spring, но в отличие от предыдущего варианта, здесь коммит происходит после каждого сообщения. Очевидно, что при использовании такого подхода будет много запросов к Kafka, следовательно, пострадает скорость.
Также здесь есть проблема при использовании транзакций бд. Вспомним предыдущий подход и проблему с последним сообщением. Так вот, здесь эта проблема имеет место для каждого сообщения.Изобретать велосипед не придётся. Всё решается так же, как и в прошлом режиме. Выбирайте один из двух способов (надеюсь, вы выберите второй).
-
MANUAL
poll() -> [батч сообщений] -> обработали 1 -> вызвали ack.acknowledge() -> ... -> обработали k -> вызвали ack.acknowledge() -> коммитЗдесь нужно остановиться поподробнее. Первое, что бросается в глаза — вызов незнакомого метода на незнакомом объекте. Давайте по порядку.
В случае с MANUAL подходом, нужно будет добавить новый параметр метода. Этот параметр — экземпляр класса
Acknowledgment, который Spring Boot передаст автоматически. С помощью методаack()этого класса и происходит "коммит". Слово коммит я взял в кавычки, потому что далеко не факт, что при MANUAL режиме после вызоваack()произойдёт непосредственный коммит. В данном режиме при вызове методаack()Spring Boot помечает сообщение как обработанное. Сами же коммиты происходят пачками. То есть если мы пометили как обработанные условные k сообщений (как в примере, приведённом в начале пояснения о MANUAL режиме), то только тогда произойдёт непосредственный коммит.
Наш код будет выглядеть как-то так:@KafkaListener(topics = "order-placed") public void reserveInventory( OrderPlacedEvent orderPlacedEvent, Acknowledgment ack ) { transactionTemplate.executeWithoutResult(status -> { processOrderInTransaction(orderPlacedEvent); }); ack.acknowledge(); }Тут в принципе нет ничего сложного и всё довольно понятно.
-
MANUAL_IMMEDIATE
poll() -> [батч сообщений] -> обработали 1 -> вызвали ack.acknowledge() -> коммит -> ... -> обработали k -> вызвали ack.acknowledge() -> коммитЛогика похожа на MANUAL режим. Но в отличие от этого режима, здесь коммит происходит не пачками, а непосредственно сразу после каждого вызова
ack.acknowledge().
Надеюсь, не запутал вас в объяснениях :)
Синхронный и асинхронный коммит
Мы можем коммитить оффсеты либо синхронно, либо асинхронно.
Давайте по порядку:
-
Синхронный коммит
Консьюмер отправляет запрос брокеру и ждёт ответ. Это дефолтная настройка в Spring Boot Kafka Consumer.
Плюс состоит в том, что мы точно знаем, прошёл коммит или нет. Если брокер упал или сеть моргнула, мы получим исключение и сможем его обработать (например, попробовать еще раз).
Но, очевидно, это тормозит работу. Пока мы ждем ответа от Kafka (пусть это и миллисекунды), консьюмер не обрабатывает новые сообщения. Пропускная способность падает.
-
Асинхронный коммит
Консьюмер отправляет запрос и сразу идет дальше, не дожидаясь подтверждения.
Плюсом является скорость. Это увеличивает пропускную способность.
Минус — мы сразу не знаем, удался ли коммит.
Также возможна следующая ситуация:
Мы отправили асинхронный коммит оффсета 10. Произошел сетевой лаг, запрос завис. Мы тут же обработали следующее сообщение и отправили асинхронный коммит оффсета 11. Этот запрос пролетел быстро и успешно записался в Кафку. И тут "проснулся" первый запрос (оффсет 10). То есть порядок перемешивается. Это грозит повторной обработкой данных.
К слову, Spring Boot по умолчанию использует для автокоммита асинхронный режим, а для ручных коммитов — синхронный.
Настраиваем коммиты оффсетов в inventory-service
Давайте же теперь выберем способ коммита, который будет использован в нашем inventory-service. Я предлагаю остановиться на ручном коммите и режиме RECORD. Несмотря на небольшую потерю в скорости, это обеспечивает максимальную гарантию того, что оффсет записан сразу после коммита БД. Для этого пропишем следующую строчку в application.properties файле:
spring.kafka.listener.ack-mode=record
Что по остальным сервисам?
У нас остались notification-service и analytics-service.
Я предлагаю в notification-service использовать ту же настройку, что и в inventory-service, а в analytics-service вообще ничего не менять. Да-да, мы можем спокойно использовать автокоммит в сервисе аналитики. Всё из-за того, что если мы не обработаем некоторые сообщения, то ничего критичного не случится. Общая картина не поменяется.
Подведение итогов
Если вы дочитали до этого момента (а не просто промотали), то теперь вы точно знаете опасность использования автокоммита, режимы ручного коммита, способы обеспечить надёжный и безопасный коммит оффсета при работе с базой данных.
По традиции, код, который получился в ходе статьи, разместил на GitHub.
В следующей статье мы посмотрим на транзакции в Kafka
sethan8r
Приветствую автора! Очень нравятся ваши статьи, большое вам спасибо за это! Хотелось-бы увидеть статью про avro и ее настройку путем имплементации от Serializer<SpecificRecordBase>! С наступающим!