Представьте сайт интернет-магазина, на котором пользователи совершают покупки. Микросервисы, обрабатывающие запросы пользователей, отправляют в kafka сообщения об очередной сделанной покупке. В kafka один топик с одной партицией, из которой читает сообщения микросервис. Этот микросервис формирует историю покупок по каждому пользователю. Однажды одновременных покупок было совершено так много, что история у многих пользователей не обновлялась несколько часов.

Почему так произошло и как этого избежать? Давайте разберемся.

Модель системы

Смоделируем эту ситуацию с помощью трех маленьких программ на go и kafka в docker'е.

Схема модели системы
Схема модели системы

Что же изображено на схеме?

Название элемента схемы

Пояснения

gen

Утилита на go, которая моделирует запросы пользователей и генерирует "много" сообщений. В нашем случае всего 9. Этого достаточно для демонстрации.

P0

Единственная партиция kafka. P - от слова partition. Ноль присутствует в обозначении потому, что далее в схему добавится еще одна партиция и их понадобится различать.

A:1, A:2 и другие

Сообщения, которые сгенерировала утилита gen. Все сообщения на схеме показаны уже в партиции P0 в порядке записи. Порядок важен, иначе история покупок будет непоследовательной. Каждое сообщение обозначается содержимым ключа (key), двоеточием и содержимым значения (value). В значениях записаны цифры, которые позволяют отслеживать порядок сообщений.

service

Сервер на go, который читает и обрабатывает сообщения. Это модель микросервиса, формирующего историю покупок.

H0

Обработчик сообщений. H - от слова handler. Ноль в обозначении по той же причине, что и ноль в P0 - далее добавим еще обработчик. Обработчик - это горутина, которая записывает сообщения в базу данных.

A.db, B.db и C.db

Сервер на go, который моделирует некоторую СУБД. Не важно какую именно. Можно представить, что A.db, B.db и C.db - это шарды postgresql. Буквы в обозначениях не случайны - обработчик H0 все сообщения с ключом A отправляет в A.db, сообщения с ключом B - в B.db, сообщения с ключом С - в С.db.

На схеме также изображены consumer (читает сообщения из kafka) и producer (записывает сообщения в kafka). Это понятия kafka, реализация которых варьируется в зависимости от языка программирования и библиотеки-клиента kafka. Я использую библиотеку segmentio/kafka-go, в которой этим понятиям соответствуют структуры kafka.Writer и kafka.Reader.

Сообщения стоят в очереди

На несущественных подробностях реализации gen и service здесь и далее в статье останавливаться не будем - с их исходным кодом можно будет ознакомиться отдельно. Однако необходимо пояснить, что на схеме делают A.db, B.db и C.db и как они реализованы.

Реализация модели СУБД проста и выделяется двумя особенностями:

  • в один момент времени модель обрабатывает одно сообщение (sync.Mutex);

  • обработка одного сообщения занимает 1 секунду (time.Sleep).

var dbLock = sync.Mutex{}

func writeNumber(c *fiber.Ctx) error {
  dbLock.Lock()
  defer dbLock.Unlock()

  time.Sleep(1 * time.Second)

  slog.Info("written",
    "number", c.Params("number"),
  )

  return nil
}

Реализована модель в виде обработчика http-запроса с использованием веб-фреймворка fiber. Сообщения в постоянное хранилище не помещаются, а вместо этого выводятся в терминал.

Если запустить все элементы схемы, то по логам станет понято, что все девять сообщений, действительно обрабатываются по одной секунде каждое.

Логи модели СУБД: сообщения в одной очереди
Логи модели СУБД: сообщения в одной очереди

Еще одно важное наблюдение: сообщения обрабатываются последовательно - по очереди. Обработчик сообщений H0 отправляет сообщение, ждет 1 секунду ответа от СУБД и только после этого берется за следующее сообщение.

             9 секунд
-----------------------------------
C:3 C:2 C:1 B:3 B:2 B:1 A:3 A:2 A:1    

Так где же находится эта очередь? Очередь - это партиция P0, из которой сообщения доставляются в обработчик сообщений H0, чтобы по одному быть отправленными в соответствующие СУБД: A.db, B.db и C.db.

Одна общая очередь сообщений
Одна общая очередь сообщений

Одна большая очередь - это та самая причина, по которой история покупок в интернет-магазине обновлялась несколько часов. На практике это означает следующее: если в очереди находится 10 000 сообщений, то последнее сообщение в очереди будет ждать, пока обрабатываются 9 999 сообщений перед ним. И тут можно возмутиться: любой http-сервер на go обрабатывает каждый запрос в отдельной горутине, а у нас тут всего одна горутина-обработчик H0 и огромная очередь сообщений. Неужели работа с kafka сопряжена с такими ограничениями? Нет! Просто http-сервера инкапсулируют реализацию конкурентного кода, а с kafka конкурентный код приходится писать самостоятельно. Далее я покажу, как это можно сделать. Но прежде давайте посчитаем.

Потоки сообщений

СУБД A.db обрабатывает одно сообщение в секунду и B.db обрабатывает одно сообщение в секунду. Если A.db и B.db начинают обрабатывать по одному сообщению одновременно, то за одну секунду они вдвоем обработают 2 сообщения. А если к этому процессу подключить еще и C.db, то в секунду будут обрабатываться по 3 сообщения. Тогда все 9 сообщений обработаются за 3 секунды.

3 секунды
-----------
A:3 A:2 A:1
B:3 B:2 B:1
C:3 C:2 C:1

В СУБД A.db нужно последовательно записать сообщения A:1, A:2, A:3. Заметим, что у этих трех сообщений есть общая часть - ключ A, - и наложено требование на порядок обработки. Для удобства дальнейших рассуждений я введу понятие поток сообщений. Поток сообщений - это множество сообщений с общим идентификатором и установленным порядком обработки.

A:3 A:2 A:1 - это поток сообщений с идентификатором A
B:3 B:2 B:1 - это поток сообщений с идентификатором B
C:3 C:2 C:1 - это поток сообщений с идентификатором С

В очереди-партиции P0 находятся три потока сообщений, а в качестве идентификатора потока используется ключ сообщения. Давайте для начала логически выделим эти потоки.

Потоки сообщений
Потоки сообщений

Выходит, что распараллеливание обработки сообщений следует вести на уровне потоков сообщений. Действительно, чтобы ускорить обработку имеющихся потоков до 3 сообщений в секунду, потребуется сообщения из каждого потока поместить в отдельную очередь с собственным обработчиком.

В интернет-магазине поток сообщений - это множество сообщений о покупках одного пользователя. Идентификатором этого потока будет идентификатор пользователя. Поскольку порядок покупок в истории важен для каждого пользователя по отдельности, сообщения из каждого потока можно обрабатывать тоже по отдельности.

Создаем очереди с помощью партиций kafka

Первый механизм создания очередей, который мы рассмотрим, реализуется в основном средствами kafka, а именно за счет увеличения количества партиций. Как было установлено ранее, для достижения максимальной скорости обработки сообщений нужно три параллельные очереди. Однако в реальной жизни не всегда возможно и целесообразно создавать по одной партиции на поток сообщений. Поэтому добавим в нашу схему только одну дополнительную партицию.

Добавляем партицию и consumer: оба consumer'а в одном приложении
Добавляем партицию и consumer: оба consumer'а в одном приложении

Партиций теперь две. Чтобы дальнейшие рассуждения о партициях и consumer'ах имели смысл давайте условимся, что в рассматриваемой модели системы все consumer'ы состоят в одной consumer group.

Запускаем два consumer'а, чтобы у каждой очереди-партиции был свой обработчик сообщений. Если запустить consumer'ов меньше, чем партиций, то обязательно найдется такой consumer, который читает сообщения более чем из одной партиции. Тогда партиции, из которых читает один consumer, уже не являются отдельными очередями. Это будет одна общая очередь, потому что consumer будет обрабатывать сообщения из нескольких партиций по очереди - друг за другом. О параллельности тут и речи не будет. Это важное наблюдение, которое позволяет сделать вывод о том, что очередь параллельной обработки - это два компонента:

  • сама очередь сообщений (в данном случае это партиция);

  • обработчик сообщений (в данном случае обработчик и consumer - это одно целое).

На рисунке выше я изобразил лишь один из вариантов увеличения количества consumer'ов - в одном приложении запускаются несколько consumer'ов. Есть еще вариант - просто запустить два экземпляра приложения.

Добавляем партицию и consumer: каждый consumer в отдельном приложении
Добавляем партицию и consumer: каждый consumer в отдельном приложении

А еще можно использовать гибридное решение - запустить несколько экземпляров приложения, в каждом из которых запустить несколько consumer'ов. Серверу kafka не важно, каким образом запущены consumer'ы и на каком языке программирования они реализованы.

Разбиение топика на партиции дает возможность запустить экземпляры service на разных физических машинах. Если consumer'ы запущены на разных физических машинах (соответственно исполняются на разных процессорах), то можно говорить о том, что сообщения из каждой очереди обрабатываются именно параллельно, а не конкурентно (на одном процессоре). Эта особенность может быть полезна, если обработка сообщений включает ресурсоемкие вычисления, исполняемые на том же процессоре, на котором запущен consumer.

Можно выделить еще одно преимущество партиций. Представьте, что партиция одна, и при обработке одного из сообщений происходит ошибка. Ваше приложение делает повторные попытки обработать это сообщение. Пока эти попытки терпят неудачу, другие сообщения в партиции ждут совей очереди. А если бы партиций было больше одной, то ошибка обработки сообщения из одной партиции не затронула бы обработку сообщений из других партиций.

Для определенности далее будем рассматривать вариант с двумя экземплярами service.

Распределяем сообщения по очередям

На текущий момент мы определились с партициями и consumer'ами. Запустим утилиту gen.

Дополнительная партиция привела к нарушению порядка обработки сообщений в потоках
Дополнительная партиция привела к нарушению порядка обработки сообщений в потоках

Тут же обнаруживается проблема: нарушен порядок сообщений в потоках. Когда партиция была одна, producer в утилите gen не задумывался, в какую партицию поместить то или иное сообщение. Теперь партиций две, и producer'у нужен алгоритм выбора партиции. Библиотека segmentio/kafka-go предоставляет такой алгоритм. Это известный алгоритм round robin, принцип работы которого следующий. Первое сообщение, которое нужно записать в kafka, - это A:1. Оно записывается в партицию P0. Далее сообщение A:2 записывается в P1. Партиций больше нет, значит A:3 записывается снова в P0. И так далее по кругу.

Чтобы более наглядно продемонстрировать нарушение порядка сообщений я специально немного "сдвинул назад во времени" сообщения в партиции P1 на схеме выше. В реальности такой сдвиг может произойти, например, из-за сетевых задержек. Тогда в соответствии со схемой сообщения поступят в обработчики H0 в следующем порядке.

A:1 A:3 B:2 C:1 A:2 C:3 B:1 B:3 C:2

Итак, количество партиций увеличено, однако порядок обработки сообщений нарушен. Причина в том, что алгоритм распределения сообщений по партициям, используемый по умолчанию, не учитывает наши потоки сообщений - потоки A, B и С. Это нормально, ведь разработчики segmentio/kafka-go ничего не знают об этих потоках.

Меняем round robin на другой алгоритм, который учитывает наши потоки.

Распределяем сообщения по партициям правильно с помощью подходящего алгоритма
Распределяем сообщения по партициям правильно с помощью подходящего алгоритма

Здесь через f(S)=P я обозначил алгоритм, который каждому потоку сообщений (S, stream) ставит в соответствие номер партиции (P, partition). Библиотека segmentio/kafka-go позволяет реализовать интерфейс Balancer, чтобы направить потоки сообщений в соответствующие партиции. Ниже приведена реализация этого интерфейса, то есть приведен алгоритм f(S)=P.

type SPBalancer struct{}

func (b *SPBalancer) Balance(msg kafka.Message, partitions ...int) (partition int) {
  hash := func(s string) int {
    h := fnv.New32a()
    h.Write([]byte(s))
    return int(h.Sum32())
  }

  return partitions[hash(string(msg.Key))%len(partitions)]
}

spBalancer.Balance(kafka.Message{Key: []byte("A")}, 0, 1) // вернет 0
spBalancer.Balance(kafka.Message{Key: []byte("B")}, 0, 1) // вернет 1
spBalancer.Balance(kafka.Message{Key: []byte("C")}, 0, 1) // вернет 0

Алгоритм использует идентификатор потока, в нашем случае хранящийся в ключе сообщения, чтобы определить для этого сообщения номер партиции. Довольно часто алгоритм f(S)=P базируется на вычислении хеш-суммы идентификатора потока и последующем взятии остатка от целочисленного деления на количество партиций. В данном примере всего две партиции и распределить по ним сообщения не составляет труда, однако в реальных системах разработка алгоритма f(S)=P для как можно более равномерного распределения сообщений по партициям вкупе с выбором идентификатора потока является нетривиальной задачей.

Промежуточный итог. Имеем две полноценные очереди сообщений, базирующиеся на партициях.

Логи модели СУБД: сообщения в двух параллельных очередях
Логи модели СУБД: сообщения в двух параллельных очередях

Однако потоки A и C все еще ожидают обработки в одной очереди.

        6 секунд
-----------------------
C:3 C:2 C:1 A:3 A:2 A:1
            B:3 B:2 B:1

Значит, пришло время рассмотреть следующий механизм создания очередей.

Создаем очереди с помощью каналов go

Параллельный consumer - это механизм создания очередей, для реализации которого в golang используются каналы (channel). Собственно, каналы и будут играть роль очередей. Чтобы очередь была полноценной, каждый канал нужно обеспечить своим обработчиком сообщений. Напомню, что обработчик сообщений - это горутина. Взглянем поближе на то, что из себя представляет параллельный consumer, отправляющий сообщения в СУБД A.db и C.db.

Принцип работы параллельного consumer'а
Принцип работы параллельного consumer'а

Consumer извлекает пакет сообщений (batch) из партиции и распределяет сообщения по очередям-каналам. Здесь f(S)=H отображает потоки сообщений в номера обработчиков (H, handler) все с той же целью сохранения порядка сообщений. Таким образом в каждом канале оказываются сообщения только из одного потока.

В итоге система примет следующий вид.

В модели системы появился параллельный consumer
В модели системы появился параллельный consumer

Мы привели систему к такой конфигурации, в которой каждый поток сообщений ждет обработки в собственной очереди.

Логи модели СУБД: сообщения в трех параллельных очередях
Логи модели СУБД: сообщения в трех параллельных очередях

Это максимальное количество параллельных очередей, которые можно создать для обработки сообщений с учетом порядка. Но всегда ли максимальное количество очередей уместно?

Останавливаемся вовремя

Важно понимать, какие потоки сообщений есть в системе и оценить их количество. Например, если существует только один поток, то распараллеливать просто нечего - одной очереди достаточно.

Еще один повод ограничить количество очередей - это высокая нагрузка на систему к которой обращаются обработчики сообщений. В нашем случае обработчики обращаются к системе из трех СУБД A.db, B.db и C.db. Когда СУБД три, то все сообщения обрабатываются за 3 секунды, потому что у нас три потока, три очереди и три СУБД. Но давайте представим, что СУБД всего одна, и она обрабатывает сообщения из всех потоков.

Один экземпляр модели СУБД не может обрабатывать сообщения из трех потоков одновременно
Один экземпляр модели СУБД не может обрабатывать сообщения из трех потоков одновременно

Вспомните, что модель СУБД, которую мы используем, обрабатывает только одно сообщение в один момент времени благодаря sync.Mutex. То есть конкурентные запросы просто встанут в очередь уже не в партициях kafkа или каналах service, а в единственной СУБД. В итоге 9 сообщений снова будут обрабатываться по очереди в течение 9-ти секунд. Я хочу сказать, что очень просто запустить параллельный consumer с 10 000 000 обработчиков сообщений, однако надо понимать, справляется ли с таким количеством конкурентных запросов ваша СУБД.

Заключение

В заключении небольшой план по решению проблемы с длительным обновлением истории покупок в интернет-магазине из начала статьи.

Начать можно с внедрения параллельного consumer'а. Этот шаг не потребует увеличения количества партиций, ведь в некоторых случаях увеличить количество партиций просто нет возможности. А если такая возможность есть, то можно добавить партиции и запустить несколько экземпляров микросервиса, который читает сообщения kafka. Каждый поток сообщений в такой системе будет соответствовать последовательности покупок определенного пользователя, и в качестве идентификатора этого потока стоит выбрать идентификатор пользователя. Количество очередей удобно регулировать с помощью параллельного consumer - изменить количество каналов go проще, чем добавить или удалить партицию. Будет удобно, если количество каналов будет задаваться в конфигурации микросервиса.

Список источников

Описание основных концепций kafka можно найти по следующим ссылкам:

По этой ссылке располагается исходный код java-библиотеки Confluent Parallel Consumer (термин параллельный consumer я позаимствовал отсюда), которая предназначается для создания очередей сообщений в kafka consumer. В документации описаны техники параллельной обработки сообщений kafka и приводится множество сценариев использования этого подхода.

Ссылки на библиотеки go, упомянутые в статье:

Исходный код приложений, рассмотренных в статье, находится здесь.

Комментарии (15)


  1. akurilov
    26.10.2023 07:08

    Зачем здесь кафка, если есть нативный для go nats?


    1. ewolf
      26.10.2023 07:08

      Nats из коробки как минимум не персистентный (не берём в рассмотрение Jetstream)


      1. akurilov
        26.10.2023 07:08

        Почему не берём Jetstream?


        1. ewolf
          26.10.2023 07:08

          Ну кстати это хороший вопрос, я не имею отношения к статье и ее кейсу, но со своей стороны я бы сказал, что джетстрим по моему опыту показал себя гораздо медленнее и менее надёжным, чем кафка


          1. akurilov
            26.10.2023 07:08

            А где можно посмотреть на такие числа?


            1. ptr128
              26.10.2023 07:08

              На 5-ти мегабайтных пакетах для push и pull c lz4 (но не snoopy, как по умолчанию), да еще в protobuf, Kafka существенно опередит Jetstream. Особенно если много автономных подписчиков-сервисов на один топик. На одиночных сообщениях с одним подписчиком - скорее всего проиграет.


    1. sim6a Автор
      26.10.2023 07:08

      Мне хотелось рассмотреть решения связанные именно с kafka. Ведь kafka довольно часто применяют для решения подобных задач.


    1. ptr128
      26.10.2023 07:08

      Как раз последнее и накладывает ограничения на его применение. Ведь в жизни почти всегда сталкиваемся с гетерогенной средой, как бы нам не хотелось этого избежать. Кроме того наличие Debezium и Confluent - большой плюс. А управление метаданными в protobuf/avro/json через schema registry из коробки - огромный плюс.


  1. stanlyzoolo
    26.10.2023 07:08
    +2

    А статус лидирующей партиции и "остальных" как то повлияют на балансировку?


    1. sim6a Автор
      26.10.2023 07:08

      Такие эксперименты не продовил. Попробую порассуждать.

      Насколько мне известно, producer всегда записывает сообщения в Leader-партицию.

      Пусть producer посылает сообщение в партицию 0. Запись в партицию 0 в данный момент недоступна. Пусть запись недоступна по следующей причине. У consumer установлен параметр acks=all, сообщение записалось в Leader-партицию, и не записалось в необходимое количество Follower-партиций (запись подтвердили меньше Follower-партиций, чем установлено в параметре брокера min.insync.replicas). Тогда сообщение, которое producer пытался записать в партицию 0 не будет записана ни в какую другую партицию - producer получит ошибку.

      Получается, что при записи сообщение не попадет в неправильную партицию

      Теперь посмотрим на чтениие.

      По умолчанию consumer читает сообщения из Leader-партиции. Пусть чтение из Leader-партиции невозможно. Такое могло произойти из-за сетевых задержек или перегрузки брокера. Тогда из Follower-партиций назначается новая Leader-партиция, и из нее продолжит читать сообщения consumer. Если новая Leader-партиция будучи Folower-партицией не успела получить все сообщения от прежней Leader-партиции, то consumer не получит часть сообщений. Насколько я знаю, эти сообщения kafka не восстановит.

      Тогда отвечу на ваш вопрос так: не повлияют.

      Но, повторюсь, собственноручно я это не проверял.


  1. ptr128
    26.10.2023 07:08
    +1

    А почему нельзя сделать три партиции по количеству БД и не повесить на них три синка, каждый на свою БД? По умолчанию они будут вливать в БД по 500 собщений суммарным размером не больше мегабайта, но эти параметры можно настраивать. А уже обработка сообщений, если таковая требуется, в том же PostgreSQL в триггерах все равно построчная. Зато обработка 500 сообщений в одной транзакции будет производиться на порядок быстрее, чем обработка этих же сообщений в 500 транзакциях.


    1. sim6a Автор
      26.10.2023 07:08
      +1

      Согласен с вами, есть более оптимальные решения.

      Хотелось показать, как можно действовать в условиях, когда есть возможность использовать только ограниченное количество партиций. Пример, конечно, сильно упрощенный. Параллельный consumer имеет смысл использовать, когда обработка сообщения заключается в обращении к нескольким внешним системам и применении проверок предметной области, а не только в записи в базу.


    1. sim6a Автор
      26.10.2023 07:08
      +1

      Надо было больше внимания в статье уделить ограничениям, тогда, возможно, решения выглядели бы более оправданными. Учту это в будущем.

      Спасибо вам за комментарий.


  1. yurgers
    26.10.2023 07:08

    А, насколько целесообразно использовать каналы вместе с Kafka?

    Если мы наберём много сообщений из Kafka, в калалы. И наше приложение упадет, то все они потеряются...

    Если будем сильно ограничивать, канал, у нас может получится так, что один из каналов будет простаивать.


    1. sim6a Автор
      26.10.2023 07:08

      Целесообразность подхода

      На вопрос, когда такой подход целесообразно применить, гораздо лучше меня ответит список сценариев использования библиотеки Parallel Consumer: https://github.com/confluentinc/parallel-consumer/blob/master/README.adoc (пункт 3.4. Scenarios).
      В основном, насколько я понимаю, авторы Parallel Consumer предлагают использовать этот подход, когда увеличивать количество партиций не представляется возможными, либо когда увеличение партиций значительно не ускоренит обработку сообщений.

      Потеря сообщений после падения приложения

      Потери сообщений можно избежать, если фиксировать пакет прочитанных сообщений явно вызовом метода Commit в коде обработчика сообщений только после того, как сообщения обработаны. Если приложение упадет в процессе обработки пачки сообщений, то после перезапуска приложения все незафиксированные сообщения считаются consumer'ом повторно.

      Тут еще надо сказать, что segmentio/kafka неявно для клиента библиотеки считывает из kafka сообщения в буферный канал msgs, и именно из msgs извлекается по одному сообщения вызовом метода Fetch. Думаю, что большинство реализаций consumer поступают аналогично - используют буфер считанных сообщений в оперативной памяти приложения.

      Менее активное использование одного канала по сравнению с остальными (если я правильно вас понял).

      Думаю, что это проблема не подхода с каналами, а больше проблема балансировки нагрузки и распределения сообщения по очередям. Ведь может оказаться, что и одна из партиций kafka будет пуста, потому что алгоритм распределения сообщений не гарантирует равномерного распределения.

      Я согласен, что в любом подходе есть плюсы и минусы. Думаю, что нужно находить компромиссные решения с учетом исходных данных и ограничений решаемой задачи.