Как известно, во многих IT-проектах есть типичная задача -  транзакционная обработка данных в интеграционных сценариях, когда необходимо согласованно отправить или принять данные из внешней системы и при этом обновить собственное состояние приложения.

Особенно интересной эта задача становится, когда для интеграции используется Kafka, так как она имеет свои ограничения, касающиеся реализации транзакционности. Вообще, сейчас Kafka достаточно широко применяется именно  в качестве платформы для асинхронной интеграции, это справедливо и для проектов, которые мы в ЛАНИТ — Би Пи Эм реализуем, например, в Альфа-Банке и ВТБ. Поэтому, надеемся, данная тема будет интересна многим.

В этой статье рассмотрим подход к реализации распределённых транзакций (в рамках одного Java-приложения), которые охватывают Kafka и реляционную СУБД. Для этого воспользуемся средствами управления транзакциями, имеющимися в Spring.

Варианты с организацией eventual consistency с помощью типовых паттернов (Saga, Transactional Outbox и др.) и/или использования дополнительных платформ (Debezium, Kafka Connect и пр.) - тема для отдельной статьи (так что ждите продолжения). В этой статье тему затрагивать не будем. 

Транзакционные гарантии в Kafka

Пятиминутка теории. Kafka поддерживает распределённые транзакции с полноценными ACID-гарантиями только в пределах самой себя, т.е. транзакция может охватывать несколько producer'ов и consumer'ов для разных топиков. Однако Kafka не реализует XA-протокол, поэтому не поддерживает распределённые транзакции с участием ресурсов других типов (например, СУБД или JMS). 

Поэтому любые (в том числе описываемые в статье!) прикладные решения по организации распределённых транзакций с участием Kafka и сторонних ресурсов будут иметь следующие ограничения:

• полноценные ACID-гарантии обеспечить не получится;

• приложение должно быть устойчиво к возникновению неконсистентного состояния и должно самостоятельно принимать необходимые меры к выходу из него;

• потребуется использование сторонних (по отношению к самой Kafka) инструментов.

С технической точки зрения, распределённая транзакция с участием Kafka будет представлять собой набор отдельных локальных транзакций (в каждом из участвующих ресурсов - Kafka, СУБД), и эти транзакции будут координироваться внешними средствами.

Способ координации, с которым мы будем работать (потому что его реализуют используемые нами средства Spring) называется 1 phase commit best effort, при котором каждый участвующий в общей распределённой транзакции менеджер делает коммит своей локальной транзакции независимо от других менеджеров (отсюда - 1 phase...), при этом все менеджеры делают свои коммиты строго по очереди друг за другом, и в эту очередь они должны быть выстроены по возрастанию «надежности», т.е. по убыванию вероятности ошибки при коммите (отсюда - ...best effort). Таким образом, ошибка при коммите общей транзакции будет выброшена максимально рано (в идеале - при коммите у первого менеджера), и остальные менеджеры не сделают свои коммиты.

Очевидно, 1 phase commit best effort - это вероятностный, а не гарантированный метод. Следовательно, в результате система может остаться в неконсистентном состоянии, когда в цепочке менеджеров транзакций первый менеджер («менее надежный») успешно сделал коммит, а у второго («более надежного») возникла ошибка при коммите. В результате потребуются дополнительные меры по выводу системы из неконсистентного состояния (компенсация транзакций либо повторение всей цепочки транзакций с наложением требований идемпотентности на все операции).

Чтобы описанная здесь схема 1 phase commit best effort была устойчива к ошибкам в прикладном коде, он целиком должен выполняться в контексте самой первой локальной транзакции в очереди. В этом случае любые исключения, выбрасываемые в прикладном коде, будут откатывать первую локальную транзакцию и, соответственно, всю распределенную транзакцию тоже. 

Описание функционального сценария

Итак, нашей задачей является организация распределенных транзакций, охватывающих Kafka и реляционную СУБД в рамках одного приложения (например, микросервиса). При этом для полноты картины рассмотрим как задачу транзакционной отправки, так и задачу транзакционного получения сообщений.

Чтобы получить более-менее компактный модельный Java-проект, совместим обе эти задачи в рамках одного общего сквозного сценария, который реализует гарантированную доставку сущностей из одной БД в другую посредством транспорта Kafka. Также для простоты запустим данные через Kafka в один поток: по одному экземпляру producer'а и consumer'а, один topic с одной партицией.

Последовательность шагов сценария будет следующей.

  • Изначально сущности создаются в виде записей в таблице-источнике.

  • На стороне источника запускается обработчик, который в цикле выполняет следующий набор действий (каждая такая итерация заключена в одну транзакцию):

    • выбирает из БД-источника очередную необработанную сущность,

    • помечает её как обработанную и сохраняет в БД-источнике,

    • отправляет её в виде сообщения в Kafka.

  • На стороне потребителя каждое сообщение из Kafka также обрабатывается следующим образом (обработка происходит в рамках отдельной транзакции):

    • сохраняет сущность из полученного сообщения в БД-приёмнике.

Ожидаемый результат:

  • все сущности, изначально созданные в БД-источнике, в результате оказываются в БД-приёмнике,

    • при этом нет повторов и пропусков сущностей;

  • порядок вставки сущностей в БД-приёмнике соответствует порядку их отправки источником,

  • указанные выше результаты достигаются всегда, независимо от исключений, выбрасываемых в прикладной логике источника и приёмника.

Схема интеграционного сценария
Схема интеграционного сценария

Как указывалось ранее, любой из возможных вариантов несёт риск неконсистентного состояния, поэтому отдельно данный минус не указывается.

Spring for Apache Kafka

В целом, Spring for Apache Kafka имеет следующие механизмы для поддержки транзакций при работе с Kafka (https://docs.spring.io/spring-kafka/reference/html/#transactions):

  • KafkaTransactionManager,

  • KafkaMessageListenerContainer,

  • Локальные транзакции при использовании KafkaTemplate,

  • Синхронизация транзакций с другими менеджерами транзакций.

Для включения транзакционной работы с сообщениями необходимо задать свойства Kafka:

  • spring.kafka.producer.transaction-id-prefix = <строка>

  • spring.kafka.consumer.properties.isolation.level = read_committed

Для включения Kafka producer'а в транзакцию Spring нужно использовать бин KafkaTransactionManager и подать в его конструктор бин ProducerFactory как параметр. Для отправки сообщений нужно использовать бин KafkaTemplate, который настроен на эту же самую ProducerFactory, что и KafkaTransactionManager. Детали описаны в документации: https://docs.spring.io/spring-kafka/reference/html/#using-kafkatransactionmanager

Далее рассмотрим способы организации транзакционной работы с использованием указанных выше механизмов.

Первый вариант

Использовать бин ChainedTransactionManager или более специализированный вариант - ChainedKafkaTransactionManager - детали см. https://docs.spring.io/spring-kafka/reference/html/#container-transaction-manager

ChainedTransactionManager - это готовая реализация принципа 1 phase commit best effort.

В модельном проекте ChainedTransactionManager создаётся со следующим порядком transaction manager'ов: ChainedTransactionManager(myKafkaTransactionManager, jpaTransactionManager),

- т.е. сначала создаётся транзакция Kafka, далее - транзакция БД. Коммит происходит в обратном порядке - сначала транзакция БД, потом транзакция Kafka.

Не рекомендуем этот вариант - из-за использования deprecated-компонентов ChainedTransactionManager и ChainedKafkaTransactionManager.

Второй вариант

Вместо deprecated ChainedKafkaTransactionManager можно связывать транзакции Kafka и БД в прикладном коде, для чего потребуется реализация следующих мер:

В конфигурации задать:

Транзакции в приложении должны создаваться в следующем порядке.

  1. Сначала инициируется транзакция Kafka (для этого бин KafkaTransactionManager и сделан менеджером по умолчанию). 

    • При отправке сообщения она создаётся в головном компоненте, помеченном @Transactional.

    • При получении сообщения она создаётся автоматически контейнером.

  1. После этого инициируется транзакция БД.

    • При отправке сообщения она создаётся во вложенном компоненте (вызываемом из головного, см. выше), помеченном как @Transactional(transactionManager = "jpaTransactionManager").

    • При получении сообщения она создаётся в listener'е, помеченном аннотациями @KafkaListener + @Transactional("jpaTransactionManager").

В результате, точно как и в варианте с ChainedTransactionManager, сначала создаётся транзакция Kafka, далее - транзакция БД. Коммит происходит в обратном порядке - сначала транзакция БД, потом транзакция Kafka. Таким образом, этот вариант полностью функционально идентичен предыдущему варианту с ChainedKafkaTransactionManager, он также является реализацией метода 1 phase commit best effort.

Если ошибка происходит на этапе коммита транзакции Kafka, то закомитченная перед этим транзакция БД не откатывается. В сценариях получения сообщения это приведёт к тому, что сообщение будет доставлено повторно, поэтому операции работы с БД должны быть идемпотентными (https://docs.spring.io/spring-kafka/reference/html/#ex-jdbc-sync).

Рекомендуем этот вариант.

TransactionSynchronization

Описанные выше варианты являются неявной реализацией распределённых транзакций - основную логику координации берёт на себя фреймворк.

В качестве альтернативы можно выбрать вариант явной реализации логики транзакции - в прикладном коде. Для этого можно использовать компоненты TransactionSynchronizationManager и TransactionSynchronization, с помощью которых можно достаточно гибко управлять разными аспектами выполнения транзакции, в т.ч. порядком исполнения отдельных шагов и обработкой ошибок (сюда же можно включить логику выхода из неконсистентного состояния).

Однако сложность этого варианта может быть избыточной для многих прикладных сценариев.

Получается, что это нишевый вариант, потенциально более сложный по сравнению с вышеописанными.

Модельный проект

Общее описание

Указанный в начале документа функциональный сценарий реализуется в модельных проектах.

Оба проекта базируются на следующих компонентах:

  • Spring for Apache Kafka;

  • Test containers для Apache Kafka и PostgreSQL. Соответственно, для локального запуска проектов требуется наличие локального Docker (например, Docker Desktop).

Ожидаемым результатом выполнения каждого из проектов будет следующее.

  1. В логах присутствуют сообщения об exception'ах: Sender fault 1, Sender fault 2, Sender fault 3, Receiver fault 1, Receiver fault 2.

  2. После выброса всех exception'ов в логах присутствует информация о том, что все сущности получены, причем в том же порядке, в котором они были отправлены: 

List of received entities: [Text-1, Text-2, Text-3]

No entities left to send

Моделирование ошибок

В модельных проектах есть ряд настроек, позволяющих включать/выключать генерацию ошибок в разных участках кода:

myservice:
  send-transactions-faults-num: 0     # сколько имитировать сбоев при отправке сообщений
  receive-transactions-faults-num: 0  # сколько имитировать сбоев при получении сообщений
  business-faults: true               # имитировать ли сбои в бизнес-логике

send-transactions-faults-num

Генерация системных (инфраструктурных) исключений. Если задать его значение = N (где N > 0), то будут генерироваться исключения в kafka transaction manager'е в транзакциях отправки сообщений. Это приведет к тому, что при отправке сообщений транзакция БД (комиттится первой в цепочке 1PC best effort) будет закомитчена, и соответственно сущность в БД-источнике получит статус «обработана», а транзакция Kafka (комиттится второй в цепочке 1PC best effort) откатится, и соответственно сообщение не уйдет consumer'у. В результате нарушится консистентность: будет потеряно N первых сообщений, например для N = 1:

List of received entities: [Text-2, Text-3]

No entities left to send

receive-transactions-faults-num

Генерация системных (инфраструктурных) исключений. Если задать его значение = N (где N > 0), то будут генерироваться исключения в kafka transaction manager'е в транзакциях получения сообщений. Это приведет к тому, что при отправке сообщений транзакция БД (комиттится первой в цепочке 1PC best effort) будет закомитчена, и соответственно сущность в БД-приемнике сохранится, а транзакция Kafka (комиттится второй в цепочке 1PC best effort) откатится, и соответственно сообщение будет повторно доставлено consumer'у. В результате нарушится консистентность: первое сообщение будет повторено N раз, например для N = 2:

List of received entities: [Text-1, Text-1, Text-1, Text-2, Text-3]

No entities left to send

business-faults

Генерация исключений в прикладном коде источника и приемника сообщений. Эти исключения не влияют на консистентность обработки потока сообщений.

Для выхода из неконсистентного состояния в приложении требуется реализовать отдельную логику. В модельном проекте она не реализована.

Заключение

В целом, можно сказать, что Spring for Apache Kafka предоставляет разработчику привычные (“@Transactional - и всё работает !”) и при этом достаточно неплохие средства поддержки транзакций в интеграционных сценариях. Однако полноценных ACID-гарантий достичь не получится, поэтому могут потребоваться самостоятельные усилия по возврату приложения к консистентному состоянию в случае ошибок.

Комментарии (15)


  1. makar_crypt
    29.03.2022 11:53

    как это без транзакций в вашем случае? код же такого вида у вас получается

    let m = kafka.GetMessage()

    DoIncrementFieldInDatabase(m);

    // <- электричество выключается тут

    kafka.Commit

    Тоесть вы допустим энкрементите , а перед комитов падает сервер. В след раз опять поступает это сообщение и базе мы уже получает не валидные данные .


    1. deermakov Автор
      29.03.2022 12:00
      +4

      Если я правильно понимаю, то вы описываете такую ситуацию:

      1. Начата транзакция Kafka

      2. Начата транзакция БД

      3. Вычитали сообщение из кафки

      4. Обновили данные в БД

      5. Коммит транзакции БД

      6. <<выключилось электричество>>

      7. <<сообщение из кафки не вычитано (т.е. оффсет не закомитчен) >>

      8. <<после включения электричества то же самое сообщение попадет на повторную обработку>>

      Если так, то да, система останется в неконсистентном состоянии, т.к. данные в БД уже обновлены, а то же самое сообщение в кафке пойдет на повторную обработку. Именно про это я и пишу и именно эту ситуацию а моделирую через параметр receive-transactions-faults-num (задайте его > 0 и увидите этот эффект.)

      Забегая вперед, скажу, что лечить эту конкретную проблему можно например таким образом: запоминать в БД ключи уже обработанных сообщений и отбрасывать их при повторном получении - паттерн Idempotent Consumer (https://microservices.io/patterns/communication-style/idempotent-consumer.html). Но это усложнение решения и это точно out of scope данной статьи. Если все сложится хорошо - опишем это в отдельной статье.


      1. AlexSpaizNet
        29.03.2022 15:31

        Если я не ошибаюсь, решение этой проблемы это хранить оффсеты на стороне консюмера. Что то вроде Outbox Pattern только на входящие сообщения.

        1. Kafka Consumer starts

        2. Load the last offset from the db

        3. Receive message from kafka

        4. tr = Start db transaction

        5. handleMessage(tr)

        6. persistOffset(tr)

        7. tr.commit()

        Или все или ничего...


        1. deermakov Автор
          29.03.2022 15:42
          +2

          Хранить consumer offsets в потребителе (и управлять ими в своем прикладном коде) - это значит брать на себя реализацию существенного объема функционала, который уже реализован в Кафке. Амбициозная задача.


          1. AlexSpaizNet
            29.03.2022 20:29

            Ну иногда не особа есть выбор. Например когда очень жесткое требование на Exactly once delivery.


  1. Throwable
    29.03.2022 12:45

    Странный какой-то вы кейс выбрали, возможно специфичный только для BPM. В подавляющем большинстве нужно: прочитать из одного топика, обработать/записать в базу и отправить в другой топик.

    Хотелось бы также знать политику консьюмера при роллбеке транзакции: делается ли повтор, как контролировать delay и limit при повторе, отфутболится ли месседж в DLQ, остановить ли весь консьюмер и т.д. Тут масса нюансов и вопросов как это контролировать в спринге.

    Вообще, при планировании распределенной архитектуры лучше закладываться на гарантию "at least once" и вручную везде обеспечивать идемпотентность. Насколько я понял, кафка не обеспечивает "exactly once" между двумя топиками.


    1. deermakov Автор
      29.03.2022 13:07
      +3

      Почему же странный ? "Получить сообщение + обновить БД" - это элементарный (но при этом вполне реальный) сценарий, на котором проще всего исследовать проблему. И он конечно не специфичен для BPM.

      "Прочитать + обновить БД + отправить" - это просто чуть более сложный сценарий, и при этом можно усложнять и далее, комбинируя всё больше взаимодействий - но это вряд ли что-то добавит к сути статьи, скорее наоборот - замаскирует её за второстепенными деталями. То же самое на мой взгляд относится и к retries и dead letters - поддержка всего этого есть в Spring for Apache Kafka, но это выходит за рамки статьи, это вопросы, достойные отдельного рассмотрения. Тем более достойна отдельного рассмотрения тема с "остановкой consumer'а", т.к. тут возникнет куча вопросов с ребалансировкой, порядком обработки сообщений и т.д.

      В целом, я считаю, в небольшой статье лучше сфокусироваться на одном вопросе и рассмотреть какой-то простейший (но живой) случай, чтобы понять принцип, и уже дальше этот принцип (с соответствующими расширениями и дополнениями, конечно) использовать в более сложных сценариях.


      1. Norgorn
        30.03.2022 01:38

        "Прочитать + обновить БД + отправить"

        А какой вообще подход тут может быть? Вроде, видится, что последняя отправка в Кафку должна быть одним из шагов в фиксации транзакции (видимо, первым)? Или можно положиться на менеджер транзакций, который зафиксирует отправку вместе с фиксацией чтения (потому что чисто внутри себя Кафка умеет в сложные транзакции)?


        1. deermakov Автор
          30.03.2022 10:25

          Скорее всего - да, второй вариант: можно положиться на менеджер транзакций Кафки, т.к. внутри Кафки есть ACID-гарантии при работе с несколькими топиками.


        1. Throwable
          30.03.2022 10:25

          потому что чисто внутри себя Кафка умеет в сложные транзакции

          Вот фиг его знает. Пишут, что умеет, если выставлен read_committed уровень изоляции.

          The consumer's position is stored as a message in a topic, so we can write the offset to Kafka in the same transaction as the output topics receiving the processed data. If the transaction is aborted, the consumer's position will revert to its old value and the produced data on the output topics will not be visible to other consumers, depending on their "isolation level." In the default "read_uncommitted" isolation level, all messages are visible to consumers even if they were part of an aborted transaction, but in "read_committed," the consumer will only return messages from transactions which were committed (and any messages which were not part of a transaction).

          Но на практике нигде нет внятной инфы как это физически реализовано (топики-то разные!) Поэтому закладываться на такие гарантии может быть чревато проблемами.


          1. deermakov Автор
            30.03.2022 10:27
            +1

            Хорошая тема для исследования и статьи на Хабре )

            Что касается деталей реализации, то есть большой дизайн-документ на эту тему - "Exactly Once Delivery and Transactional Messaging in Kafka", https://docs.google.com/document/d/11Jqy_GjUGtdXJK94XGsEIK7CP1SnQGdp2eF0wSw9ra8/edit#heading=h.xq0ee1vnpz4o


  1. Savochkin
    29.03.2022 20:42

    >> Очевидно, 1 phase commit best effort - это вероятностный, а не гарантированный метод. 

    не совсем понятно зачем нужен описанный в статье подход, если можно использовать старый добрый transactional outbox + идемпотентность и получить гарантированный результат.

    Удивительно, но я не нашёл в своё время готовой реализации to outbox в spring, но это не трудно реализовать самому.

    допускаю, что есть класс задач где best effort достаточен - но тогда было бы интересно обсудить что это за класс задач.


    1. deermakov Автор
      30.03.2022 10:17

      Хороший вопрос )

      Пользу описанного здесь подхода я вкратце вижу так: получить хорошую защиту от неконсистентности за небольшую плату.

      Плата состоит в том, что вы настраиваете transaction manager'ы и развешиваете @Transactional - это несложно. А в результате получаете, что ошибки в прикладной логике не ломают консистентность вашего приложения - а это основная доля ошибок/сбоев в приложениях. Остается непокрытым риск системных проблем - электричество выключилось, сеть отпала и т.д., но это маловероятные риски, тем более, что для поломки консистентности они должны произойти в очень маленький промежуток времени между коммитами транзакции БД и Кафки.

      Суммарно, допустим, вы получаете 99,99% гарантии консистентности при небольших трудозатратах - очень неплохо, особенно по сравнению с полным отсутствием контроля )

      Transactional outbox + idempotent consumer - это более существенные трудозатраты, хотя конечно не rocket science. Про eventual consistency планирую сделать отдельную статью.


      1. Savochkin
        30.03.2022 16:47

        Все равно не понятно. Ну потратьте пару дней / недельку вкрутите outbox и живите без таких компромиссов. Хотите мы вам дадим реализацию?

        Мне кажется вы статью дольше писали чем outbox бы вкрутили ????


        1. deermakov Автор
          30.03.2022 16:48

          Спасибо, у нас есть.