Нам говорят, что Apache Kafka сохраняет порядок сообщений для каждой темы/раздела, но насколько это правда? В этой статье мы проанализируем несколько реальных сценариев, в которых слепое принятие этой догмы, может привести к неожиданным и ошибочным последовательностям сообщений.

Базовый сценарий: один производитель

Начнём с базового сценария: один производитель отправляет сообщения в топик Apache Kafka с одним разделом последовательно, одно за другим.

В этой базовой ситуации, согласно известной мантре, мы должны ожидать всегда правильного порядка. Но так ли это? Смотря как!

Нестабильная сеть

В идеальном мире сценарий с одним производителем всегда должен приводить к правильному порядку. Но наш мир не идеален! Различные сетевые пути, ошибки и задержки могут привести к задержке или потере сообщения.

Давайте представим ситуацию ниже: один продюсер отправляет в топик три сообщения:

  • Сообщение 1 по какой-то причине находит длинный сетевой маршрут к Apache Kafka

  • Сообщение 2 находит самый быстрый сетевой маршрут к Apache Kafka

  • Сообщение 3 теряется в сети

Даже в этом базовом сценарии только с одним производителем мы могли бы получить неожиданную серию сообщений в теме. Конечный результат в теме Kafka покажет только два сохраняемых события с неожиданным порядком 2, 1.

С точки зрения Apache Kafka это может быть правильный порядок. Тема — это всего лишь журнал информации, и Apache Kafka будет записывать сообщения в журнал в зависимости от того, когда он «чувствует» приход нового события. Он основан на времени приема Kafka, а не на времени создания сообщения (время события).

Подтверждения и повторы

Но не все потеряно! Если мы посмотрим в библиотеки (например, aiokafka), то заметим, что у нас есть способы обеспечить правильную доставку сообщений.

Прежде всего, чтобы избежать проблемы с сообщением 3 в приведенном выше сценарии, мы могли бы определить правильный механизм подтверждения. Параметр acks у производителя позволяет нам определить, какое подтверждение получения сообщения мы хотим получить от Apache Kafka.

Установка этого параметра в 1 гарантирует, что мы получим подтверждение от основного брокера, ответственного за тему (и раздел). Установка этого параметра all гарантирует, что мы получим подтверждение только в том случае, если и первичный, и реплики правильно сохранят сообщение. Это избавит нас от проблем, когда только первичный получает сообщение и падает, прежде чем распространить его на реплики.

После того как мы установили ack, мы должны установить возможность повторной отправки сообщения, если мы не получим надлежащее подтверждение. В отличие от других библиотек (одной из них является kafka-python), aiokafka автоматически повторяет попытку отправки сообщения до тех пор, пока не будет превышено время ожидания (установленное параметром request_timeout_ms).

С подтверждением и автоматическими повторными попытками мы должны решить проблему с сообщением 3. При первой отправке производитель не получит ack, поэтому по истечении интервала retry_backoff_ms он отправит сообщение 3 снова.

Максимальное количество flight_request

Однако, если вы внимательно посмотрите на конечный результат в теме Apache Kafka, полученный порядок неверен: мы отправили 1,2,3 и получили 2,1,3в теме... как это исправить?

Старый метод (доступный в kafka-python) заключался в том, чтобы установить максимальное количество flight_request на одно соединение: это то количество сообщений, которому мы позволяем висеть «в воздухе» одновременно без подтверждения. Чем больше сообщений мы запускаем одновременно, тем больше риск получить сообщения не по порядку.

При использовании kafka-python, если нам абсолютно необходимо было иметь определенный порядок в теме, мы были вынуждены ограничить max_in_flight_requests_per_connection до 1. Предположим, что мы установили минимальный ack параметр как 1 и ожидали подтверждения каждого отдельного сообщения (или пакета сообщений, если размер сообщения меньше размера пакета) перед отправкой следующего.

Абсолютная правильность запроса, подтверждения и повторных попыток достигается ценой пропускной способности. Чем меньшему количеству сообщений мы позволяем находиться «в воздухе» одновременно, тем больше подтверждений нам нужно получить, тем меньше общих сообщений мы можем доставить в Kafka за определенный период времени.

Идемпотентные производители

Чтобы преодолеть строгую сериализацию отправки одного сообщения за раз и ожидания подтверждения, мы можем определить идемпотентных производителей. При использовании идемпотентного производителя каждое сообщение маркируется идентификатором производителя и серийным номером (последовательность, сохраняемая для каждого раздела). Этот составленный ID затем отправляется брокеру вместе с сообщением.

Брокер отслеживает серийный номер для каждого производителя и темы/раздела. Когда приходит новое сообщение, брокер проверяет составленный ID, и если в рамках одного производителя значение равно предыдущему номеру + 1, то новое сообщение подтверждается, в противном случае оно отклоняется. Это обеспечивает гарантию глобального упорядочивания сообщений, позволяя увеличить количество запросов в полете на одно соединение (максимум 5 для Java клиента).

Усложняем: с несколькими производителями

До сих пор мы представляли базовый сценарий только с одним производителем, но реальность Apache Kafka такова, что часто производителей несколько. О каких мелочах нужно знать, если мы хотим быть уверены в конечном результате?

Разные локации, разная задержка

Опять же, сеть неодинакова, и у нескольких производителей, расположенных в очень удаленных местах, может быть разная задержка. Это означает, что порядок может отличаться от порядка, основанного на времени события.

К сожалению, задержки между разными точками на Земле исправить не можем, поэтому нам придется принять этот сценарий.

Пакетирование, дополнительная переменная

Чтобы добиться более высокой пропускной способности, мы можем группировать сообщения. При пакетной обработке мы отправляем сообщения «группами», сводя к минимуму общее количество вызовов и увеличивая соотношение полезной нагрузки к общему размеру сообщения. Но при этом мы снова можем изменить порядок событий. Сообщения в Apache Kafka будут храниться для каждого пакета в зависимости от времени приема пакета. Таким образом, порядок сообщений будет правильным для каждого пакета, но в разных пакетах могут быть разные упорядоченные сообщения.

Теперь, когда есть разные задержки и пакетная обработка, кажется, что наше глобальное предположение об упорядочевании провалится... Итак, почему мы утверждаем, что можем управлять событиями по порядку?

Спаситель: время события

Мы убедились, что первоначальное предположение о том, что Kafka сохраняет порядок сообщений, не на 100% верно. Порядок сообщений зависит от времени приема Kafka, а не от времени генерации события. Но что, если нам очень важен порядок, основанный на времени события?

Ну, мы не можем решить проблему на стороне производства, но мы можем сделать это на стороне потребителя. Все наиболее распространенные инструменты, работающие с Apache Kafka, имеют возможность определять, какое поле использовать в качестве времени события, включая Kafka Streams, Kafka Connect с выделенным преобразованием одиночных сообщений (SMT) для извлечения меток времени и Apache Flink.

Потребители, если они правильно определены, смогут перетасовать порядок сообщений, поступающих из определенной темы Apache Kafka. Давайте проанализируем пример Apache Flink ниже:

CREATE TABLE CPU_IN (
    hostname STRING,
    cpu STRING,
    usage DOUBLE,
    occurred_at BIGINT,
    time_ltz AS TO_TIMESTAMP_LTZ(occurred_at, 3),
    WATERMARK FOR time_ltz AS time_ltz - INTERVAL '10' SECOND
    )
WITH (
   'connector' = 'kafka',
   'properties.bootstrap.servers' = '',
   'topic' = 'cpu_load_stats_real',
   'value.format' = 'json',
   'scan.startup.mode' = 'earliest-offset'
)

В приведенном выше определении таблицы Apache Flink мы можем заметить:

  • occurred_at: поле определено в исходной теме Apache Kafka во времени unix (тип данных — BIGINT).

  • time_ltz AS TO_TIMESTAMP_LTZ(occurred_at, 3): преобразует время unix в временную метку Flink.

  • WATERMARK FOR time_ltz AS time_ltz - INTERVAL '10' SECOND определяет новое time_ltz поле (вычисленное из occurred_at) как время события и определяет порог позднего прибытия событий с максимальной задержкой 10 секунд.

После того как указанная выше таблица определена, time_ltz поле можно использовать для правильного упорядочивания событий и определения окон агрегации, гарантируя, что все события в пределах допустимой задержки включены в расчеты.

INTERVAL '10' SECOND определяет задержку конвейера данных и является штрафом, который нам необходимо включить, чтобы обеспечить правильный прием поздно прибывающих событий. Обратите внимание, однако, что пропускная способность не влияет. У нас может быть столько сообщений в нашем конвейере, сколько мы хотим, но мы «ждем 10 секунд» перед вычислением любого окончательного KPI, чтобы убедиться, что мы включили в картину все события в определенный период времени.

Альтернативный подход, который работает только в том случае, если события содержат полное состояние, состоит в том, чтобы сохранить для определенного ключа (hostname и cpu в приведенном выше примере) максимальное время события, достигнутое на данный момент, и принимать изменения только в том случае, если новое время события больше, чем установленный максимум.

Подведем итоги

Концепция упорядочивания в Kafka может быть сложной, даже если мы включаем только одну тему с одним разделом. В этом посте рассказывается о нескольких распространенных ситуациях, которые могут привести к неожиданному порядку событий. К счастью, такие опции, как ограничение количества передаваемых сообщений или использование идемпотентных производителей, могут помочь добиться упорядочения, соответствующего ожиданиям. В случае нескольких производителей и непредсказуемости сетевой задержки нам остается только возможность исправить общий порядок на стороне потребителя путем правильной обработки времени события, которое необходимо указать в полезной нагрузке.


Если вы хотите глубже разобраться в Apache Kafka, приходите на курс «Apache Kafka для разработчиков».

Вы узнаете типовые шаблоны проектирования, сделаете свое приложение надежнее, получите опыт разработки нескольких приложений, использующих Kafka.

Старт потока — 14 июля.

Комментарии (14)


  1. DmitryKoterov
    23.06.2023 18:34
    +4

    Порядок сообщений зависит от времени приема Kafka, а не от времени генерации события.

    Хотелось бы заметить, что это и в реальном мире (физическом) так. Ибо теория относительности: события, одновременные в одной системе отсчета, всегда неодновременны в другой неравной ей (в том числе вплоть до смены их порядка на противоположный, и разница тем больше, чем больше разница в скорости этих СО друг относительно друга). Т.е. если есть СО, в которой “A затем B”, то найдется такая СО, в которой это будет “B затем A”, и это не «кажется», это реально так и есть: в мире нет универсальной одновременности/порядка событий, только если между ними успеет пинг проскочить и синхронизировать порядок при помощи информации.


    1. nin-jin
      23.06.2023 18:34

      Если звук грома приходит позже, чем свет от молнии, значит ли это, что звук рождается уже после того, как молния погасла, и это не «кажется», это реально так и есть?


  1. diakin
    23.06.2023 18:34
    +1

    "идемпотентные производители" - это про КДПВ? Ж8-()


  1. BugM
    23.06.2023 18:34
    +1

    В любой распределенной системе порядок основанной на времени разных серверов это очень условная штука. Не надо на такое закладываться. Часы ведут себя иногда очень странно. Время может идти в обратную сторону.

    Порядок гарантируется номером одной последовательности или чем-то таким. Кафка вам его и гарантирует. Все работает правильно.


  1. Kano
    23.06.2023 18:34
    +3

    Я чегото может не понимаю, но почему одиночный продюсер из примера отправляет все сообщения паралельно? Возможно автор поста опустил партиции на своих рисунках? Но тогда это какойто ввод взаблуждение.


  1. white-wild
    23.06.2023 18:34
    +1

    Разве порядок сообщений не должен гарантироваться транспортным уровнем, протоколом tcp/ip?


    1. Hardcoin
      23.06.2023 18:34
      -2

      Нет конечно. Разве вы видели в tcp/ip такие гарантии?


      1. ermouth
        23.06.2023 18:34
        +2

        Ну, формально tcp гарантирует очередность пакетов в рамках одного соединения. Это, правда, к содержанию статьи мало относится.


        1. Hardcoin
          23.06.2023 18:34
          +1

          Он гарантирует, что получатель может собрать пакеты в правильном порядке (они пронумерованы), но прийти к получателю они могут вразнобой.

          Возвращаясь к теме статьи, если все сообщения пронумеровать, то Кафка в теории тоже смогла бы их собрать в порядке отправителя.


          1. creker
            23.06.2023 18:34

            С точки зрения клиента гарантия все равно сохраняется - как байты были посланы, так они и придут. Поэтому на первой картинке с разными маршрутами пакетов нарисована полная глупость. Она работает только если до одного брокера открывается несколько соединений, что помоему клиенты кафки не делают


            1. khajiit
              23.06.2023 18:34

              как байты были посланы, так они и придут

              Нет. Придут ≠ соберутся. Протокол гарантирует именно сборку, а не доставку. Собственно, необходимость маркировки и сборки возникает из-за доставки вразнобой.


              Она работает

              … всегда, потому что сериализация происходит не позже чем на уровне phy: среда у вас одна и она или занята или нет. Никакой одновременной передачи, если речь не идет об агрегации линков, быть у вас не может, и даже с агрегацией должны хеши правильно лечь.


              1. creker
                23.06.2023 18:34

                И ради чего все эти подробности? Замечательно, что мы тут с вами прекрасно разбираемся, как все работает внутри, только это не имеет ни малейшего отношения к сабжу.

                Статья про кафку, которая работает на уровне абстракции TCP сокета. Приложению абсолютно плевать что и как там на физическом уровне, хоть там голубиной почтой пакеты идут через несколько континентов. На уровне приложения все так, как я написал - как байты были посланы, так они и придут. Все, на этом можно поставить точку. И именно поэтому первый кейс в статье лютый бред, который можно натянуть только на кейс с несколькими TCP сокетами. Опять же, клиенты по-моему так не делают. Они открывают один сокет до каждого брокера.


    1. creker
      23.06.2023 18:34
      +3

      Должно и гарантируется. Если рассматривать кейс с одним соединением, на первой картинке нарисован полный бред. Неважно совершенно какими маршрутами полетят пакеты. Стэк их в любом случае соберет в том порядке, в котором они были посланы.


  1. Djaler
    23.06.2023 18:34
    +5

    Сначала пару раз проскочило слово "топик", а потом всё время используется перевод "тема". И "раздел" туда же. Мне кажется, не стоит переводить эти слова, это вполне себе устоявшиеся и всеми понимаемые термины - топик и партиция.