10 лет назад сотни серверов Яндекса работали на Apache Kafka®, но в этом продукте нам нравилось далеко не всё. Наши задачи требовали единой шины для передачи всех видов данных: от биллинговых до журналов приложений. Сегодня объёмы достигли уже десятков тысяч именованных наборов сообщений.

При таком количестве данных в Apache Kafka® становилось сложно управлять правами доступа, организовывать распределённую работу нескольких команд и многое другое. Проблемы роста и отсутствие подходящего решения в открытом доступе привели к тому, что мы разработали своё решение YDB Topics и выложили его в опенсорс в составе платформы данных YDB. В этом посте расскажу о предпосылках создания продукта, нашей архитектуре передачи данных, возникающих задачах и возможностях, которые появились вместе с YDB Topics.

Зачем нам собственная шина

Вот лишь несколько трудностей, которые возникали при использовании Apache Kafka®:

  1. Отсутствие квот. Команде нельзя было выделить маленький кусочек кластера. Были возможны только два решения: большой кластер, что дорого, или общий кластер на несколько команд, что возможностями Kafka не очень поддерживалось. Сами кластеры вели себя не очень стабильно: иногда мы наблюдали потерю некоторых частей передаваемых данных.

  2. Запрет пользователям прямого доступа к общим кластерам. Из‑за этого целой команде сисадминов приходилось разгребать заявки по настройке прав, которые пользователи оставляли в Yandex Tracker.

В первую очередь эти неудобства и подтолкнули нас задуматься о собственной шине передачи данных. К тому моменту платформа YDB позволяла надёжно хранить данные и в ней уже были реализованы такие механизмы, как распределённый консенсус и failover. Так что мы решили воспользоваться готовыми технологиями и просто создать собственную шину на основе YDB — так и появился продукт YDB Topics. Мы надеялись, что если такое решение будет общедоступным, это привлечёт сообщество к совместному развитию экосистемы обработки данных. Поэтому решили выложить его в опенсорс и помочь закрыть потребности крупных компаний и гиперскейлеров. Покажу, для каких задач это может быть актуально.

Архитектура передачи данных в Яндексе

Архитектура передачи данных в Яндексе строится вокруг трёх типовых сценариев:

  1. Передача данных между приложениями, которые размещены в нескольких дата‑центрах (ДЦ). Приложения сами генерируют трафик и сами координируют нагрузку. При этом приложения знают, что дата‑центров несколько, что дата‑центры могут отказывать, и учитывают это в своей работе. При работе наш модуль управления следит за нагрузкой и может попросить клиента перебалансировать поток между различными ДЦ.

  2. Передача биллинговых данных. Такие данные нельзя терять, дублировать или менять их порядок (например, в банковских системах важна последовательность пополнения и списания средств со счетов). Пользователи передают данные биллинга в шину передачи, а мы реплицируем их по трём зонам доступности в рамках единого кросс‑дата‑центрового кластера. Такая система выдерживает одновременное отключение целого дата‑центра и серверной стойки в другом ДЦ и гарантирует высокую доступность чтения и записи данных.

  1. Передача журналов работы приложений и данных в реальном времени. Это почти то же самое, что и передача данных приложений, только гарантии скорости передачи данных в реальном времени ниже.

Сценарий, когда пользователи знают, с каким дата‑центром происходит работа, называется «федерация»: это объединение отдельных дата‑центров. Федерация значительно дешевле кросс‑ДЦ: для хранения используется Erasure‑кодирование, аналогичное кодам Reed — Solomon, а не полная репликация данных. А так как не нужно гарантировать порядок данных при чтении между дата‑центрами, допускаются дубли и ниже доступность для чтения, к чему приложения изначально готовы. При отключении одного дата‑центра трафик распределяется между остальными.

Что такое коды Reed — Solomon? Если у вас четыре блока данных, вы можете добавить к ним два блока контрольной суммы и пережить остановку любых двух узлов этой системы. В отличие от репликации в режиме кросс‑ДЦ, при котором мы хранили три копии данных, в случае Erasure‑кодирования мы храним данные с избыточностью всего с коэффициентом 1,5, и это очень значительно на наших объёмах данных. Такая система гарантирует устойчивость: даже если два из шести узлов откажут, она продолжает работать, и пользователи ничего не замечают. В зависимости от конфигурации мы либо даём полные гарантии exactly‑once с порядком, высокой доступностью и транзакционной обработкой, либо гарантии пониже, которые точно позволят доставлять ваши данные, но могут приводить к редким дублям, плюс иногда немножко снижается доступность для чтения. При этом SLA обеих систем в любом случае будет высоким в рамках их моделей отказов.

Чем здесь полезно решение YDB Topics

Шина данных YDB Topics умеет передавать разнородную информацию с высокой гарантией сохранности. Развернуть её можно где угодно: на кластерах Kubernetes®, виртуальных машинах и в Docker‑контейнере для разработчиков. YDB Topics входит в состав опенсорсной платформы данных YDB — катастрофоустойчивой и масштабируемой на тысячи узлов базы данных.

Вот как выглядит шина данных в цифрах:

Этим огромным потоком данных может управлять всего один дежурный SRE‑специалист. Расскажу чуть подробнее.

Архитектура YDB Topics, Kafka и Pulsar

В сообществе активно используются шины данных Apache Kafka®, Apache Pulsar™, так что стоит верхнеуровнево сравнить их архитектуру и архитектуру YDB Topics.

В Apache Kafka® и Apache Pulsar™ есть модули управления, которые объясняют пользователям, с каким из серверов нужно работать. Зачастую в этой роли используется система типа Apache ZooKeeper™, которая отвечает за выбор лидера, а затем поступившие данные переносятся на узел хранения с помощью внутренних механизмов репликации.

Apache Kafka® работает именно по такой схеме: ZooKeeper™ выбирает лидера, и данные реплицируются.

Apache Pulsar™ использует другой подход: хранение данных здесь реализовано с помощью отдельного сервиса Apache BookKeeper. Pulsar™ передает в него данные, просит их надёжно разместить, а рядом использует серверы ZooKeeper™, которые помогают Pulsar™ выбирать, какой из серверов будет лидером, и обеспечивают отказоустойчивость.

YDB Topics чем‑то похожа на эти системы, а чем‑то отличается. Мы тоже храним данные не локально, а в отдельной системе, которая называется YDB BlobStorage. Этот компонент принимает потоки данных и надёжно размещает их на узлах хранения в зависимости от доступности дата‑центров, серверов и узлов, информацией о которых он полностью владеет. BlobStorage обрабатывает выход узлов из строя — обеспечивает постоянную доступность данных. В зависимости от настроек он может либо хранить данные в трёх ДЦ, выполняя полные копии, либо с помощью Erasure‑кодирования уменьшать объём и физически хранить только полтора объёма данных.

Свойство

YDB Topics

Kafka®

Pulsar™

Способ хранения

Выделенный, YDB BlobStorage

Локальное хранение данных

Выделенный, Apache BookKeeper

Способ репликации и коэффициент хранения

Block-4-2, 1,5x
Mirror-3dc, 3x

Репликация, 3x

Репликация, 3x

Особенности хранения данных в YDB Topics:

  • Минимально разрешённое время хранения в инсталляциях YDB Topics в Яндексе — 18 часов.

  • Данные самых критичных сервисов хранятся 36+ часов.

  • Основной ограничивающий фактор — объём дисков. Мы перешли с жёстких дисков на NVME, выиграли по числу I/O‑операций, что позволило уменьшить количество серверов с тысячи до нескольких сотен, но объёма дисков стало не хватать.

  • Общий объём хранения: федерация — более 20 ПБ, кросс‑ДЦ — более 1 ПБ.

Стандартный кластер в YDB Topics состоит из нескольких сотен гетерогенных хостов, в каждом из которых:

  • 56 ядер;

  • 256 ГБ памяти;

  • 2 HDD (система, логи);

  • 4 NVME (данные).

Это типовая конфигурация: некоторые серверы мощнее, другие слабее. Поскольку мы храним системный софт и логи отдельно от данных, то можем определить количество доступных ядер и объём свободного места и перебалансировать систему. Более мощным серверам мы доверяем больше вычислительной нагрузки, на серверах с большим количеством дисков храним больше данных.

А вообще в самых больших кластерах YDB‑платформы больше 10 тысяч узлов. Следить за всем силами одного дежурного SRE‑специалиста удаётся в основном за счёт того, что все операции взаимодействия с сущностями платформы YDB переданы самим пользователям. В случае топиков пользователи сами управляют всем нужным для работы: заводят топики и удаляют их, меняют настройки и управляют ACL, получают новые мощности и отдают старые. При этом они могут делать это в любой момент, даже если команда YDB Topics в это же время выкатывает новый релиз на тот же кластер. В других сервисах платформы YDB тоже все операции отданы пользователям. Именно это позволяет нам фокусироваться на других задачах.

Протоколы, безопасность и мониторинг

Опенсорсное решение поддерживает собственный протокол YDB Topics и Apache Kafka®, а решение для Yandex Cloud — ещё и протокол Amazon Kinesis.

Все данные внутри Яндекса передаются в зашифрованном виде. Мы проводим регулярные аудиты безопасности, проверяем качество кода. Отслеживаем состояние системы и управляем ею с помощью веб‑консоли и посредством SQL‑запросов.

Что дальше

В наших планах:

  1. Оптимизация скорости и сетевого обмена.

  2. Схематизация данных. Исторически в Яндексе передавали и обрабатывали бинарные массивы данных, но с ростом компании стало сложнее понимать, какие данные где обрабатываются. Вот почему мы движемся в сторону каталогизации данных.

  3. Увеличение поддержки Apache Kafka® API. Уже сейчас данные можно поставлять и считывать по протоколу Apache Kafka®, проделана большая работа по интеграции с Kafka Connect, где мы поддерживаем работу в Kafka Connect standalone-режиме. При этом мы активно занимаемся увеличением степени интеграции и планируем поддержать ещё и работу через KSQL.

Мы открыты предложениям! Если у вас появятся идеи, как улучшить YDB Topics, пожалуйста, приходите и делитесь: создавайте feature requests в Issues на Github или пишите нам в чат community.

Комментарии (23)


  1. yegreS
    02.05.2024 06:40
    +1

    В рамках одной транзакции можно записать сообщение и в таблицу и в топик?


    1. SloNN Автор
      02.05.2024 06:40
      +3

      Мы туда идем. Сделали транзакционное чтение из топиков и запись в таблицы (сценарий процессинга данных из топиков), сейчас пишем код по транзакционной записи в топики (сценарий стриминга изменений наружу).

      А у вас в каком сценарии нужно писать и в таблицу и в топик в одной транзакции?


      1. yegreS
        02.05.2024 06:40
        +1

        Смотрю в сторону упрощения паттерна Transactional outbox.
        Изменили сущность в БД, гарантировано отправили доменное событие в шину


        1. alexfilus
          02.05.2024 06:40
          +1

          Насколько я понял эти же топики используются для репликации, соответственно можно ваше приложение как реплику подключить и гарантировано получить доменное событие. Подключаться можно как по пропиетарному протоколу, так и используя kafka-клиент.


        1. Andrey72
          02.05.2024 06:40
          +1

          CDC не рассматривали как вариант? Вроде бы делает то что вы ожидаете


  1. vadimr
    02.05.2024 06:40
    +1

    • 2 HDD (система, логи); 

    • 4 NVME (данные).

    А из каких соображений система находится на HDD, а не на NVME?


    1. starik-2005
      02.05.2024 06:40
      +2

      Имха, система и софт грузятся один раз в память, потом это условно работает несколько миллиардов лет, пока не помрет или не перезагрузится. Зачем там НВМЕ? Для логов 100Мб/с (если это бюджетные диски, но что-то сомневаюсь) потоковой записи вполне должно хватить. Не?

      В настоящее время скорость физических операций на дисках HDD достигает 200–300 МБ/с, а на дисках SSD — 600–700 МБ/с. Интерфейс имеет более высокую пропускную способность, что позволяет контроллеру кэшировать и буферизировать обрабатываемые данные. (гугл нарисовал на вопрос)

      А рандом там особо не нужен...


      1. vadimr
        02.05.2024 06:40

        У меня есть сомнения, что линукс (если это он) грузится один раз в память. Если только они не разворачивают виртуальный ram-диск. Но вроде бы как память сервера можно использовать более производительно.


        1. aozeritsky
          02.05.2024 06:40

          Там нужно по сути ядро + один процесс. Уверен, что процесс лочится в памяти с помощью mlock и больше не выгружается на диск.


          1. vadimr
            02.05.2024 06:40

            Если ограничиться одним монолитным процессом, что трудно представимо для сложной системы. Скрипты, в частности, не использовать.


            1. splav_asv
              02.05.2024 06:40
              +3

              Скрипты в Яндексе тоже любят собирать в монобинарь(по крайней мере Питон).


            1. Sap_ru
              02.05.2024 06:40

              Линукс не дурак и отлично мапит в память все файлы, к которым обращался. И на самом деле система действительно грузится один раз, дальше могут какие-то вспомогательные скрипты или команды загружаться, но нет ничего страшного если они будут аж 100 мс грузиться. Самое главное, что HDD надёжнее NVME в таких сценариях, реже требует замены и сильно дешевле. В масштабах Яндекса должно прямо заметно выходить.


      1. SloNN Автор
        02.05.2024 06:40
        +5

        Да, все так. Плюс технически не во все сервера возможна установка 6 NVME, а хочется работать плюс-минус на commodity серверах


      1. onyxmaster
        02.05.2024 06:40
        +4

        Я с посылом не спорю, но отмечу что 600-700МБ/сек для SSD это устаревшая информация. Диски для PCIe 5.0 умеют больше 13000МБ/сек.


        1. starik-2005
          02.05.2024 06:40
          +1

          Ну в этой цитате гугла смысл был больше в том, что ХДД тоже не так уж и плохи при последовательном доступе. Они сильно проигрывают по IOPS, что для логов - текстовый поток данных - малоактуально.


          1. onyxmaster
            02.05.2024 06:40
            +2

            Я именно потому и написал, что с посылом не спорю, просто уточняю некоторые фактические неточности в самой цитате.


  1. mynameco
    02.05.2024 06:40

    Del


  1. domix32
    02.05.2024 06:40
    +20

    А™зачем® так™ много® трейдмарок™? Вы же их не продаёте тут.


  1. gurovofficial
    02.05.2024 06:40

    У меня на прошлой работе, тоже гигантские обьемы были и я предлагал на митапе тоже самое - отдельную шину данных на аппаратном уровне, но меня никто не поддержал. Аргумент против - что не истратили полностью ресурсы оптимизации. И вот Яндекс это реализовал. А могли бы мы сами сделать и с Яндексом поделиться (уже готовой технологией). Почему никто не смотрит в будущее?


    1. akakoychenko
      02.05.2024 06:40
      +6

      А могли бы мы сами сделать и с Яндексом поделиться (уже готовой технологией)

      Оно ему зачем? Или вы б сделали нечто, что 100500 звезд и форков на гитлабе имело бы? А, если нет, то, зачем брать чужой код, где ещё хз, сколько багов и сколько под себя допиливать?


  1. Loggus66
    02.05.2024 06:40
    +4

    Если выпивать 50 грамм каждый раз, когда в тексте встречается

    Kafka®

    , то можно неплохо надраться.


    1. xkb45bkc4
      02.05.2024 06:40

      750 грамм в одно лицо как никак.


  1. unsweet
    02.05.2024 06:40
    +1

    Зачастую в этой роли используется система типа Apache ZooKeeper™, которая отвечает за выбор лидера, а затем поступившие данные переносятся на узел хранения с помощью внутренних механизмов репликации.

    ZooKeeper еще в версии 2.8 заменили на Metadata Quorum