Привет, Хабр!


Я работаю в команде Tinkoff, которая занимается разработкой собственного центра нотификаций. По большей части я разрабатываю на Java с использованием Spring boot и решаю разные технические проблемы, возникающие в проекте.


Большинство наших микросервисов асинхронно взаимодействуют друг с другом через брокер сообщений. Ранее в качестве брокера мы использовали IBM MQ, который перестал справляться с нагрузкой, но при этом обладал высокими гарантиями доставки.


В качестве замены нам предложили Apache Kafka, которая обладает высоким потенциалом масштабирования, но, к сожалению, требует практически индивидуального подхода к конфигурированию для разных сценариев. Кроме того, механизм at least once delivery, работающий в Kafka по умолчанию, не позволял поддерживать необходимый уровень консистентности из коробки. Далее я поделюсь нашим опытом конфигурации Kafka, в частности расскажу, как настроить и жить с exactly once delivery.


Гарантированная доставка и не только


Параметры, о которых пойдет речь далее, помогут предотвратить ряд проблем с настройками подключения по умолчанию. Но сначала хочется уделить внимание одному параметру, который облегчит возможный дебаг.


В этом поможет client.id для Producer и Consumer. На первый взгляд, в качестве значения можно использовать имя приложения, и в большинстве случаев это будет работать. Хотя ситуация, когда в приложении используется несколько Consumer’ов и вы задаете им одинаковый client.id, приводит к следующему предупреждению:


org.apache.kafka.common.utils.AppInfoParser — Error registering AppInfo mbean javax.management.InstanceAlreadyExistsException: kafka.consumer:type=app-info,id=kafka.test-0

Если вы хотите использовать JMX в приложении с Kafka, то это может быть проблемой. Для этого случая лучше всего использовать в качестве значения client.id комбинацию из имени приложения и, например, имени топика. Результат нашей конфигурации можно посмотреть в выводе команды kafka-consumer-groups из утилит от Confluent:



Теперь разберем сценарий гарантированной доставки сообщения. У Kafka Producer есть параметр acks, который позволяет настраивать, после скольких acknowledge лидеру кластера необходимо считать сообщение успешно записанным. Этот параметр может принимать следующие значения:


  • 0 — acknowledge не будут считаться.
  • 1 — параметр по умолчанию, необходим acknowledge только от 1 реплики.
  • ?1 — необходимы acknowledge от всех синхронизированных реплик (настройка кластера min.insync.replicas).

Из перечисленных значений видно, что acks равный ?1 дает наиболее сильные гарантии, что сообщение не потеряется.


Как мы все знаем, распределенные системы ненадежны. Чтобы защититься от временных неисправностей, Kafka Producer предоставляет параметр retries, который позволяет задавать количество попыток повторной отправки в течение delivery.timeout.ms. Так как параметр retries имеет значение по умолчанию Integer.MAX_VALUE (2147483647), количество повторных отправок сообщения можно регулировать, меняя только delivery.timeout.ms.


Движемся к exactly once delivery


Перечисленные настройки позволяют нашему Producer’у доставлять сообщения с высокой гарантией. Давайте теперь поговорим о том, как гарантировать запись только одной копии сообщения в Kafka-топик? В самом простом случае для этого на Producer нужно установить параметр enable.idempotence в значение true. Идемпотентность гарантирует запись только одного сообщения в конкретную партицию одного топика. Предварительным условием для включения идемпотентности являются значения acks = all, retry > 0, max.in.flight.requests.per.connection ? 5. Если эти параметры не заданы разработчиком, то автоматически будут выставлены указанные выше значения.


Когда идемпотентность настроена, необходимо добиться того, чтобы одинаковые сообщения попадали каждый раз в одни и те же партиции. Это можно сделать, настраивая ключ и параметр partitioner.class на Producer. Давайте начнем с ключа. Для каждой отправки он должен быть одинаковым. Этого легко добиться, используя какой-либо бизнес-идентификатор из оригинального сообщения. Параметр partitioner.class имеет значение по умолчанию — DefaultPartitioner. При этой стратегии партиционирования по умолчанию действуем так:


  • Если партиция явно указана при отправке сообщения, то используем ее.
  • Если партиция не указана, но указан ключ — выбираем партицию по хэшу от ключа.
  • Если партиция и ключ не указаны — выбираем партиции по очереди (round-robin).

Кроме того, использование ключа и идемпотентной отправки с параметром max.in.flight.requests.per.connection = 1 дает вам упорядоченную обработку сообщений на Consumer. Отдельно стоит помнить, что, если на вашем кластере настроено управление доступом, то вам понадобятся права на идемпотентную запись в топик.


Если вдруг вам не хватает возможностей идемпотентной отправки по ключу или логика на стороне Producer требует сохранения консистентности данных между разными партициями, то на помощь придут транзакции. Кроме того, с помощью цепной транзакции можно условно синхронизировать запись в Kafka, например, с записью в БД. Для включения транзакционной отправки на Producer необходимо, чтобы он обладал идемпотентностью, и дополнительно задать transactional.id. Если на вашем Kafka-кластере настроено управление доступом, то для транзакционной записи, как и для идемпотентной, понадобятся права на запись, которые могут быть предоставлены по маске с использованием значения, хранящегося в transactional.id.


Формально в качестве идентификатора транзакции можно использовать любую строку, например имя приложения. Но если вы запускаете несколько инстансов одного приложения с одинаковым transactional.id, то первый запущенный инстанс будет остановлен с ошибкой, так как Kafka будет считать его зомби-процессом.


org.apache.kafka.common.errors.ProducerFencedException: Producer attempted an operation with an old epoch. Either there is a newer producer with the same transactionalId, or the producer's transaction has been expired by the broker.

Чтобы решить эту проблему, мы добавляем к имени приложения суффикс в виде имени хоста, который получаем из переменных окружения.


Producer настроен, но транзакции на Kafka управляют только областью видимости сообщения. Вне зависимости от статуса транзакции, сообщение сразу попадает в топик, но обладает дополнительными системными атрибутами.


Чтобы такие сообщения не считывались Consumer’ом раньше времени, ему необходимо установить параметр isolation.level в значение read_committed. Такой Consumer сможет читать нетранзакционные сообщения как и раньше, а транзакционные — только после коммита.
Если вы установили все перечисленные ранее настройки, то вы настроили exactly once delivery. Поздравляю!


Но есть еще один нюанс. Transactional.id, который мы настраивали выше, на самом деле является префиксом транзакции. На менеджере транзакций к нему дописывается порядковый номер. Полученный идентификатор выдается на transactional.id.expiration.ms, который конфигурируется на Kafka кластере и обладает значением по умолчанию «7 дней». Если за это время приложение не получало никаких сообщений, то при попытке следующей транзакционной отправки вы получите InvalidPidMappingException. После этого координатор транзакций выдаст новый порядковый номер для следующей транзакции. При этом сообщение может быть потеряно, если InvalidPidMappingException не будет правильно обработан.


Вместо итогов


Как можно заметить, недостаточно просто отправлять сообщения в Kafka. Нужно выбирать комбинацию параметров и быть готовым к внесению быстрых изменений. В этой статье я постарался в деталях показать настройку exactly once delivery и описал несколько проблем конфигураций client.id и transactional.id, с которыми мы столкнулись. Ниже в краткой форме приведены настройки Producer и Consumer.


Producer:


  1. acks = all
  2. retries > 0
  3. enable.idempotence = true
  4. max.in.flight.requests.per.connection ? 5 (1 — для упорядоченной отправки)
  5. transactional.id = ${application-name}-${hostname}

Consumer:


  1. isolation.level = read_committed

Чтобы минимизировать ошибки в будущих приложениях, мы сделали свою обертку над spring-конфигурацией, где уже заданы значения для некоторых из перечисленных параметров.


А вот пара материалов для самостоятельного изучения:


Комментарии (13)


  1. novoselov
    24.12.2019 17:29
    +2

    прочитал «Как Kafka стала бОлью»


    1. iFebrity
      25.12.2019 09:28

      +1


  1. Neveil
    24.12.2019 18:09

    Былью, в смысле от Кафки в итоге отказались?


    1. p_a_sh
      24.12.2019 18:34
      +2

      Былью — в смысле кафка (несмотря на всю изначальную боль) в итоге воплотилась в жизнь.


    1. Burnout171 Автор
      24.12.2019 18:41

      Нет, мы продолжаем работать с Кафкой. В заголовке хотелось обыграть схожесть слов бЫль и бОль, так как наш путь к гарантированной доставке был непростым.


  1. Godebug
    24.12.2019 19:54

    Так как параметр retries имеет значение по умолчанию Integer.MAX_VALUE (2147483647), количество повторных отправок сообщения можно регулировать, меняя только delivery.timeout.ms.

    Кто-то мешает задать другое значение?


    1. Burnout171 Автор
      25.12.2019 09:49

      Если Вы имеете ввиду другое значение для параметра retries, то да, его можно задать. Но, как и отмечено в статье, можно регулировать количество повторных отправок значением параметра delivery.timeout.ms. В некоторых случаях это будет удобнее за счет того, что Producer сделает максимальное число повторных отправок за указанный интервал времени и Вам не нужно будет подбирать значение для retries самостоятельно.


  1. alatushkin
    24.12.2019 21:39

    Кмк по этой теме нелишним будет упоминание паттерна Transaction Outbox и самостоятельного управления оффсетами на стороне Consumer


  1. hell0w0rd
    25.12.2019 02:15

    Но ведь это exactly once запись в кафку, как вы гарантируете exactly once чтение? В статье в целом мало информации о консьюмере, почему в нем нельзя было обеспечить exactly once по тому же ключу?
    Также в случае acks = -1 кластер становится фактически недоступен при выхода из строя одной из нод, как вы с этим боретесь?


    1. Burnout171 Автор
      25.12.2019 11:37
      +1

      Спасибо за интересные вопросы! Попробую ответить на них по порядку.

      Но ведь это exactly once запись в кафку, как вы гарантируете exactly once чтение? В статье в целом мало информации о консьюмере, почему в нем нельзя было обеспечить exactly once по тому же ключу?

      Мы пытались выжать максимум из средств, которые предоставляет сама Кафка, поэтому на Consumer в первую очередь настраиваем isolation.level = read_committed. Этого достаточно для внутренней коммуникации внутри нашей системы.
      Если я правильно понял часть вопроса с ключем, то такое поведение можно получить используя какой-либо внешний источник для хранения данных, с которым будет работать сервис, читающий из Кафки. Этот сервис будет считывать сообщение из топика, проверять наличие полученного ключа во внешнем источнике данных и обрабатывать сообщение, если полученный ключ уникальный. Хотя, это будет больше похоже на идемпотентную обработку данных. В нашей системе мы используем похожий механизм на граничных сервисах, так как не можем управлять Producer-ами сторонних систем.

      Также в случае acks = -1 кластер становится фактически недоступен при выхода из строя одной из нод, как вы с этим боретесь?

      Наша команда не занимается управлением кластера Кафки, но я попробую ответить на Ваш вопрос. Сочетание количества брокеров, фактора репликации и значения параметра min.insync.replicas нашего кластера позволяют нам пережить до 3х падений узлов. Вы можете подробнее разобраться в том, как кластер переживает падение узлов прочитав отличную статью от коллег. Кроме того, у нас настроен мониторинг статусов партиций и ответственные за кластер люди смогут во время вмешаться.


      1. vvberunenko
        25.12.2019 17:34
        +1

        Выжать максимум — это не совсем достичь "exactly once") При ребалансировке consumer group незакоммиченные оффсеты будут вычитаны другим назначенным потребителем группы. Это можно, в принципе, порешать кастомным ассайнором, но оно же убъет отказоустойчивость. Вся бОль распределенных систем в том, что не существует гарантированной одноразовой доставки и жесткой целостности данных. Kafka в данном случае не нарушает законов.


  1. andr1983
    26.12.2019 10:53
    +1

    Если честно, то от статьи ожидал большего. По сути, это пересказ документации по настройке exactly-once. Хотелось бы услышать больше, как Kafka справляется с ролью mq, для которой она подходит всётаки ограниченно. Какие были проблемы и решения. Может быть пришлость менять архитектуру или подходы.


    Насчёт exactly-once в Kafka мои личные наблюдения, что сильно полагаться на него не стоит. Приложения с поддержкой at-least-once семантикой в итоге имеют меньше проблем и можно добиться куда более высокой производительности. Exactly-once имеет смысл при процессинге kafka — > kafka да и то далеко не всегда.


    1. vvberunenko
      26.12.2019 16:59

      Зависит от того паттерна, с которым используют MQ. Заменять очередь на персистентный лог стоит разве что при наличии более, чем одного потребителя из очереди сообщений. Иначе это превращается в лютый кэш транспортного уровня с большими накладными расходами (инфраструктура и управление) на его организацию.