Привет! Меня зовут Паша Сушин. Уже больше десяти лет я занимаюсь в Яндексе развитием платформы YTsaurus — нашего внутреннего инструмента, который в марте 2023 года вышел в опенсорс и теперь доступен всем на GitHub по лицензии Apache 2.0.

Сегодня мой рассказ будет о том, какие ограничения архитектуры мы преодолели, чтобы масштабировать наши кластеры больше чем в сотню раз.

Краткое знакомство с YTsaurus

YTsaurus — Data LakeHouse‑платформа. Различные сервисы Яндекса заливают в неё структурированные, полуструктурированные и неструктурированные данные, чтобы их хранить и обрабатывать. Например, Поиск заливает в YTsaurus HTML‑документы, картинки и видео, а потом строит по ним поисковую базу. Многие сервисы хранят в YTsaurus логи, которые нужны для аналитики, дашбордов и отчётов. А на GPU‑картах, подключённых к кластеру YTsaurus, наши ML‑инженеры обучают нейронные сети.

В Яндексе развёрнуто несколько крупных инсталляций YTsaurus. Самый большой кластер состоит:

  • более 20 тысяч хостов;

  • свыше миллиона CPU‑ядер;

  • HDD‑дисков с суммарным объёмом больше эксабайта, не считая SSD с NVMe.

Чтобы объяснить, что умеет YTsaurus, проще всего провести аналогии с экосистемой Hadoop.

Что у платформы внутри

Если посмотреть на архитектуру YTsaurus, то тоже можно увидеть параллели с архитектурой Hadoop.

Scheduler в YTsaurus выполняет примерно ту же функцию, что и YARN в Hadoop — распределение вычислительных ресурсов между пользователями. На этих ресурсах запускаются различные инструменты для обработки данных, например MapReduce, поверх которого существует SQL‑язык для написания запросов, по аналогии с Hive или Pig в Hadoop.

Другие инструменты обработки данных — это интеграция YTsaurus со Spark под названием SPYT и интеграция с ClickHouse под названием CHYT, которая занимает ту же нишу, что Impala, Presto или Trino в экосистеме Hadoop: быстрые in‑memory ad‑hoc‑запросы.

Инсталляцию YTsaurus обычно называют кластер. Вот его основные компоненты:

Nodes, или ноды, — основные рабочие лошадки, где мы храним данные и выполняем вычисления. Schedulers занимаются распределением ресурсов. Proxies — API и точка входа в кластер. И, наконец, центральный компонент кластера YTsaurus — Primary и Secondary Masters, или просто мастеры.

Кто такие мастеры и чем они занимаются

Проще всего объяснить на примерах.

Мастер: Кипарис 

Слой хранения платформы организован иерархически и представляет из себя дерево, похожее на любое дерево файловой системы.

Создадим каталог /home/project с узлом типа document и поместим туда небольшой структурированный документ.

$ yt create map_node //home/project 
1875d-233bf-3ff012f-cb8d073b

$ yt create document //home/project/document \ 
--attributes '{value={key=value; list=[0;1;2]}}' 
1875d-1d400-3ff01a5-d52d61a3

Чтобы узнать, сколько узлов лежит внутри каталога /home/project, прочитаем его атрибут @count:

$ yt get //home/project/@count
1

Схема дерева, которое у нас получилось: 

В YTsaurus это дерево называется Кипарис, и оно целиком находится в памяти мастеров. Обработка запросов к Кипарису — одна из главных функций мастера.

Мастер: чанки

Попробуем разобраться с более сложным типом узлов Кипариса — со статическими таблицами. Создадим таблицу и выполним две команды заливки данных.

$ yt create table //home/project/table --attributes'
{schema = [{name=c1; type=string}; {name=c2; type=int64}]}'
1875d-888e7-3ff0191-ef35d066

$ echo '{c1=a; c2=1};{c1=a; c2=2}' | \
yt write-table //home/project/table --format yson
$ echo '{c1=b; 3};{c1=c; c2=5}' | \
yt write-table '<append=%true>//home/project/table’ --format yson

Каждая команда заливки данных порождает один чанк.

$ yt get //home/project/table/@chunk_ids
[
"7271 70117 fb70064 4046a85b";
"7271 70be4 fb70064 2911f30";
]

Чанки хранятся в нескольких репликах на нодах кластера. Но именно мастер знает, из каких чанков состоит каждая таблица и на каких нодах хранятся реплики этих чанков. Более того, мастер занимается поддержкой необходимого коэффициента репликации для каждого чанка.

Мастер: репликатор

Чтобы обеспечить нужный коэффициент репликации, ноды ходят к мастерам с периодическими rpc‑вызовами двух типов:

  • Incremental heartbeats. В них нода присылает мастеру информацию о том, какие реплики чанков добавились и удалились с момента последнего incremental heartbeat.

  • Job heartbeats. С их помощью нода рассказывает мастеру, сколько у неё свободных ресурсов, и получает в ответ задание, какие чанки надо отреплицировать, а какие реплики — удалить.

Так мастеры могут поддерживать коэффициент репликации для каждого чанка.

Думаю, вы уже поняли, что у мастеров очень много функций: управление репликацией, обслуживание запросов к Кипарису, а также менеджмент транзакций, управление правами доступа, квотирование ресурсов, — и я затронул только вершину айсберга.

В этом месте у пытливого читателя должен возникнуть вопрос: если мастер — центральная точка, через которую проходит практически каждое взаимодействие пользователя с кластером, каким образом удаётся масштабировать кластеры по числу пользователей и нод так, чтобы мастер не стал узким местом?

Требования к мастеру YTsaurus

С момента релиза наши кластеры выросли на два десятичных порядка. Естественно, это не было ни легко, ни бесплатно; более того, работа продолжается до сих пор.

Когда мы только начинали проектировать YTsaurus, у нас была система предыдущего поколения и опыт её эксплуатации в Яндексе. Другими словами, мы неплохо представляли, что именно хотим получить.

Наши требования нельзя назвать неожиданными: мы хотели сделать отказоустойчивую платформу. А значит, как бы это ни было сложно, нам предстояло добиться реплицированности и строгой консистентности.

Любой студент, который посещал хороший курс по распределённым системам, знает: отказоустойчивую и линеаризуемую систему можно построить с помощью классического подхода State Machine Replication. Он появился ещё в 80-х годах, но наибольшую популярность приобрёл после выхода статьи про протокол Raft.

Чтобы сделать сервис линеаризуемым и отказоустойчивым:

  1. Имплементируйте его в виде конечного автомата.

  2. Конечный автомат посадите внутрь библиотеки, которая реализует протокол Raft.

  3. Запустите несколько реплик сервиса на разных машинах, чтобы любой запрос к сервису происходил по схеме:

    • клиент посылает запрос к лидеру, который выберет библиотека Raft →

    • лидер с помощью Raft‑магии записывает запрос в лог запросов на, как минимум, кворуме участников →

    • библиотека Raft передаёт запрос внутрь конечного автомата →

    • конечный автомат исполняет запрос →

    • клиент получает ответ.

Казалось бы, самое сложное — разобраться с библиотекой Raft, а в конечном автомате ничего хитрого нет. Но если ваш сервис чуть сложнее хеш‑таблицы, то написать для него конечный автомат — совершенно нетривиальная задача. Но прежде чем я это продемонстрирую, давайте обсудим, что такое мутации: мы очень активно используем это понятие в коде YTsaurus.

Всё о мутациях

Что это такое

В YTsaurus мы называем:

  • мутацией — один переход конечного автомата из состояния A в состояние B;

  • описанием мутации — клиентский запрос, который инициировал мутацию;

  • обработчиком мутации — код, который меняет состояние конечного автомата.

То есть любой переход конечного автомата между состояниями вызван некоторой мутацией (у которой есть описание) и исполняется некоторым обработчиком (кодом, который эту мутацию реализует).

В качестве примера простой мутации в системе, подобной YTsaurus, покажу создание нового узла Кипариса.

Описание мутации довольно очевидное:

  • пользователь, который инициировал мутацию;

  • путь к узлу, который мы хотим создать;

  • тип узла.

Тогда код обработчика мутации будет выглядеть вот так:

  1. Понять, есть ли родительский узел, внутри которого мы будем создавать новый. Если родительского узла нет, то запрос некорректный, никакого изменения состояния не случится, а клиенту должна вернуться ошибка.

  2. Проверить, есть ли у пользователя права, чтобы создать новый узел в текущем каталоге.

  3. Если с правами всё в порядке, проверить, есть ли у пользователя квоты на создание ещё одного узла.

  4. Только после прохождения всех проверок можно изменить состояние конечного автомата. Нужно создать новый узел, подцепить его к родителю и обновить использование квоты.

Весь этот код выполняется на каждой реплике мастера. И чтобы состояние на всех мастерах оставалось синхронным и одинаковым, мы должны гарантировать, что, будучи выполненным даже в разные моменты времени, этот код приведёт автомат из состояния A в состояние B абсолютно одинаково.

Почему я говорю про разные моменты времени? Во‑первых, мы имеем дело с распределённой системой, где время не синхронизировано: изменение состояния на лидере и фолловерах произойдёт не мгновенно.

Во‑вторых, в любой системе случаются сбои. После того как какой‑то узел даст сбой, Raft начнёт один за другим вызывать обработчики мутаций из changelog, пока не догонит состояние лидера. То есть возможна ситуация, когда на лидере мутация уже выполнилась, а на каком‑нибудь из его фолловеров она случится только через несколько минут или даже часов.

Недетерминированные мутации = data loss

Чтобы понять, насколько важно абсолютно одинаковое выполнение мутаций на разных репликах, проведём мысленный эксперимент.

Предположим, что CheckPermissions вызывает внешний сервис, а права доступа хранятся не внутри YTsaurus, а во внешней базе данных. Вызовы внешней БД могут выполняться в разные моменты времени, особенно если произойдёт восстановление. Поскольку мы не контролируем внешнюю БД, эти вызовы могут привести к разным результатам.

А если вызов породит два разных результата, мы пройдём проверку прав доступа только в одном случае из двух: на одной реплике узел появится, а на другой — нет.

Думаю, вы уже догадались, чем это чревато. Если состояния на репликах разойдутся и в какой‑то момент в системе переключится лидер, произойдёт потеря данных. Тот узел, запись которого мы ранее подтвердили клиенту, исчезнет как по мановению волшебной палочки. Отсюда следует фундаментальное правило мутаций: мутации должны быть детерминированными. В обработчике мутаций можно использовать только текущее состояние автомата и описание мутации.

Мутации: да или нет

Проверьте себя, попробуйте ответить на вопросы.  

Можно ли взять внутри мутации текущее значение времени, чтобы проверить, не пора ли отменить запрос по таймауту? 

Нет. Когда вы исполняете мутацию первый раз, запрос может успешно выполниться, но, если потом проиграть мутацию при восстановлении через несколько часов, таймаут, скорее всего, уже наступит, мутация не применится, реплики разойдутся. 

Можно ли выполнять мутации конкурентно в разных потоках?

Тоже нет. Представьте себе пул тредов, в которые попадают мутации из библиотеки Raft, когда вы хотите их применить. Предположим, клиент сделал два запроса: на создание и на удаление узла. Оба запроса прошли через Raft, записались на кворум и попадают в конечный автомат, где начинают применяться в двух потоках конкурентно.

К каким проблемам это может привести? На одной реплике запрос на создание мог выполниться до запроса на удаление; оба запроса пройдут успешно, и мы окажемся в ситуации, когда узла не существует. 

На второй реплике — наоборот: запрос на удаление мог выполниться раньше, чем запрос на создание. Тогда в ответ на удаление придёт ошибка, а создание выполнится. 

И снова мы получим различные состояния реплик: не детерминированный конечный автомат, а непонятно что. Если хотя бы в одной мутации есть расхождение, то дальше оно будет только нарастать и в итоге может привести к большой беде. 

Но даже если вы справились, корректно и аккуратно написали весь код мутаций, это не спасёт вас от неочевидных последствий.

Неочевидные последствия и итоги

Падение в обработчике мутации будет воспроизводиться до вмешательства разработчика. Представьте: из‑за бага вы разыменовали нулевой указатель в обработке мутации и получили ожидаемый segmentation fault. Что произойдёт дальше?

Из‑за того, что всё детерминировано, segfault случится, скорее всего, на всех репликах. Но система отказоустойчива. Процессы поднимутся, начнут зачитывать лог мутаций, потом — заново детерминированно применять всё те же мутации, переводить автомат из состояния в состояние, пока не дойдут до той самой мутации, что вызвала падение.

И они обязательно дойдут: ведь мутация попала в лог до того, как применилась к состоянию автомата. На этой мутации всё снова рухнет, вместе с концепцией отказоустойчивого сервиса. Падения будут продолжаться по кругу до тех пор, пока не придёт разработчик и не выложит хотфикс.

Корректное роллинг‑обновление возможно далеко не всегда. Другая неочевидная проблема — обновлять систему будет очень сложно. Предположим, вы хотите выложить новый релиз, в котором изменилась логика некоторых мутаций.

Если вы попытаетесь выкатить изменение логики постепенно, роллингом, то может пройти следующее: вы обновите одну реплику, а потом придёт мутация и выполнится на старых репликах старым кодом, а на новой реплике — соответственно, новым. И вот мы снова пришли к расхождению.

Подведём итоги:

  • мутации могут быть только детерминированными;

  • для консистентности исполнять мутации можно только последовательно. Переводя на язык кода — только в одном потоке.

Чтения из RSM

Чтения не меняют состояние автомата, их не нужно проводить через библиотеку Raft или записывать в changelog: получили запрос — обслужили его — отдали ответ клиенту. Но с точки зрения масштабирования придётся ответить на пару вопросов.

Можем ли мы читать конкурентно с тем, как идут мутации? Да, если мы готовы пожертвовать консистентностью.

Из кода видно, что мутация может обновить несколько разных частей состояния конечного автомата. Если между этими обновлениями вклинится чтение, то его результат может оказаться неконсистентным.

Если вам важна линеаризуемость системы, придётся упорядочивать чтение вместе с мутациями: обслуживать его из того же потока, который применяет мутации.

Можно ли читать с фолловеров? Тоже только с оговорками. Несмотря на то, что система реплицирована и реплики совершенно идентичны, читать с фолловеров опасно, потому что они могут отставать. В тот момент, когда клиент получает подтверждение записи, гарантируется только, что она применилась на лидере, а на фолловере, с которого вы хотите читать, — нет. В итоге получится stale read: вы не увидите того, что сами записали.

Как всё-таки масштабироваться без ограничений

Вернёмся к архитектуре YTsaurus на заре платформы. Когда мы впервые выходили в продакшн, то использовали классический RSM со всеми ограничениями, описанными в предыдущих разделах:

  • мы были вынуждены ограничить все чтения и записи одним потоком;

  • чтение выполняли всегда на лидере, чтобы не было stale read;

  • хранили в памяти всё состояние автомата.

Последнее — требование не RSM, но производительности: мы и так вынуждены все операции с автоматом делать последовательно из одного потока; если мы будем при этом ходить на диск, это будет недопустимо долго.

Небольшой бонус состоит в том, что нет смысла защищать состояние с помощью каких‑то примитивов синхронизации, таких как spin lock, mutex или condition variable. У нас есть возможность обращаться к состоянию автомата без блокировок.

Мало того, что мастер фактически пропускает через себя все запросы к системе, так ещё и вся производительность упирается в единственный поток на единственной машине. Как же расширить это узкое место?

Кешировать

Выше я долго рассказывал, как полезно и важно делать консистентные системы. Но в какой‑то момент стоит признать: не все запросы требуют консистентности. И на этом можно очень сильно сэкономить за счёт простейшего подхода — кеширования.

И наши клиенты, и некоторые внутренние компоненты кластера используют Кипарис как сервис координации и источник конфигурации. Ноды периодически запрашивают у мастера его конфигурацию, одинаковую для всех нод. А нод у нас много, они могут создать внушительную нагрузку на мастер.

Если конфигурация устареет на несколько секунд, ничего страшного с нодами не произойдёт. Поэтому мы используем кеши.

Это очень эффективный способ, но не универсальный: необходимо явно размечать те запросы, где допустима устаревшая информация.

На графике — объёмы трафика, который раздают наши мастеры и мастер-кеши. Легко заметить, что мастер-кеши загружены в 3–4 раза больше
На графике — объёмы трафика, который раздают наши мастеры и мастер-кеши. Легко заметить, что мастер-кеши загружены в 3–4 раза больше

Разгружать лидера

Вернёмся к чтению с фолловеров. В наивной имплементации это приводит к stale read, но, если вы готовы пожертвовать латентностью ответов на чтение в пользу распределения нагрузки, перед вами открываются новые опции.

Когда клиент делает запрос на фолловер, фолловер не отвечает сразу. Вместо этого он делает лёгкий запрос на лидер, чтобы понять, какая версия автомата сейчас актуальна.

Версия автомата на лидере — то есть та мутация, до которой уже докатился лидер, — это мутация, дальше которой клиент не мог ничего видеть. Потому что запрос, который послал клиент, случился до того, как на лидер пришёл запрос get leader version.

Если теперь фолловер просто подождёт, пока его локальное состояние докатится до текущего состояния лидера, то ответ, который он сможет дать клиенту, гарантированно не будет протухшим.

Таким образом, фолловер:

  • делает лёгкий запрос, который не нагружает тред автомата на лидере;

  • получает версию, которой нужно дождаться;

  • дожидается этой версии.

Потом запрос на чтение проваливается в тред автомата и выполняется, а клиент получает ответ.

Ценой небольшого ожидания мы можем существенно увеличить производительность чтения, потому что в группировке мастеров лидер всегда только один, а фолловеров — несколько. Это кратно увеличивает пропускную способность чтения.

На графике — нагрузка на тред автомата за исключением мутаций. Мы полностью сняли с лидера нагрузку на чтение, потому что на нём и так немало вспомогательных процессов
На графике — нагрузка на тред автомата за исключением мутаций. Мы полностью сняли с лидера нагрузку на чтение, потому что на нём и так немало вспомогательных процессов

Следующий шаг к разгрузке лидера — многопоточное чтение. Я уже рассказывал, почему нельзя делать чтение конкурентно записями. Но что, если бы мы могли позволить себе на какое‑то время приостановить записи на наших фолловерах?

На первый взгляд это не очевидно, но применение мутации на фолловере не мешает обслуживать пользовательские запросы: при посылке мутирующего запроса клиент получает ответ, как только мутация применится на лидере. То есть мутация должна записаться на кворум в changelog, после чего лидер применит её к себе и уже ответит клиенту.

  • Запросы на чтение ставим в очередь ожидания (они всё равно ждут SyncWithUpstream).

  • Выделяем кванты времени на чтение, в это время применение новых мутаций блокируется.

  • Читаем в несколько потоков, пока не закончится квант или запросы на чтение в очереди.

Это ещё сильнее увеличивает наши возможности по throughput чтения со всех мастеров.

На графиках тред-пул, который занимается чтениями, иногда пробивает 100% CPU 
На графиках тред-пул, который занимается чтениями, иногда пробивает 100% CPU 

Это означает, что мы обслуживаем больше запросов, чем мы могли бы, выполняйся чтения в один поток. Таким образом мы выжали ещё немного перфа на чтение из нашей группировки мастеров.

Шардировать состояние

Можно выжимать максимум из потока автомата, когда мы говорим про одну Raft‑группу. Кстати, Raft‑группу мастеров в YTsaurus мы называем cell (ячейкой).

Но, выжав максимум из одной группы, мы не разгрузим память (напоминаю, что всё состояние мастеров хранится в памяти). Если посмотреть на текущие размеры кластеров, то одних только метаданных про чанки, по грубым прикидкам, уже больше чем один терабайт.

Поэтому, когда наши кластеры впервые подобрались к размеру примерно в тысячу машин, мы стали думать о том, как шардировать это состояние. Первая идея, которая пришла нам в головы, — расселить менеджмент чанков на разные независимые cells, чтобы за каждый чанк отвечал один cell. Так вся работа с чанками проходила бы вне первичного мастера, существенно разгрузив тред его автомата.

Вот так могла бы выглядеть команда создания таблицы в такой системе:

Клиент посылает в первичный cell запрос на создание нового узла типа таблица. Происходит магия Raft, мутация попадает в тред автомата на первичном cell, где и создаётся узел таблицы. Но такой же узел таблицы должен создаться на вторичном cell, который будет хостить чанки этой таблицы.

Чтобы создать эту таблицу, было бы здорово вызвать RPC из первичного cell во вторичный. Но мы уже знаем, что внутри мутаций делать RPC‑вызовы нельзя: мутации могут проиграться заново и начать повторяться.

Значит, нужен какой‑то транспорт между cells, которым можно было бы пользоваться в рамках мутаций. И чтобы он гарантированно синхронно создавал узлы на первичном и вторичном cells. Мы имплементировали такой транспорт в YTsaurus и назвали его Hive.

Hive — очередь сообщений, которая хранится внутри автомата. Когда мы в рамках мутации создания таблицы хотим послать запрос на вторичный cell, запрос попадает в Hive‑очередь на отправку на первичном cell. А потом с помощью асинхронных RPC‑вызовов эти сообщения перекладываются в очередь на приёмку внутри состояния автомата в подсистеме Hive на вторичном cell. Когда сообщение получено (и все сообщения, которые ему предшествуют), вызывается мутация, которая обрабатывает его и удаляет из Hive. В результате мы получили надёжный one‑way exactly‑once транспорт между нашими конечными автоматами.

Покажу ещё один пример. Так выглядит заливка в таблицу в multicell‑конфигурации:

Клиент делает запрос на первичный cell: он собирается залить таблицу. Первичный cell отвечает ему, что таблица живёт на вторичном cell, и отправляет его туда. Одновременно с этим первичный cell через Hive посылает уведомление на вторичный, чтобы тот был готов к приходу клиента.

Пока уведомление летит, клиент может прийти на вторичный cell и сказать, что хочет создать чанк. Вторичный cell понимает, что до него, возможно, не успело долететь сообщение об открытии заливки в таблицу. Он делает быстрый RPC‑вызов на первичный cell, чтобы узнать, какое последнее сообщение было отослано по Hive, дождаться получения всех отосланных ранее сообщений и только после этого обрабатывать клиентский запрос.

Механика очень похожа на то, что мы делаем, когда читаем с лидера внутри одного cell. Даже механизмы у нас в коде называются одинаково — sync with upstream. Мы идём в upstream, чтобы понять, какая там версия, и потом дождаться её. Дожидаемся — выполняется мутация создания чанка, обрабатывается клиентский запрос, и ответ улетает обратно клиенту.

В какой‑то момент мы поняли, что и само дерево Кипариса стало слишком большим: нам нужно растить его дальше, но он создаёт нагрузку, которую очень тяжело потянуть одним потоком в одном cell. Пришлось учиться шардировать Кипарис.

У нас появились вторичные cells двух видов: чанковые вторичные cells, про которые я рассказывал выше, и портальные cells. Для работы с cells нового вида пришлось добавить узлы Кипариса нового типа, которые, по сути являются порталами в другой cell. Обращаясь к узлу от корня Кипариса, мы смотрим путь, понимаем, что мы попали в портал, и после этого запрос перенаправляется во вторичный cell.

Казалось бы, по‑прежнему первичный cell проводит первичный resolve, и всё равно нагрузка на него идёт.

Это действительно сложная конструкция и сложная подсистема, и я, к сожалению, уже не смогу о ней подробно рассказать. Если кратко, то на первичном мастере поддерживается специальный кеш, который мы стараемся держать синхронным с состоянием автомата. В нём написано, какие узлы представляют из себя порталы, и чтобы запрос провалился в портал, не требуется проходить через наш перегруженный тред автомата.

На самом деле внедрение системы порталов ещё происходит. Мы постепенно отселяем некоторые ветви нашего огромного дерева за порталы, а при необходимости добавляем и новые портальные cells. На данный момент на нашем самом большом кластере группировка мастеров состоит уже из 40 cells, и получается, что одних только мастеров на кластере уже более 150 штук. Эти 40 cells обслуживают 2 млрд чанков, 50 млн узлов Кипариса и более 4500 наших пользователей в Яндексе каждый день.

Но на этом история не заканчивается. Сейчас мы готовим следующее улучшение под кодовым названием «Секвойя». Думаю, что расскажем про него в одной из следующих статей.

Спасибо, что дочитали до конца. Если вы хотите узнать про YTsaurus ещё больше, пожалуйста, заходите на ytsaurus.tech.

Комментарии (1)


  1. slav1k
    21.07.2024 12:03

    А кто-нибудь пользуется этой системой, кроме Яндекса?