Работа в Яндексе ставит огромное количество технических вызовов, которые интересно решать. И один из них — это производительность основной очереди сообщений YDB Topics. Посмотрим, может ли опенсорсный продукт от Яндекса конкурировать с Apache Kafka по производительности.

Меня зовут Зевайкин Александр. Я уже практически 20 лет в IT. Начинал с преподавания в вузе, запускал множество стартапов, руководил командами разработки. В текущий момент я работаю в Яндексе в отделе, занимающимся одной из высоконагруженных систем. Это разработка распределённой базы YDB, в частности, функциональности YDB Topic.

Архитектурные особенности YDB

Для начала расскажу про архитектурные особенности, что такое YDB Topics, и как всё это работает внутри.

Если упростить, YDB Topics — это большая и производительная потоковая очередь сообщений, на которой работает весь Яндекс.

Изначально YDB — это распределённая отказоустойчивая Distributed SQL база данных с открытым исходным кодом. Она сочетает в себе свойства классических баз данных, такие как SQL запросы и ACID транзакции, с достоинствами NoSQL систем, такими как масштабируемость, высокая доступность, и отказоустойчивость.

В настоящий момент YDB уже полноценная платформа., Она состоит из большого количества готовых блоков, которые дополняют друг друга с архитектурной точки зрения. Очереди сообщений, они же топики — это типичный блок, который органично встраивается в эту платформу и переиспользует готовые сложные механизмы. Например, распределённое отказоустойчивое хранилище.

Несколько слов об инсталляции YDB Topics в Яндексе. Суммарно мы прокачиваем через нашу очередь сообщений порядка 20 миллионов событий в секунду скоростью до 200 гигабайт в секунду. Это 80 гигабайт в секунду на запись и 120 на чтение. А в пик нагрузок мы прокачиваем через очередь до 300 гигабайт в секунду.

Наибольший объём потребляют логи Яндекс Метрики. И, конечно, это шина для асинхронного взаимодействия прикладных сервисов, которыми у нас занимается порядка 1000 разных команд.

При этом в каждый момент времени над сервисом дежурит лишь один SRE. Ему не сложно справляться, так как автоматика на основе мониторинга доведена до такого уровня, когда дежурный просто не замечает выход из строя диска, сервера, стойки. В асинхронном режиме создаётся тикет на починку или замену, который уходит, например, в конкретный дата‑центр. Максимум, дежурному придёт уведомление в Telegram о том, на что стоит обратить внимание. Очередь сообщений при этом продолжит работать как ни в чём не бывало.

Архитектура очереди сообщений

Согласно определению, очередь сообщения служит для асинхронного взаимодействия между писателями и читателями.

Потоковая очередь сообщений состоит из топиков — семантической единицы группировки сообщений. 

Kafka

Если посмотреть на архитектуру Kafka, она хранит данные непосредственно на узлах обработки.

Получив запрос на запись, Kafka сначала пишет данные в локальную файловую систему, а потом реплицирует на другие узлы. И, наоборот, получив запрос на чтение, Kafka отдаёт данные именно с локальной файловой системы или из страничного кэша, если данные ещё свежие. То есть при типичном чтении, мы не используем сеть ведомых узлов.

Pulsar

Apache Pulsar — это более свежий аналог Kafka. Он довольно развит с точки зрения функциональности и имеет ряд интересных архитектурных особенностей. Главное — то, что он выделяет отдельный слой хранения, полагаясь на такой проект, как Apache BookKeeper. Получив запрос на запись или чтение, Pulsar обращается не к локальной файловой системе, а к выделенным узлам хранения BookKeeper. И это основное конкурентное преимущество по сравнению с Kafka, так как выделенный слой хранения существенно упрощает масштабирование и отказоустойчивость. Становится возможным независимо изменять число брокеров Pulsar и узлов хранения Bookeeper.

YDB Topics

Как и в Pulsar, в YDB Topics также выделен отдельный слой хранения, но он полагается уже на платформу YDB.

Получив запрос на чтение/запись, брокер, обслуживающий конкретную партицию, обращается к так называемому BlobStorage. Это подситсема реализует отказоустойчивое распределённое хранилище. Аналогично Pulsar данные свойства выгодно отличают YDB Topics от Kafka с точки зрения масштабируемости и отказоустойчивости.

Наиболее отказоустойчивый режим хранения в YDB — Mirror-3-dcприменяется для трёхдатацентровых кластеров.

Он поддерживает продолжение штатной работы при отказе одной зоны доступности целиком. Плюс одной серверной стойки в любой другой зоне. Также есть такая замечательная особенность, как автоматическая реконфигурация групп хранения, которая снижает вероятность потерь данных путём восстановления избыточности.

Например, если вышел из строя диск, сервер, либо стойка, автоматика это обнаруживает, и через час пытается восстановить требуемую трёхкратную избыточность без непосредственного участия дежурного. Совокупность этих гарантий приводит к тому, что у Kafka и Pulsar нет аналогичных режимов.

Их мы будем сравнивать в режиме блок 4–2, который у нас применяется для однодатацентровых кластеров.

Данный режим использует Erasure кодирование. Он потребляет всего 1,5х места при гарантиях доступности, сравнимых с трёхкратной репликацией Kafka и Pulsar.

Почему не Kafka в 2017?

Сначала у нас и была Kafka. У нас даже есть статья на Хабре от 2014 года, где мы рассказывали, как передавали все логи на базе Kafka. Но в итоге мы отказались от этого решения в 2017 году.

ZooKeeper

Kafka требовала для своей работы отдельный сервис ZooKeeper. Этот распределённый сервис координации хранит всю метаинформацию Kafka. В частности, Kafka хранит в ZooKeeper смещение (offset) всех читателей. А это часто изменяющаяся информация.

Из‑за этого при высоком потоке на запись, например, 5–10 миллионов событий в секунду, ZooKeeper не справлялся с сохранением всех смещений на масштабах Яндекса. Либо, что для нас очень важно, когда число партиций достигало десятков тысяч, ZooKeeper с этим тоже не справляется. Поэтому для нужд Яндекса понадобилось бы несколько инсталляций. И в каждой их пришлось бы поддерживать ещё по своему кластеру ZooKeeper.

Геораспределённый кластер

По состоянию на начало 2017 года в Apache Kafka не было поддержки зеркалирования и геораспределённости. Только в 2017 году появились первые предложения, как это сделать. С 2013 года по 2017 год мы были вынуждены сами строить обёртки поверх Kafka так, чтобы работать в режиме геораспределённого кластера.

Отсутствие exactly-once

Изначально Kafka поддерживала только гарантию at least once. Это гарантировало доставка сообщения, но не отсутствие дублей. Дело в том, что в протоколе Kafka на тот момент не было счётчика сообщений при записи. Поэтому мы не могли сделать дедупликацию на стороне Kafka. В результате нам приходилось заводить собственные счётчики, писать их в сообщения или их метаданные, а на стороне конечного потребителя производить дедупликацию. Так мы перекладывали заботы о корректной обработке сообщений на сторону прикладного приложения вместо инфраструктуры.

Отсутствие квот и гибкого разграничения доступа

Кроме того, мог появиться «шумный сосед» — потребитель, который использует всё процессорное время или пропускную способность кластера на запись. Это неприемлемо ни для внутренних команд Яндекса, ни для работы в режиме публичного облака. Первые предложения Kafka по разграничению доступа и добавлению квот появились только в 2018 году.

Почему не Kafka в 2023?

Это привело к тому, что в 2017 году мы были вынуждены принять решение отказаться от Kafka и перейти на платформу YDB, где уже многое было решено «из коробки». Стоит заметить, что в 2017 году потоки на запись составляли порядка 1 гигабайта в секунду, а к настоящему моменту они выросли более чем в 100 раз.

ZooKeeper

Kafka начинает планомерно отказываться от ZooKeeper в пользу собственной реализации сервиса координации под названием KRaft. Но окончательный переход у них запланирован на 2024 год. И на их сайте есть раздел limitations and known issues, в котором есть восемь крупных ограничений для применения в промышленной эксплуатации.

Геораспределённый кластер

Kafka поддержала георепликацию в упрощенном режиме — федерация. То есть запись идет в один локальный дата‑центр, и в асинхронном режиме специальные процессы реплицируют данные в другие дата‑центры.

У данного режима есть существенные недостатки. Запись идет только в локальный дата‑центр, нет подтверждения о записи в другие дата‑центры. Из‑за чего невозможна поддержка exactly once.

Если с локальным дата‑центром случится какая‑то авария, например кабель перерубили, в тот момент, когда мы начали писать в него данные… всё, что туда было записано будет недоступно на неограниченное время. Например, на сутки. В худшем случае вообще потеряется.

YDB Topics поддерживает как федерацию, так и полноценный режим геораспределённого кластера с гарантиями exactly once. Благодаря использованию слоя хранения YDB, геораспределённый кластер также выдерживает одновременный отказ одного дата-центра и одной стойки в другом дата-центре.

Ограниченный exactly once

Не так давно в Kafka появился флажок под названием «enable idempotence». Он включает порядковый счетчик сообщений на уровне сессии между писателем и брокером. Таким образом, брокер может производить дедупликацию сообщений, то есть гарантирует «exactly once». Но сессия может быть перезапущена, например из‑за сбоя писателя или брокера. В этом случае счетчик сбрасывается в ноль и невозможно понять статус доставки последних сообщений. Вроде есть «exactly once», а вроде его и нет.

Ограниченный мониторинг

У Kafka огромное количество метрик мониторинга. Она проинтегрирована с Zabbix и Prometheus. Но в Kafka нет метрик мониторинга по клиентам, то есть по писателям и читателям. Метрики мониторинга ограничены уровнем брокеров или топиков. Для нас это не подходит при работе в режиме serverless. Нам необходимо разделять наших клиентов в облаке.

Почему именно YDB Platform?

Типичный брокер очередей, такой как Kafka, при записи объединяет множество клиентских сообщений в пачки, которые потом отправляются в файловую систему. При записи в файловую систему они именуются. При чтении, наоборот, из файловой системы читается пачка сообщений, которые потом отдаются клиенту. Если абстрагироваться от файлов, то этот интерфейс можно заменить на «ключ‑значение», где ключом является уникальный номер пачки, а значением множество сообщений.

В платформе YDB распределенное, быстрое Key‑Value хранилище. Поэтому мы решили построить поверх него очередь сообщений.

При записи свежие сообщения объединяются в пачку, на базе смещения формируется ключ, и эта пара отправляется в Key‑Value хранилище. При чтении, наоборот.

Первое нагрузочное тестирование

После реализации основной функциональности, решение заработало на объёмах Яндекса. Параллельно мы начали проводить серию экспериментов по нагрузочному тестированию. 

При планировании нагрузочного тестирования, нужно составить методику проведения экспериментов. Первый пункт — это стенд, на котором мы сравниваемся. В интернете много статей по нагрузочному тестированию Kafka и Pulsar. Но практически все они построены на обычных виртуальных машинах, даже хуже — на облачных виртуальных машинах.

Для нас такие сравнения не репрезентативны, потому что мы работаем на довольно мощных серверах, которые по своим свойствам отличаются от облачных виртуальных машин. Поэтому данное сравнение мы проводили на серверах, аналогичных используемым в production кластерах. Мы собрали кластер из восьми таких машин и нагружали его. На нём по очереди устанавливали кластеры Kafka, Pulsar, YDB Topics и сравнивали их.

Мы измеряли большое количество показателей. Системные показатели важнее для понимания узких мест. Но сравниваем мы именно по прикладным показателям: скорость чтения/записи, полное время между записью и чтением.

В ходе экспериментов мы использовали штатные утилиты, имитации нагрузки:

Так как утилиты немного разные, мы выдвигали базовые требования ко всем сценариям:

  • писатели и читатели запускаются на 8 серверах одновременно;

  • не должен расти лаг чтения;

  • фактор репликации равен 3 (в YDB сравнимый режим работы — block-4–2);

  • подтверждение записи ожидается от всех брокеров;

  • сжатие сообщений отключено.

Сами сценарии следующие:

  • максимальная скорость — максимизируется скорость, невзирая на задержки.

  • минимальное время (end‑to‑end, от генерации до вычитывания) — минимизируется 50 процентиль времени.

Первые результаты по скорости дали понять, что мы отстаём от аналогов. Kafka, например, на нашем кластере показывала 20 мегабит в секунду, Pulsar — 16, а мы на тот момент — всего 13.

Это нас и огорчило, но и дало информацию, что именно улучшать.

Пристальное изучение показало, что мы упираемся во внутреннюю сеть кластера. Pulsar отстаёт от Kafka по той же причине. Ведь у него вынесенный слой хранения, а Kafka записывает большинство данных локально. Pulsar активнее использует сеть кластера, поэтому она раньше упирается в порог.

Первые результаты по времени тоже были неутешительные:

Без нагрузки: 100 Кбит/с.

Под нагрузкой: 6,4 Гбит/с (порядка 1/3 от максимальной).

Внутреннее расследование показало, что мы упираемся в процессор. У нас есть избыточное копирование внутри программного кода. У YDB из‑за универсальности платформы внутри активно используются protobuf, а их сериализация и десериализация тоже нас замедляет.

Когда мы поняли узкие места, приступили к оптимизации.

Правильный генератор нагрузки 

У нас в платформе YDB у каждой подсистемы есть специальные генераторы нагрузки для KeyValue, OLTP‑таблиц и OLAP‑таблиц. И даже отдельный генератор для распределённого слоя хранения.

Поэтому мы также написали генератор для топиков, который равномерно с постоянной скоростью нагружает кластер. А также получили важные метрики мониторинга, чтобы понять, что же у нас происходит.

Прямая запись в партицию

На уровне SDK входной точкой пользователя является прокси, которое существенно упрощает разработку клиентских SDK, так как им необходимо поддерживать всего одно соединение с кластером. За прокси могут быть тысячи партиций, даже десятки тысяч партиций, но SDK устанавливает всего одно соединение с кластером, и оно не знает об этих десятках тысяч партиций.

Это работает для обычных OLTP‑таблиц, когда мы отправляем сравнительно небольшой SQL‑запрос и получаем ограниченное количество строчек результата. В этом случае нет нагрузки на сеть, нет нагрузки на процессор в части прокси и в части SDK.

Но такой подход создаёт большую нагрузку для брокера сообщений. Ведь это, по сути, «труба», через которую идёт трафик. Каждое дополнительное взаимодействие по сети стоит ресурсов. Поэтому мы решили это убрать.

Когда SDK обращается к прокси, оно спрашивает, где находится интересующая её партиция. А прокси отвечает,что партиция находится на таком‑то узле, где партиция может работать напрямую. SDK устанавливает отдельное соединение напрямую уже с нужной партицией. Таким образом, мы экономим одно сетевое соединение, что позволяет снизить нагрузку на внутреннюю сеть кластера. И заодно мы экономим процессор, так как на этом этапе убраны лишние сериализации/десериализации protobuf.

Профилирование: флеймграф

Мы много времени провели в профайлере, анализируя флеймграфы. Подавали нагрузку, запускали профилировщики на серверах, снимали флеймграф, смотрели, где находятся подозрительные крупные столбики, и разбирались с ними. Оказалось, что под ними находился ряд лишних копирований.

Разберём типичный пример узкого места, которое хорошо видно визуально.

По стеку вызовов видно, что внутри функции TBatch::Serialize много времени тратится на лишнее копирование строки.

Мы пошли в программный код и увидели, что в return есть три конкатенации строки. Это привело к лишним копированиям строк, вызовам конструкторов/деструкторов.

Мы применили самую типичную С++ оптимизацию: выделили память под строку заранее, передали готовую строку в этот метод и уже в эту строку писали.

Таким образом, мы устранили лишнее копирование и, как следствие, столбик на флеймграфе. В результате сильно сэкономили процессорное время.

И таких столбиков мы устранили несколько.

Erasure кодирование

Важная архитектурная особенность YDB — Erasure Encoding. Запись происходит на 6 дисков. На первые 4 диска пишутся данные, на оставшиеся 2 диска — контрольные суммы. Это позволяет сильно экономить хранимые и передаваемые данные при неизменных гарантиях доступности.

Алгоритмы широко известны, но мы ускорили его реализацию на уровне программного кода.

Раньше была универсальная реализация алгоритма — параметры кодирования были произвольные и данные обрабатывались в обычном цикле. В ходе оптимизации алгоритм заточили под block-4–2, то есть под конкретные параметры кодирования. Алгоритм векторизовали, все циклы развернули. Видя такой код, компилятор смог подобрать подходящие SIMD инструкции автоматически.

Результат: существенное ускорения кодирования/декодирования.

После проведенных оптимизаций мы решили повторить нагрузочное тестирование.

Обновлённое нагрузочное тестирование

Результаты нас очень порадовали. Максимальная скорость увеличилась в два раза, и мы превзошли в данном тесте аналоги. 

Выявился и другой сопутствующий результат из‑за применения Erasure Encoding — экономия места на дисках. Время хранения сообщений (retention) означает, что брокер гарантирует, что данные будут доступны целые сутки, независимо от их вычитывания. Большой retention требует большого объёма дисков.

Допустим, мы фиксируем retention в сутки и у нас есть какие‑то конкретные диски. Мы можем подавать их на кластер с определённой скоростью и проверять, что все сообщения за сутки ещё доступны. Но если мы превысим эту скорость, то диски переполнятся, и мы не сможем гарантировать прежний retention.

Результаты нас тоже порадовали. Kafka может подавать на данный кластер 177 мегабайт в секунду, а мы можем подавать на данный кластер 280 мегабайт в секунду. Отличие в полтора раза. Если вспомнить слайд с масштабами Яндекса, то данная экономия на дисках в 1.5 раза существенная.

Регрессионное нагрузочное тестирование

Попутно мы настроили регрессионное нагрузочное тестирование. Каждые четыре часа мы его запускаем в Kubernetes, отгружаем графики в DataLens и проверяем, нет ли ухудшений.

Вот наглядный пример, что у нас произошло летом. Был очередной коммит, который существенно изменил нашу производительность. Мы это увидели, поправили и даже сделали лучше.

Что не было сделано

Мы ещё не добрались до минимизации полного времени передачи сообщений. Учитывая асинхронную передачу информации, у нас основной объём — это логи и метрики. Для нас в приоритете скорость, поэтому именно её мы оптимизировали.

Мы будем агрессивнее уплотнять сообщения в пачке, что сэкономит процессорное время. А ещё будем стараться там, где это возможно, избавляться от протобуфа. Например, в протобуф передавать только метаданные, а сами данные передавать отдельно. И это, например, позволит применить уже другие архитектурные оптимизации вроде Zero Copy, когда данные снимаются напрямую с сетевой карты и передаются в прикладной процесс, минуя операционную систему.

Kafka API

Многие привыкли к Kafka, и мы принимаем это как стандарт в области очередей сообщений. Поэтому добавили Kafka API поверх YDB Topics. При этом сохранили все архитектурные преимущества платформы YDB, в том числе повышенную производительность и надёжность. И пользователи могут взять Kafka SDK, например на Java, или любом другом языке программирования, родную утилиту генерации нагрузки для Kafka и работать с YDB Topics по протоколу Kafka.

Важно, что в Yandex Cloud это доступно и в режиме Serverless — стратегии организации облачных услуг, при котором облако автоматически
и динамически управляет выделением ресурсов в зависимости от нагрузки.

То есть вы можете взять так называемый Free Tier, бесплатный небольшой топик по протоколу Kafka, который для вас не будет стоить абсолютно ничего. Но если вдруг вы резко вырастите до тысяч партиций, с вашей стороны не потребуется никаких дополнительных действий, ведь внутри платформы YDB всё отмасштабируется само собой.

Итого

YDB Topics — 7 лет в проде Яндекса как основная очередь сообщений.

Доступны в OpenSource и в Yandex Cloud в режиме serverless/dedicated как «Yandex Data Streams».

Мы внесли множество как архитектурных, так и низкоуровневых правок, в результате подтянули производительность до уровня аналогов и даже местами обогнали.

Комментарии (4)


  1. kmatveev
    27.06.2024 20:36
    +1

    это основное конкурентное преимущество по сравнению с Kafka, так как выделенный слой хранения существенно упрощает масштабирование и отказоустойчивость

    Нет никакой связи между выделенностью и упрощением

    Когда SDK обращается к прокси, оно спрашивает, где находится интересующая её партиция. А SDK отвечает,что партиция находится на таком‑то узле

    Наверное, хотели написать "А прокси отвечает". И вообще оно тут уже не прокси, ибо перестаёт пропускать трафик сквозь себя, это редиректор.

    По стеку вызовов видно, что внутри функции TBatch::Serialize много времени тратится на лишнее копирование строки

    по стеку не видно, что оно лишнее


    1. ShuraZ Автор
      27.06.2024 20:36
      +1

      Выделенный слой хранения дает возможным независимо изменять число брокеров Pulsar и узлов хранения Bookeeper. Если повышенная вычислительная нагрузка на брокеры, то можно увеличить число узлов Pulsar. Если не хватает места на диске для требуемых гарантий времени хранения (retention), то можно увеличить число узлов Bookeeper. В Kafka такой гибкости не хватает, так как хранение данных производится непосредственно на брокерах. Добавил в тексте.

      Действительно, "прокси отвечает", поправил, спасибо.

      В результатах профайлера видно функцию TBatch::Serialize, внутри которой есть std::copy и memmove. Именно на это мы обратили внимание и пошли смотреть исходный код. А там уже нашли и убрали лишние склеивания строк.


      1. kmatveev
        27.06.2024 20:36

        Наверное, из моего предыдущего комментария не очень понятно, что моя основная претензия к статье - её излишняя краткость, из-за чего она изобилует громкими утверждениями, которым не хватает обоснований.

        По комментарию: не могли бы вы, пожалуйста, рассказать, какого рода вычислительная нагрузка на брокерах, что она может быть "повышенной"? Я бы предположил (в случае с Kafka), что они пишут байты на диск и отсылают байты на другие узлы кластера, то есть упираются в диск и сеть. При этом чем принципиально отличается добавление узлов Bookeeper от добавления узла Kafka? И там, и там нужна железка и процесс.

        И ещё, если у вас есть возможность дополнять статью, не могли бы вы развернуть самый важный, и при этом самый короткий пункт "Почему именно YDB Platform". Нифига не понятно, зачем KeyValue. Вот для файла-журнала есть отличная и простая модель: непрерывный файл, сообщение адресуется смещением в этом файле, при последовательном чтении сообщений мы автоматически получаем смещение каждого следующего сообщения и возвращаем его. Про ваше KeyValue ничего не понятно: что является ключом, что является значением, хранится в памяти или на диске и в каком виде.


        1. ShuraZ Автор
          27.06.2024 20:36
          +1

          какого рода вычислительная нагрузка на брокерах, что она может быть "повышенной"

          Брокер сообщений выполняет множество вычислительно сложных операций: общение с писателями/читателями (терминация SSL, аутентификация, прием/отправка сообщений, подтверждение сообщений, буферизация, дедупликация), общение с другими брокерами (репликация), и на конец - запись на диск.

          чем принципиально отличается добавление узлов Bookeeper от добавления узла Kafka

          Добавление брокера в Kafka сильно сложнее, чем в Pulsar. Основная причина: данные партиций лежат прямо на брокерах и при увеличении числа брокеров данные нужно перебалансировать. Процесс переноса партиций между брокерами в Kafka называется partition reassignment и требует полного копирования данных затрагиваемых партиций.

          что является ключом, что является значением, хранится в памяти или на диске и в каком виде.


          В статье есть:
          Ключом является уникальный номер пачки, а значением множество сообщений.
          При записи свежие сообщения объединяются в пачку, на базе смещения формируется ключ, и эта пара отправляется в Key‑Value хранилище. При чтении, наоборот.

          Ряд дополнительных подробностей про YBD Topics можно найти тут https://ydb.tech/docs/ru/concepts/topic