Всем привет! 

Меня зовут Владимир Олохтонов, я руковожу командой разработки в отделе Message Bus, который является частью платформы Ozon. Мы занимаемся разработкой самых разных систем вокруг Kafka, etcd и Vault. В этой статье я расскажу о том, как мы строили линейно масштабируемую gRPC-прокси перед Kafka, способную обслуживать миллионы запросов в секунду, используя Go.

Пруфы :)
Пруфы :)

У нас довольно крупный кластер. Он состоит из 75 брокеров, управляет 30 000 партиций, а рейт поднимается до 5 млн запросов в секунду. Так что задачка перед нами стояла нетривиальная.

Дисклеймер

Статья написана с расчётом на читателя, хотя бы поверхностно знакомого с Kafka и gRPC streaming. Если вы пока не можете про себя такое сказать, то вот несколько ссылочек для ознакомления: 

Самые распространённые ошибки при работе с Kafka

Что такое gRPC за 10 минут

Chat Server — gRPC Bi-directional Streaming

Зачем вообще нужна такая прокси?

В Ozon, как и в множестве других крупных компаний, Kafka используется в качестве корпоративной шины данных — механизма, позволяющего с минимальными трудозатратами строить асинхронные связи между системами по паттерну Pub/Sub.

Разберём стандартную историю развития шины данных на базе Kafka в большой компании.

Первые связи строятся на основе ванильных open-source-библиотек — и это не так просто, как кажется, поскольку в дизайне Kafka используется подход с «умными библиотеками», содержащими десятки параметров, значения которых надо грамотно подбирать. Более того, они поддерживают только часть функциональности Kafka и содержат немало багов.

С ростом числа связей логичным следующим шагом становится выбор из всего многообразия библиотек нескольких наиболее стабильных и написание поверх них простых адаптеров с вдумчиво подобранными параметрами и стандартизированными метриками.

Затем возникает потребность в дополнительной функциональности, специфичной для компании. В случае Ozon это автоматическое получение списка брокеров от service discovery — Warden — и OAuth-авторизация.

Адаптеры становятся всё сложнее и сложнее, их используют сотни сервисов, что означает длинный хвост старых версий и много работы по их регулярному обновлению. Более того, на горизонте маячат дополнительные фичи — и наши адаптеры уже не выглядят такими уж приятными для поддержки.

Как известно, нет таких проблем в программировании, которые нельзя решить повышением уровня абстракции ???? В голову приходит идея: что, если большую часть логики реализовать на стороне сервера, полностью изолировать от потребителей работу с Kafka, а клиентский протокол упростить до неприличия? Тогда мы могли бы снова сделать наши библиотеки простыми, обновлять их редко, а клиенты бы получали новые фичи практически без доработок с их стороны. Да и уметь переезжать с одного кластера на другой было бы очень кстати.

Разумеется, у такого подхода есть своя цена: появляется новая точка отказа, дополнительный хоп для трафика между сервисами и брокерами, а также уменьшается максимальная производительность каждого из клиентов в отдельности по сравнению с тщательно настроенной библиотекой.

Мы взвесили плюсы и минусы, после чего принялись за работу. Проект получил кодовое название data-bus.

Сбор требований

Вид связи

При работе с Kafka есть два основных сценария: publish и subscribe. И если для первого из них постоянное соединение нужно лишь для обеспечения высокой производительности, то в случае со вторым это требование протокола самой Kafka.

Значит, обычный RPC нам не подойдёт — нужно использовать долгоживущую двустороннюю связь.

Самый простой из доступных вариантов — это TCP. Однако он достаточно низкоуровневый. Поэтому удобнее опереться на протокол более высокого уровня: WebSocket или gRPC streaming.

В Ozon gRPC используется повсеместно, поэтому в качестве первого приближения мы выбрали второй вариант.

gRPC streaming предоставляет отличный уровень абстракции — ordered reliable channel. И на уровне приложения нам не нужно думать о деталях поддержания связи с клиентом.

Рейт и трафик

Из численных показателей начнём с рейта и трафика, поскольку их легко посчитать ????

Возьмём данные с самого большого из наших кластеров. Хоть сейчас и не сезон, но порядок оценить можно: fetch rate (читающие запросы) — 2 млн rps, produce (запись) — около 1,5 млн rps. 

Входящего трафика у нас порядка 2,5 гигабайт в секунду, то есть 20 гигабит, исходящего — традиционно для Pub/Sub заметно больше — около 7 гигабайт в секунду, или 56 гигабит.

Переходим к оценке числа клиентов. Они бывают двух видов: консьюмеры и продюсеры.

В случае с консьюмерами всё просто, поскольку существуют kafka-scripts. ​​kafka-consumer-groups --bootstrap-server ${broker} --all-groups --describe --members — и мы получили IP-адреса всех консьюмеров. После небольшой обработки вывода с помощью Python мы узнали, что у нас 16 000 консьюмеров, из которых 11 000 имеют хотя бы одну назначенную партицию.

С продюсерами ситуация посложнее — пришлось исходить из оценок по косвенным метрикам. Мы знали, что подключений к кластеру у нас около 400 000, из которых примерно четверть — это соединения от консьюмеров.

Когда продюсер подключается к кластеру, он устанавливает соединения со всеми лидерами партиций, в которые собирается писать, поэтому не каждое из подключений к кластеру отражается в отдельного продюсера.

Вот что у нас получилось:

Трафик

Входящий: 20 Гбит/с

Исходящий: 56 Гбит/с

Рейт запросов

Produce 1,5 млн rps

Fetch 2 млн rps

Конкурентность

50 000 продюсеров

16 000 консьюмеров

Строим систему методом постепенного возведения

Вообще сложно сразу придумать, как построить систему такого размера. Есть, конечно, какие-то очевидные вещи вроде того, что она должна деплоиться в k8s, скейлиться подами, писать метрики и т. д. Однако в какой-то момент всё равно придётся начать думать о деталях реализации, которых немало.

Построить мы должны что-то подобное:

Как известно, слона надо есть по частям. Я расскажу о том, как такого слоника грамотно приготовить.

Отладка транспорта

Мы начали с самого непонятного — с выяснения того, как будет себя вести gRPC streaming под нагрузкой и хватит ли нам его возможностей.

Для этого мы соорудили локальный стенд из двух сервисов на «голом» gRPC прямо по инструкции с официального сайта. data-bus реализовывал семантику echo-сервера, и это позволило нам понять, как себя ведёт один экземпляр приложения с gRPC-сервером на борту.

схема работы echo-сервера
схема работы echo-сервера

Получили такие результаты:

Количество клиентов

Количество переданных сообщений

Время

(секунд)

Количество сообщений в секунду

1

100 000

3,52

28 373

2

200 000

3,63

54 989

5

500 000

4,73

105 607

10

1 000 000

6,22

160 678

20

2 000 000

8,58

232 907

50

5 000 000

16,62

300 808

100

10 000 000

29,44

339 664

На этом этапе мы можем сделать следующий вывод: с одного инстанса удаётся снять не менее 300 000 rps gRPC-циклов запрос-ответ на четырёх ядрах. Это нас более чем устроило. Мы также проверили, скейлится ли gRPC по ядрам, запустив те же программы на dev-сервере и выкрутив GOMAXPROCS, — масштабирование практически линейное.

Кстати, рекомендую отличный пост с исследованием gRPC streaming performance.

Путь в Kubernetes

Все сервисы в Ozon должны жить в Kubernetes, и data-bus не исключение. Важно убедиться, что на этом этапе мы не получим серьёзной деградации производительности, чтобы потом было легче отделить проблемы с Kafka от проблем с инфраструктурой.

Для начала мы переделали нашу связку из echo-сервера с клиентом в полноценные сервисы data-bus и data-bus-checker на основе платформенного фреймворка и измерили производительность. На этом этапе из-за записи метрик мы получили деградацию около 17% по сравнению с «чистым» echo-сервером.

Причину деградации удобнее всего отслеживать с помощью go tool pprof в формате flame graph.

В дальнейшем data-bus-checker будет играть роль своеобразного внешнего скелета системы, позволяя проводить постоянное нагрузочное тестирование.

Часовая башня во Владивостоке как иллюстрация внешнего скелета
Часовая башня во Владивостоке как иллюстрация внешнего скелета

Настоящий API

Жалко, конечно, но мы не echo-сервер пишем, а слой абстракции перед Kafka. Нам придётся придумать новый интерфейс, а затем проверить, повлияет ли он как-то на производительность системы. Не должен, конечно, но мало ли ????

Семантически мы хотели обеспечить гарантии durable writes и at-least-once delivery, то есть чтобы каждое успешно записанное сообщение было обработано системой-потребителем не менее одного раза. Это заметно упрощает реализацию бизнес-логики, поскольку единственное, что нужно делать, — это корректно обрабатывать дубликаты сообщений, опираясь на ключи идемпотентности.

В качестве технологии описания протокола в Ozon используется Protocol Buffers. На ней остановились и мы. Настройки подключения один раз пробрасываются через grpc-заголовки при установлении соединения и в дальнейшем больше не передаются.

https://www.oreilly.com/library/view/grpc-up-and/9781492058328/ch04.html

Протокол

После генерации новых gRPC-стабов мы заменили нашу реализацию echo-сервера на тот же самый echo-сервер, но работающий через новый протокол, после чего снова прогнали нагрузочные тесты с помощью data-bus-checker. Таким образом мы проверили, не сломали ли мы чего по дороге. Всё было в порядке.

Прежде чем переходить к прикручиванию настоящей работы с Kafka, мы решили проверить, что будет, если увеличить число подключенных клиентов. 

Напомню, что наша цель — около 80 000 клиентов. Каково же было наше удивление, когда обнаружилось, что уже на 1000 подключений резко вырос latency операций!

К счастью, в этот момент система ещё была достаточно простой (банальный echo-сервер), поэтому мы быстро нашли проблему в работе с gRPC: выяснилось, что на стороне data-bus-checker для всех конкурентных подключений используется один инстанс gRPC-клиента, что, несмотря на создание отдельных gRPC-стримов, приводило к их упаковке в одно-единственное TCP-соединение, то есть требовало упорядочивания всех сообщений во всех стримах.

Как только мы решили эту проблему, начав создавать по одному клиенту на подключение, мы смогли увеличить количество соединений до 10 000 — и нам тут же разорвало data-bus по памяти. Здесь мы тоже искали причину недолго: работа с большим количеством подключений приводит к аллокации множества буферов, размер которых был выставлен неадекватно большим. Хороший ориентир — 32 Кб. Искать причины подобных проблем приятнее всего с помощью go tool pprof -heap.

После решения и этой проблемы всё наконец-то заработало как надо. С транспортом было покончено — и мы перешли непосредственно к работе с Kafka.

Прикручиваем работу с Kafka 

В качестве библиотеки мы выбрали franz-go — нам хотелось, чтобы data-bus из коробки предоставлял для консьюмеров механизм ребалансировки без stop the world cooperative-sticky, а при таком требовании выбор, мягко говоря, невелик. 

Начали мы с продюсера, поскольку он семантически устроен несколько проще, чем консьюмер.

Логика работы Producer

Клиент — приложение, работающее с Kafka через data-bus, например data-bus-checker.

Сервер — data-bus.

  1. Клиент устанавливает двунаправленное соединение.

  2. Сервер заводит Kafka producer, через который будет идти запись в целевой топик.

  3. Клиент отправляет сообщение PublishRequest{messages: []Message}.

  4. Сервер синхронно записывает полученные сообщения в Kafka и отвечает сообщением PublishResponse{ack: true}.

  5. Клиент, прочитав PublishResponse, понимает статус записи.

  6. goto 3.

Единственное, что нам пришлось подкрутить, — это параметр ProducerLinger, время накопления сообщений на стороне библиотеки для отправки в Kafka батчами. Такой подход помогает увеличить throughput при работе с библиотекой напрямую, но в нашем случае его стоило сделать поменьше, поскольку клиенты между подключениями у нас не разделялись.

Логика работы Consumer

Клиент — приложение, работающее с Kafka через data-bus, например data-bus-checker.

Сервер — data-bus.

  1. Клиент устанавливает двунаправленное соединение.

  2. Сервер заводит Kafka consumer с group_id=consumer_group_name.

  3. Сервер отправляет клиенту сообщение SubscribeResponse{messages: []Message}.

  4. Клиент, получив SubscribeResponse, обрабатывает сообщения и направляет серверу SubscribeRequest{ack: true}.

  5. Сервер отмечает сообщения как обработанные.

  6. goto 3.

Изначально мы хотели опереться на автокоммиты, чтобы надёжно фиксировать уже обработанные сообщения, не перегружая брокеры коммитами на каждой итерации. Но натолкнулись на новый подводный камень: при разрыве соединения от клиента под капотом библиотеки происходил коммит. Это может быть нормально при работе с библиотекой напрямую, но нам не подходило совершенно, ведь мы не должны ничего коммитить без команды от клиента.

сценарий потери сообщений при отключении клиента
сценарий потери сообщений при отключении клиента

В итоге мы перешли на механику AutoCommitMarks в сочетании с автокоммитами. Это позволило нам обеспечить подходящий уровень производительности при соблюдении семантических требований.

Пришло время натурных испытаний системы в сборе.

Натурные испытания

Producer

Начали мы с продюсера. На стороне data-bus-checker было имплементировано три сценария:

  1. latency-optimized — отправка по одному сообщению без задержек.

  2. throughput-optimized — отправка батча из 100 сообщений без задержек.

  3. ticker — отправка по одному сообщению с регулируемой задержкой между отправками.

С первыми двумя сценариями всё достаточно очевидно, а третий мы хотели использовать для проверки поведения системы при тысячах и десятках тысяч подключений, через каждое из которых отправляется по одному сообщению раз в секунду.

Испытание на 1000 подключений

Первое, с чем мы столкнулись, — это волнообразный рост 99 перцентиля latency с периодичностью в пять минут. Наши опытные админы тут же предположили, что проблема кроется в обновлении метаданных.

Дело в том, что каждый клиент Kafka время от времени обновляет метаданные о конфигурации кластера, и это довольно тяжёлые запросы. Если же мы запускаем тысячи клиентов одновременно, то их жизненные циклы становятся синхронизированными и они все запрашивают метаданные практически одновременно. Мы добавили случайную задержку перед началом передачи сообщений, но, к нашему удивлению, это не помогло.

Спустя два дня поисков проблемы мы обнаружили, что, хоть клиенты и спят перед отправкой первого сообщения, gRPC-соединение с data-bus устанавливается до сна, а именно в этот момент заводился Kafka-клиент, регулярно запрашивающий метаданные.

Поняли мы это после того, как добавили на стороне data-bus метрику для времени ожидания сообщений от клиентов и это время стало поразительно напоминать наше время сна :)

Когда мы это исправили, показатель latency перестал гулять и продюсеры, наконец, заработали нормально.

Испытание на 10 000 подключений

Для этого испытания мы выставили задержку между отправками сообщений в 30 секунд, чтобы не отвлекаться на скорость обработки самих сообщений.

На графиках время от времени стали появляться пики по несколько секунд. Не буду мучить вас историей о том, как мы это отлаживали, но в итоге выяснилось, что в franz-go по умолчанию довольно непродолжительное время жизни idle-подключений и, пока мы проходили по всем партициям, подключения к первым из них успевали «протухнуть» и их приходилось устанавливать заново. Проблема решилась подкручиванием параметра ConnIdleTimeout.

В этой точке мы смогли получить ровный график latency без пиков, но с 99 перцентилем в районе 300 мс — это слишком много, поскольку 99 перцентиль записи в Kafka у нас составляет примерно 35 мс.

Результаты профилирования показали, что около 75% времени сервер проводит в ожидании системных вызовов. Следовательно, нам нужно было уменьшить их число. Пришло время немного пересмотреть дизайн системы: если сначала мы не собирались разделять клиенты между подключениями, то теперь поняли, что это необходимо для более эффективной утилизации ресурсов сервера.

После прикручивания механизма агрегации клиентов у нас уменьшился показатель latency, а волосы стали гладкими и шелковистыми батчинг и сжатие стали работать заметно лучше. Однако это привело к уменьшению максимального throughput и увеличению latency для latency-optimized-продюсеров из-за необходимости ждать, пока обработаются сообщения из конкурентных подключений. 

В итоге мы реализовали агрегацию опцией with_clients_aggregation, которой могут управлять клиенты, чтобы можно было выбрать наиболее эффективную стратегию для каждого из сценариев.

Consumer

Первым делом на стороне data-bus-checker был имплементирован механизм, позволяющий управлять числом и размером групп, а также их конфигурацией: размером батчей и задержкой между запросами.

Сначала проверялось максимальное количество циклов get-ack, затем — максимальная пропускная способность. Мы заметили интересный эффект: тогда как два пода data-bus из трёх выдавали порядка 10 Гбит/с трафика, третий выдавал почти в два раза больше. Это было связано с тем, что он физически находился на той же машине, где соответствующий ему под data-bus-checker, и, соответственно, между ними были околонулевые сетевые задержки. Мы запомнили это на случай, если нам понадобится ещё сильнее оптимизировать систему. 

Дальше по аналогии с продюсером мы перешли к проверке максимального количества подключений — и… data-bus снова стал умирать от OOM Killer.

К тому моменту мы были уже стреляными воробьями. Расчехлили наш go tool pprof -heap — и увидели, что основной объём памяти выделяется в кишках franz-go. Это оказались буферы, куда записываются сообщения из партиций, которые затем будут передаваться на обработку.

Размер буферов регулируется параметром FetchMaxBytes. По умолчанию установлено довольно большое значение в 50 Мб — можете себе представить, сколько нужно памяти для обработки тысяч клиентов с десятками партиций. 

После выбора более подходящего значения (у нас это 32 Кб) система начала стабильно работать при 10 000 подключений на под.

Полноразмерный тест

В начале статьи я упоминал о том, что мы хотим уметь обрабатывать в секунду порядка 3,5 млн запросов, разделённых между продюсерами и консьюмерами. Надо бы проверить, будет ли система корректно работать под такой нагрузкой.

К этому моменту нам было известно, что один под data-bus может обрабатывать порядка 100 000 rps, поэтому мы отмасштабировали data-bus до 48 подов (с запасом производительности в полтора раза).

Затем мы подобрали такую конфигурацию data-bus-checker, которая более-менее реалистично описывала наш боевой профиль, и запустили нагрузку.

Первое, во что мы упёрлись, — это в невозможность записывать более 1 млн rps в один топик с 21 партицией. Эту проблему мы решили, просто создав десять топиков и немного подкрутив data-bus-checker, чтобы он умел в них писать.

по 1 партиции на брокер, без агрегации
по 1 партиции на брокер, без агрегации

После реализации записи в десять топиков мы смогли получить требуемую производительность. Важно отметить, что на этом этапе мы не использовали агрегацию клиентов, поскольку это необязательная оптимизация.

по 10 партиций на брокер, без агрегации
по 10 партиций на брокер, без агрегации

Мы также проверили, как влияет на систему включение агрегации для небольших групп клиентов. И нам удалось записать почти 5 млн rps!

запись с агрегацией продюсеров
запись с агрегацией продюсеров

Что ещё интереснее, так это то, насколько сильно уменьшилась нагрузка на саму Kafka после этого.

Заключение

Безусловно, до того как система будет готова к внедрению, нам предстоит проделать ещё немало работы. Однако костяк, способный выдерживать нагрузку в миллионы rps, мы возвели — и потратили на это всего около двух человеко-месяцев.

Главный принцип: большие системы надо строить по кусочкам, непрерывно отслеживая важные свойства. Тогда, если после добавления очередного блока будет замечено падение производительности, сразу будет понятно, куда копать.

Помните: не боги горшки обжигают. Пробуйте — и у вас получится, как получилось у нас. Желаю удачи!

Комментарии (41)


  1. ewolf
    20.07.2023 13:38
    +2

    Весьма интересная статья, спасибо!


  1. Crimsonland
    20.07.2023 13:38
    +5

    Разработчик счастлив, он не должен думать про kafka.

    Админ счастлив, разработчики не ломают kafka.

    Зачем тогда использовать kafka?


    1. Iktash
      20.07.2023 13:38
      +4

      Потому что мы все еще хотим предоставлять шину данных с гарантиями at least once. И она все еще должна уметь хорошо горизонтально масштабироваться, для того, чтобы быть общим ресурсом для огромного количества сервисов. И, конечно, поддерживать возможность горизонтального масштабирования самих сервисов.


    1. ewolf
      20.07.2023 13:38
      +6

      Основной профит в том, что скрыта технология под капотом. Сегодня кафка, завтра может быть редпанда, послезавтра - федерация кластеров. А для клиентов точки входа и апи не меняются


  1. ewolf
    20.07.2023 13:38
    +2

    Нагрузка немаленькая.

    А сколько реплик топиков используется и в каком режиме происходит продьюсинг (acks=0, acks=1, acks=all)?


    1. sgjurano Автор
      20.07.2023 13:38
      +1

      Все тесты проводились на acks=all с rf=3.


  1. barloc
    20.07.2023 13:38
    +11

    Хм, но статья про эту систему уже была на хабре полгодика назад. Правда было поменьше технички. Повторяетесь? :)

    Как известно, нет таких проблем в программировании, которые нельзя решить повышением уровня абстракции ???? В голову приходит идея: что, если большую часть логики реализовать на стороне сервера, полностью изолировать от потребителей работу с Kafka, а клиентский протокол упростить до неприличия? Тогда мы могли бы снова сделать наши библиотеки простыми, обновлять их редко, а клиенты бы получали новые фичи практически без доработок с их стороны. Да и уметь переезжать с одного кластера на другой было бы очень кстати.

    Протокол кафки достаточно давно стабилизирован, лет 5 уже точно. Вы не смогли централизовать обновление/использование стандартных библиотек, написали свою, по пути еще и создав хайлоад-сервис, который стал точкой отказа. И все равно клиенты должны будут обновлять библиотеки так или иначе.

    Мне кажется, или в этом "решении" технари победили менеджеров? :)

    Кстати про 75 брокеров кафки - а зачем их 75? У вас какая-то стандартизация брокеров? Просто в моем случае поток примерно в 1.5 млн входящих держало 3 брокера (но у вас конечно сообщения гораздо жирнее судя по трафику плюс большое число партиций).


    1. sgjurano Автор
      20.07.2023 13:38
      +4

      Одна из ключевых целей, которую мы преследуем — это снижение сложности клиентских библиотек.

      Библиотеки для работы через grpc гораздо проще чем библиотеки для работы с kafka, у нас более-менее активно используется 4 языка и на каждом свои либы с кучей подводных граблей, поддержка платформенных адаптеров вокруг них весьма болезненна.

      По поводу конфигурации кластера не подскажу, возможно смогут ответить коллеги.


      1. barloc
        20.07.2023 13:38
        +2

        Спасибо за ответ. Получается экономия ресурсов разработки за счет увеличения нагрузки на оперейшенс, да еще какого, аж на 5 млн рпс :)

        А не подскажите еще распределение на кафку по языкам? Типа на го приходится 50% продьюсеров/консьюмеров, на пайтон - 10% и т.д. ?


    1. Iktash
      20.07.2023 13:38
      +6

      По количеству брокеров. Во-первых, у нас нагрузка на которую рассчитан кластер в rps не полтора миллиона, а на порядок больше. По трафику так же нужно смотреть — как правило пишут большими батчами, а не по одному сообщению. Это наша общая рекомендация для сервисов. Размеры батчей до 50Мб доходят. Во-вторых, у нас все рассчитано на падения одного из трех ДЦ, в которых расположена Кафка. Третий момент — нужно смотреть конфигурацию, например у нас довольно много ресурсов заложено на TLS, на работу внутренних экспортеров. Ну и последний момент, число 75 условное.

      Про повторы. Вероятно, вы путаете нашу статью с аналогичной от Авито. У них похожий подход и они пишут о нем периодически, например, тут
      https://habr.com/ru/companies/avito/articles/655553/

      Не скрою, мы вдохновлялись этими материалами, прежде чем разрабатывать свою систему.


      1. barloc
        20.07.2023 13:38

        Спасибо за ответ :)

        Точно, извиняюсь, вот та статья https://habr.com/ru/companies/avito/articles/726564/

        Названия у басов одинаковые, спутал :)

        Третий момент — нужно смотреть конфигурацию, например у нас довольно много ресурсов заложено на TLS, на работу внутренних экспортеров

        А, вот оно что. Никогда не использовал у себя, слишком уж расточительно. Было бы интересно почитать статью, насколько большое потребление процессора в случае кафки. Но кажется, что не должно быть очень уж большим, самая ресурсоемкая операция же происходит в момент хендшейка, дальше все гораздо проще. А у кафки соединения постоянные и таких хендшейков должно быть довольно мало.


  1. zcolleen
    20.07.2023 13:38
    +4

    Вы упоминаете в при паблише сообщения клиент на каждое сообщение получает акк о его успешной записи. При этом есть параметр ProducerLinger который накапливает какое-то количество сообщений до публикации. Получается что акк клиенту приходит до того как мы записали батч в кафку? Что будет происходить если сервис умрет в этот момент?


    1. sgjurano Автор
      20.07.2023 13:38

      Клиент получит свой ack только после того как батч будет записан на брокеры.

      ProducerLinger влияет на время накопления сообщений перед тем как они будут записаны, всё это время клиент ждёт подтверждения.

      Если же клиент умрёт и сообщение в этот момент будет записано, то он просто запишет его ещё раз после рестарта — дублирование сообщений допустимо при at least once.


  1. dph
    20.07.2023 13:38
    +5

    А откуда в Ozon появляется такое количество сообщений?
    Насколько я понимаю, нагрузка на сайте там единицы тысяч rps, да и то на чтение и в пике. Откуда появляются миллионы сообщений в кафке?


    1. sgjurano Автор
      20.07.2023 13:38

      Нагрузка на гейтвее скорее ближе к сотне тысяч rps, а дальше микросервисная архитектура делает своё дело — у нас сейчас около 4 тысяч сервисов.


      1. dph
        20.07.2023 13:38
        +1

        А откуда на apigw сотня тысяч rps? Или каждая страница превращается в сотни rps?

        Да и это про чтение же, а чтение не должно приводить к сообщениям в кафке?
        А действий - ещё на порядки меньше.
        Т.е. все равно не понятно, как сотня (даже меньше) покупок в секунду приводит к к полутора миллионам новых сообщений в кафке в секунду.


        1. sgjurano Автор
          20.07.2023 13:38
          -4

          Вы оперируете интересными числами неизвестного происхождения в своих предположениях :)

          Не думаю, что я смогу комплексно ответить на ваш вопрос за пределами уже сказанного.


          1. dph
            20.07.2023 13:38
            +9

            Хм. Меньше сотни платежей в секунду - это данные из квартального отчета Озона за первый квартал 2023 года (там 180млн. платежей за квартал указано, что примерно 50 в секунду)
            1.5 миллиона сообщений - из приведенного в данной статье графика.
            Понятно, что не только платежи приводят к трафику - но все равно интересно, откуда такой объем сообщений в кафке. Я бы сказал, порядка на три-четыре больше ожидаемого.


            1. denis-isaev
              20.07.2023 13:38
              +2

              Как известно, нет таких задач в программировании, в которых нет ничего, что можно было бы положить в кафку.

              Почему чтение не должно приводить к генерации сообщений в message bus? А как же аналитика, маркетинговые программы, и прочие штуки, следящие за каждым вашим чихом на сайте?


              1. dph
                20.07.2023 13:38

                Ну, можно, конечно, каждое движение мышкой отправлять в kafka, но это довольно странная идея, там проще (и правильнее) отправлять сразу пачку данных по сессии. Т.е. вряд ли именно эти данные дают такое количество сообщений.


                1. vedenin1980
                  20.07.2023 13:38
                  +3

                  Судя по всему тут переборщили с микро-микро-сервисами, в результате один запрос от пользователя порождает тысячи сообщений между микросервисами. Так иногда бывает.
                  То есть круто, что так оптимизировали Кафку, но вот остальная архитектура вызывает вопросы.


                  1. dph
                    20.07.2023 13:38
                    +1

                    Ну, 5000 микросервисов - это точно перебор, тут даже нет сомнений.
                    Но все равно не понятно, зачем внутри все взаимодействие через kafka, хотя скорее всего никакого смысла в асинхронности нет.


                    1. arturgspb
                      20.07.2023 13:38

                      А почему вы считаете, что 5000 перебор? А сколько надо по вашему и почему? Вы же не знаете какие там подсистемы, бизнес-логика, компромиссы, на которые когда-то пошли при каких-то обстоятельствах и пр. Да и вроде бы речь не идет про то, что Кафка только про обработку событий на сайте от клиентов. Наверняка там какие-то синхронизации данных между подсистемами, попутные подсчеты счетчиков и пр. и пр.

                      И даже если представить, что есть какая-то часть неоптимальностей с микросервисами, то вы же не будете сейчас рекомендовать ребятам переписать все микросервисы, перестроить процессы и пр. Это тоже денег стоит, да и статья же конкретная и про другое вообще, а вы уводите в то, что как вам кажется, все это вообще не нужно и вы бы не так там с самого начала делали.


                  1. murkin-kot
                    20.07.2023 13:38
                    +2

                    Они не кафку, они самодельные прокладки для общения с ней допиливали. Кафка и без них миллионы может делать, правда очень маленьких сообщений.


            1. Evald24
              20.07.2023 13:38
              +2

              Так Ozon это же не только покупки и показ товаров, а куча ещё всего, например логистика, тревел, финтех... Наверное, не обращали внимания =)
              Поищите сайт OzonTech или Youtube канал, там много интересного


              1. dph
                20.07.2023 13:38
                +1

                Это понятно, но там довольно мало сообщений идет относительно основного бизнеса. Не сотни тысяч в секунду )
                Ну, конечно, если в кафку летят ежесекундные треки по всем курьерам, то там вполне получится десяток тысяч rps, но даже это - не миллионы )


    1. sgjurano Автор
      20.07.2023 13:38
      +1

      Посмотрел крупнейших потребителей - в топе рекламная сеть, аналитика и система защиты от ботов.


      1. dph
        20.07.2023 13:38

        Спасибо. Все равно не понятно, откуда там столько событий. Неужели каждый показ рекламы отправляют в кафку? А уж какие события формирует аналитика - я вообще не представляю. Или там 1000 шагов ETL и все обмениваются через kafka?


        1. sgjurano Автор
          20.07.2023 13:38

          Для подобных систем в целом валидно отправлять каждое событие в кафку, другое дело, что эти отправки стоит батчить, чтобы не создавать чрезмерную нагрузку на брокеры.

          Потребность управлять поведением клиентов - как раз одна из основных причин создания data-bus, там зачастую что-то нездоровое происходит.


    1. murkin-kot
      20.07.2023 13:38
      +2

      Похоже, что ребята купили начальство вау-фактором. Миллионы! Это же много! При этом вопрос "зачем" просто не обсуждался. Хотя можно предположить, что в качестве оправдания будет предложено что-то про расширение бизнеса. На сколько тысяч процентов они закладываются при таком расширении, история умалчивает.


  1. miga
    20.07.2023 13:38
    +1

    Хе-хе, в понедельник я анонсировал (внутри конторы) GA своего сервиса-прокси кафки с грпц стримингом и протобафом, а сейчас эта статья.

    Только у меня консьюмеры забирают сообщения из промежуточного батча в прокси, и коммитит оффсеты она же.


    1. sgjurano Автор
      20.07.2023 13:38

      Тогда возможно вам будет интересно ознакомиться с проектом https://github.com/mailgun/kafka-pixy, хоть он и не имеет полноценной поддержки (последний релиз был в 2019), но как минимум из него можно черпать вдохновение :)


      1. miga
        20.07.2023 13:38
        +1

        Спасибо за ссылку. Посмотрел мельком на апи - лонг поллинг для грпц, как по мне, странная идея, если есть стримы.

        И, что интересно, я специально искал все возможные варианты подобных опенсорсных штук прежде чем пилить свою - и не нашел вот это проект, хотя ключевые слова абсолютно совпадают.


  1. Hixon10
    20.07.2023 13:38
    +2

    Интересно, не было ли проблем с GC GO у вас? Или вы пулы объектов используете, на горячем пути практически ничего не аллоцируете?


    И традиционный вопрос в подобных постах — при дизайне системы не думали о том, чтобы взять язык без GC (C++/Rust)?


    1. sgjurano Автор
      20.07.2023 13:38
      +1

      Я в процессе отладки не раз подозревал GC, но ни разу он не был виновен - трейсинг хорошо позволяет отслеживать такое. На горячем пути в grpc и franz-go действительно в основном пулы объектов под капотом.

      Идея про С++/Rust расcматривалась скорее в полушуточном режиме, поскольку мы не упирались в язык, а в команде экспертизы по Go заметно больше :)


  1. manyakRus
    20.07.2023 13:38

    А не легче было поменять Kafka на NATS ?
    производительность выросла бы в 2-3 раза ничего не делая :-)


    1. sgjurano Автор
      20.07.2023 13:38

      NATS к сожалению обеспечивает лишь гарантии at most once в безброкерном режиме, а с брокером смысл его использования теряется. Кроме того задача переезда на него гораздо тяжелее чем задача переезда на grpc-прокси перед kafka без переезда данных.

      Ну и проблемы с библиотеками от этого никуда не деваются :)


  1. event1
    20.07.2023 13:38
    +3

    Самый простой из доступных вариантов — это TCP. Однако он достаточно низкоуровневый, в частности heartbeat нам придётся реализовывать самостоятельно

    В TCP есть такая штука, как keepalive. Настраивается параметрами сокета TCP_KEEP*, либо через sysctl.


    1. sgjurano Автор
      20.07.2023 13:38

      Хорошее замечание, спасибо.


  1. sadko4u
    20.07.2023 13:38

    Опять эти мифические RPS для измерения нагрузки...

    5 млн RPS с обслуживанием в 10 микросекунд на запрос и те же 5 млн RPS с обслуживанием по минуте на запрос - это совершенно разная нагрузка, отличающаяся на порядки.

    Нагрузку мерят либо в часозанятиях, либо в эрлангах.


    1. sgjurano Автор
      20.07.2023 13:38

      Безусловно вы правы.

      Выбор оптимизируемой метрики определяется целью — в нашем случае нужно было держать нагрузку на продовый кластер без чрезмерного оверхеда.

      Поскольку кафка занимается в основном перекладыванием байтиков, то и оптимизировали мы именно его :)