YTsaurus — платформа для распределённого хранения и обработки данных. С помощью неё пользователи могут производить вычисления с данными, которые хранятся на кластере. За запуск этих вычислений отвечает один из центральных компонентов системы — планировщик. Зачастую ресурсов кластера не хватает, чтобы одновременно запустить все желаемые вычислительные задачи. Поэтому одна из важных задач планировщика — умение грамотно распределять вычислительные ресурсы между пользователями.

Меня зовут Егор Щербин, я работаю в Yandex Infrastructure, в команде планировщика YTsaurus. О нём и расскажу в этой статье. А также о запуске вычислений в кластере YTsaurus, распределении ресурсов между вычислениями и о том, как управлять распределением, чтобы все операции получали ровно столько, сколько требуется.

Эта статья основана на моём вебинаре.

Обзор compute-подсистемы YTsaurus

Основной единицей запуска вычислений на кластере является операция. Обычно операция берёт входные данные из таблиц на кластере, обрабатывает их и сохраняет результаты в виде выходных таблиц на том же кластере. Вычисления в YTsaurus можно запускать на разных языках: Python, Java, C++, Go.

Операции делятся на джобы — атомарные части вычислений, которые запускаются в контейнере на ноде кластера и обрабатывают фрагменты входных данных.

У самого планировщика как центрального компонента есть три функции:

  1. Приём запросов на выполнение операций от пользователя.

  2. Общение с нодами кластера — они приходят к планировщику с хартбитами и сообщают о свободных ресурсах, а планировщик в ответ сообщает о новых джобах, которые необходимо запустить.

  3. Распределение ресурсов между операциями.

Как устроен интерфейс

Здесь и далее мы будем рассматривать работу планировщика, основываясь на примерах из Python Notebook (примеры в нём пронумерованы, и я буду ссылаться на эти номера). В кластере, на котором мы будем запускать данные примеры, доступно 24 CPU и 60 ГБ памяти.

Разберём простейшую операцию из примера № 1. Система принимает данные, «спит» минуту и пишет их на выход.

# 1. Running a simple operation.

# Prepare input.
yt.create("table", "//tmp/in", force=True)
yt.write_table("//tmp/in", [{"a": 1}])
          
# Run operation
yt.run_map("sleep 60; cat", "//tmp/in", "//tmp/out", format="yson")

# Read output.
for row in yt.read_table("//tmp/out"):
    print(row)

В интерфейсе планировщика можно посмотреть, когда была запущена операция и другие параметры. Здесь же доступны и список джобов, и статистики — числовые значения, которые собираются для каждого джоба на ноде и агрегируются в рамках всей операции. Набор статистик можно посмотреть в документации.

На странице статистик можно отсортировать данные — например, чтобы посмотреть, сколько памяти потребляет операция
На странице статистик можно отсортировать данные — например, чтобы посмотреть, сколько памяти потребляет операция

Наконец, есть вкладка спецификации. Здесь показано то, что вообще должна запускать операция: команда, входные таблицы, выходные таблицы и так далее. Полный список опций есть всё в той же документации.

Как происходит распределение ресурсов

Рассмотрим пример № 2, в котором запускается шесть простых операций. 

# 2. Running several operations.

tracker = yt.OperationsTracker(print_progress=False)
for i in range(6):
    op = yt.run_operation(
        yt.VanillaSpecBuilder()
            .begin_task("task")
                .command("sleep 1000")
                .job_count(3)
                .cpu_limit(2.0)
            .end_task(),
        sync=False,
    )
    tracker.add(op)

После запуска каждая из них требует по 6 CPU. В сумме это 36 CPU, но, как уже было сказано в начале, в кластере доступно только 24.

Когда планировщик занимается распределением ресурсов, он «думает» не в абсолютных значениях ресурсов (то есть не в CPU), а в долях кластера. Таким образом, каждая операция требует 0,25 от кластера, а суммарно они требуют 1,5.

В столбце Usage можно увидеть фактическое потребление — в нашем случае оно равно 0,16. То же значение и в соседнем столбце Fair Share (FS).

Всем операциям полагается одинаковое количество ресурсов
Всем операциям полагается одинаковое количество ресурсов

Fair Share (FS) — ключевая величина для процесса распределения ресурсов. Это количество ресурсов, которое, по мнению планировщика, полагается операции. Оно определяется специальным алгоритмом, после чего планировщик пытается запускать джобы так, чтобы реальное распределение совпадало с целевым.

В данном примере все операции равноправны и поэтому получают одинаковую долю ресурсов в кластере, то есть все получили по 0,16. В реальной работе FS постоянно меняется, ведь одни операции запускаются, а другие завершаются. Поэтому реальное потребление ресурсов операций, как правило, отстаёт от целевого, и задача планировщика — запускать джобы так, чтобы «догонять» целевое распределение.

Как управлять распределением ресурсов

У каждой операции есть вес — управляя им, можно управлять распределением, указывая, какие процессы важнее. В примере № 3 запускаются три операции с весами 1, 2 и 3, и FS у них уже не равный.

# 3. Running several operations with different weights.

tracker = yt.OperationsTracker(print_progress=False)
for i in range(3):
    op = yt.run_operation(
        yt.VanillaSpecBuilder()
            .begin_task("task")
                .command("sleep 1000")
                .job_count(12)
            .end_task()
            .weight(i + 1),
        sync=False,
    )
    tracker.add(op)
Вес каждой операции показан в столбце правее от id операции
Вес каждой операции показан в столбце правее от id операции

За квотирование ресурсов отвечают вычислительные пулы, которые выстраиваются в иерархическую структуру — дерево пулов. Каждая операция находится в некотором пуле, который указывается при её запуске (пример № 4).

# 4. Running several operations in different pools.

tracker = yt.OperationsTracker(print_progress=False)
for pool in ["a", "b"]:
    for i in range(3):
        op = yt.run_operation(
            yt.VanillaSpecBuilder()
                .begin_task("task")
                    .command("sleep 1000")
                    .job_count(12)
                .end_task()
                .pool(pool)
                .weight(i + 1), 
            sync=False,
        )
        tracker.add(op)

Алгоритм распределения ресурсов можно представить следующим образом: сначала ресурсы кластера делятся между пулами на первом уровне иерархии пропорционально весам. Дальше в каждом пуле полученные им ресурсы делятся между его дочерними пулами аналогичным образом и так далее, вплоть до самих операций.

Вычислительные пулы представлены специальными объектами в дереве метаинформации Кипарис. Чтобы создать пул, нужно нажать на кнопку Create pool в интерфейсе либо воспользоваться стандартной командой create в Python API (пример № 5).

# 5. Creating a pool.

yt.create("scheduler_pool", attributes={"name": "название пула", "parent_name": "имя предка", "pool_tree": "название дерева пулов"})

Если при запуске операции явно не указать пул, то в качестве названия будет использовано имя пользователя, запускающего операцию. В случае если в кластере нет персонального пула, система создаст его, но будет считать эфемерным. Он удалится, как только завершится последняя операция в пуле.

Квоты, гарантии и лимиты

Дерево пулов позволяет гибко управлять распределением ресурсов между разными операциями. Например, зачастую структура дерева пулов отражает иерархическую структуру компании, чтобы отделить процессы разных подразделений друг от друга. Также вычисления бывают разных типов (ad‑hoc‑аналитика, production, testing), и их также можно разделить, создав по своему пулу для каждого типа.

В то время как веса позволяют регулировать относительную приоритетность пулов, в компаниях всё же обычно оперируют понятием квот на вычислительные ресурсы. Например, команде аналитики по бюджету положено X CPU, а подразделению разработки — Y CPU. Такая возможность также поддерживается в YTsaurus. Для каждого пула можно настроить гарантию на ресурсы (пример № 6).

# 6. Setting pool strong guarantees.

# (pool_path, cpu_guarantee)
pool_params = [
    ("company", 24.0),
    ("company/development", 20.0),
    ("company/development/production", 16.0),
    ("company/development/testing", 4.0),
    ("company/analytics", 4.0),
    ("company/analytics/chyt", 4.0),
]

for pool_path, cpu_guarantee in pool_params:
    yt.set("//sys/pool_trees/default/" + pool_path + "/@strong_guarantee_resources", {"cpu": cpu_guarantee})

Эти гарантии устроены иерархически: сперва гарантия выдаётся родительскому пулу, а затем их можно распределять между дочерними. Соответственно, сумма гарантий по всем дочерним пулам не может превышать родительскую.

При распределении FS пула между его детьми планировщик сначала удовлетворяет все их гарантии. Остаток распределяется пропорционально весам. В частности, такой алгоритм обеспечивает следующее свойство: если какой‑то пул требует меньше ресурсов, чем его гарантия, то неиспользованный остаток гарантии в первую очередь раздаётся его соседям. Если же среди пулов нет желающих получить ресурсы, они «уходят» выше по дереву.

# 7. Running operations in pools with guarantees.

# (job_count, pool)
operation_params = [
    (4, "production"),
    (4, "chyt"),
    (12, "production"),
    (8, "testing"),
    (4, "bi"),
]

ops = []
for job_count, pool in operation_params:
    op = run_sleeping_operation(job_count=job_count, pool=pool)
    ops.append(op)
Пулы без гарантий не получили требуемых ресурсов (пример № 7)
Пулы без гарантий не получили требуемых ресурсов (пример № 7)

В дополнение к гарантиям существуют лимиты — верхние границы на количество ресурсов (пример № 8).

# 8. Resource limits.

yt.set("//sys/pool_trees/default/company/development/testing/@resource_limits", {"cpu": 5.0})

Лимиты приоритетнее гарантий: если на пуле установлен лимит 3 CPU, а гарантия 4 CPU, операция, запущенная в данном пуле, получит не больше 3 CPU.

Учёт нескольких видов ресурсов

Планировщик поддерживает пять типов ресурсов:

  1. CPU

  2. RAM

  3. GPU

  4. User Slots

  5. Network

CPU, RAM и GPU соответствуют физическим ресурсам на нодах кластера, в то время как User Slots — это специальный ресурс, обозначающий число джобов, доступных пользователю на кластере. На каждый джоб требуется один user slot. В частности, пользователь может ограничить количество одновременно запущенных джобов в операции, указав лимит на user slots.

Network — это устаревший тип ресурса, и он не имеет отношения к использованию сети в кластере. Точную его семантику мы не будем описывать, так как в будущих версиях YTsaurus данный ресурс будет удалён.

Значения, которыми оперирует планировщик — будь то гарантированные ресурсы или Fair Share, — это набор из пяти чисел, соответствующих поддерживаемым ресурсам.

Пример
Параметры показаны не в абсолютных значениях, а в долях кластера, что позволяет планировщику сравнивать их между собой
Параметры показаны не в абсолютных значениях, а в долях кластера, что позволяет планировщику сравнивать их между собой

Тип ресурса, которого пул или операция требуют больше всего, называется доминантным. В интерфейсе название этого ресурса отображается в столбце Dom. Res., а в качестве значения Fair Share отображается значение для доминантного ресурса. В примере № 9 запускаются две операции, у одной из которых доминантный ресурс — CPU, а у второй — RAM.

# 9. Running operations with different resource demand profiles.

# (cpu_limit, memory_limit)
operation_params = [
    (2.0, 1024**3),
    (1.0, 10 * 1024**3),
]

tracker = yt.OperationsTracker(print_progress=False)
for cpu_limit, memory_limit in operation_params:
    op = run_sleeping_operation(cpu_limit=cpu_limit, memory_limit=memory_limit, pool="testing")
    tracker.add(op)

В ранних версиях YTsaurus использовался другой, более простой подход, в котором FS представлялся одним числом — долей кластера по каждому ресурсу. Иными словами, на один CPU каждой операции полагалось фиксированное количество RAM, равное отношению общего количества RAM к общему количеству CPU в кластере. В частности, пользователь задавал для пула желаемую гарантию на CPU, а гарантия на остальные ресурсы вычислялась автоматически.

Такой подход плохо работал из‑за наличия в кластере большого разнообразия характеров нагрузки в операциях. Одной из больших проблем была невозможность точно настроить гарантию для пулов с memory‑intensive‑операциями. Пользователям приходилось искусственно завышать гарантию на CPU, чтобы их операциям хватало автоматически вычисленной гарантии на RAM. Поскольку подразделения платили за свои гарантии, это приводило к лишним тратам.

По этой причине несколько лет назад мы решили существенно переработать и усложнить наш алгоритм вычисления Fair Share, чтобы он сумел корректно определять FS по каждому ресурсу в отдельности. Как следствие, сейчас гарантии в YTsaurus можно задавать на разные типы ресурсов в произвольном соотношении (пример № 10).

# 10. Vector guarantees.

# (pool_path, guarantees)
pool_params = [
    ("company", {"cpu": 24.0, "memory": 54 * 1024**3}),
    ("company/analytics", {"cpu": 4.0, "memory": 16 * 1024**3}),
    ("company/analytics/chyt", {"cpu": 4.0, "memory": 16 * 1024**3}),
]

for pool_path, guarantees in pool_params:
    yt.set("//sys/pool_trees/default/" + pool_path + "/@strong_guarantee_resources", guarantees)

Вытеснение 

Представим следующую ситуацию. Запустилась операция, которая хочет 20 джобов. Других операций на кластере нет, поэтому она получает требуемое в полном объёме. Однако позднее запускается ещё одна операция в том же пуле — уже на 13 джобов. Веса у операций одинаковые, поэтому FS каждой из операций становится равным 0,5.

Однако у новой операции запущены всего четыре джоба — это всё, что было свободно на момент её старта. Как же планировщик справляется с этой ситуацией и приводит usage к целевому FS? Здесь приходит на помощь механизм вытеснения.

Планировщик определяет операции, которые недополучают ресурсы на протяжении небольшого промежутка времени (этот параметр настраивается, и обычно равен десяткам секунд), и помечает их как «страдающие». Также планировщик для каждой из операций делит джобы на вытесняемые и невытесняемые. Вытесняемыми считаются джобы, которые выходят за границу FS. При запуске нового джоба «страдающей» операции планировщик может прервать работу вытесняемых джобов других операций, чтобы освободить необходимое количество ресурсов.

# 11. Job preemption.

yt.set("//sys/pool_trees/default/company/development/testing/@allow_regular_preemption", False)

tracker = yt.OperationsTracker(print_progress=False)
tracker.add(run_sleeping_operation(job_count=20, pool="testing"))

# Only 4 jobs will run, because preemption is forbidden.
tracker.add(run_sleeping_operation(job_count=12, pool="testing"))

# Enable preemption.
yt.set("//sys/pool_trees/default/company/development/testing/@allow_regular_preemption", True)
Вторая операция запустилась позже и «страдает», так как не получила нужное количество ресурсов
Вторая операция запустилась позже и «страдает», так как не получила нужное количество ресурсов
Через некоторое время планировщик вытеснил джобы первой операции, чтобы освободить ресурсы для запуска недостающих джобов второй операции (пример № 11)
Через некоторое время планировщик вытеснил джобы первой операции, чтобы освободить ресурсы для запуска недостающих джобов второй операции (пример № 11)

Продвинутые фичи планировщика

Планировщик обладает богатой функциональностью. В этом разделе я расскажу про небольшую, но важную часть фич, которые могут пригодиться на практике.

FIFO-пулы

Начнем с FIFO‑пулов. Это альтернатива дефолтному для пулов режиму Fair Share.

В режиме FIFO ресурсы распределяются в порядке очереди, а не пропорционально весу. Система сортирует все запущенные операции и распределяет FS между ними в таком порядке. В итоге для какой‑то части операций FS окажется равным требованиям, у одной операции он будет ниже требований, а остальные операции не получат ничего, то есть FS для них будет равен нулю.

Порядок сортировки операций настраивается. Есть набор параметров, которые могут использоваться для данной сортировки, в их число входят вес и время старта операции. По умолчанию сортировка идёт по весу, а при равных весах — по времени старта.

Отметим, что в FIFO‑пуле не может быть вложенных пулов, в нём могут находиться только запущенные операции.

# 12. FIFO pools.

yt.set("//sys/pool_trees/default/company/development/testing/@mode", "fifo")

tracker = yt.OperationsTracker(print_progress=False)
for _ in range(6):
    op = run_sleeping_operation(job_count=5, pool="testing")
    tracker.add(op)
    time.sleep(0.1)
В первом столбце после названия операции можно увидеть порядок, который определил алгоритм в режиме FIFO (пример № 12)
В первом столбце после названия операции можно увидеть порядок, который определил алгоритм в режиме FIFO (пример № 12)

Поддержка нескольких деревьев пулов

Следующая фича — поддержка нескольких деревьев пулов. Вычислительные ноды кластера можно разбить на непересекающиеся группы, в каждой из которых будет своя иерархия пулов, и планирование ресурсов будет выполняться независимо. Как правило, такая необходимость возникает из‑за гетерогенности: например, можно выделить в отдельное дерево часть вычислительных нод кластера, на которых доступны GPU.

Разбиение вычислительных нод на деревья пулов происходит следующим образом. У каждой ноды есть набор тегов — системных или пользовательских. Дерево задаётся фильтром на основе данных тегов. Фильтр — это булева формула, в которой литералами выступают теги, и если нода удовлетворяет данной формуле, она принадлежит данному дереву. Задача администратора кластера — сделать так, чтобы каждая нода принадлежала единственному дереву.

# 13. Several pool trees.

# Get exec nodes.
all_nodes = yt.list("//sys/cluster_nodes", attributes=["tags", "flavors"])
exec_nodes = [node for node in all_nodes if "exec" in node.attributes["flavors"]]

# Set custom tag for one node.
other_tree_node = exec_nodes[0]
yt.set("//sys/cluster_nodes/" + other_tree_node + "/@user_tags", ["custom_tag"])

for node in exec_nodes:
    print(yt.get("//sys/cluster_nodes/" + node + "/@tags"), file=sys.stderr)

# Configure another pool tree.
yt.set("//sys/pool_trees/default/@config/nodes_filter", "!custom_tag")
yt.create("scheduler_pool_tree", attributes={"name": "other", "config": {"nodes_filter": "custom_tag"}})
Дерево пулов можно выбрать на странице Scheduling (пример № 13)
Дерево пулов можно выбрать на странице Scheduling (пример № 13)

Спекулятивное исполнение

Это фича, которая повышает предсказуемость исполнения вычислений на кластере. Рассмотрим следующую ситуацию: вы запустили операцию с несколькими джобами, и один из них попал на ноду с тормозящим диском. Планировщик может заметить, что данный джоб исполняется дольше обычного, и запустить спекулятивную копию этого джоба на другой ноде. Теперь эти джобы будут исполняться одновременно, выполняя одну и ту же работу, но на разных нодах. Данные того джоба, который завершится первым, попадут на выход, а другой джоб будет тут же прерван.

# 14. Speculative jobs.

op = yt.run_operation(
    yt.VanillaSpecBuilder()
        .begin_task("task")
            .command("sleep 1000")
            .job_count(1)
            .spec({"job_speculation_timeout": 5000})
        .end_task(),
    sync=False,
)
У спекулятивного джоба есть особая пометка (пример № 14)
У спекулятивного джоба есть особая пометка (пример № 14)

Интегральные гарантии

Наконец, есть интегральные гарантии — альтернатива обычным гарантиям, описанным выше.

Для понимания устройства данных гарантий представим, что вычислительный пул — это сосуд, который может накапливать ресурсы. При запуске операций в пуле накопленные ресурсы расходуются для обеспечения FS данного пула. Максимальный объём накопленных ресурсов, скорость накопления и скорость расхода ресурсов можно настраивать (пример № 15).

# 15. Integral guarantees.

# (pool_path, cpu_guarantee)
pool_params = [
    ("company/analytics/chyt", 0.0),
    ("company/analytics", 0.0),
    ("company", 22.0),
    ("company/development", 22.0),
]

# Reconfigure strong guarantees.
for pool_path, cpu_guarantee in pool_params:
    yt.set("//sys/pool_trees/default/" + pool_path + "/@strong_guarantee_resources", {"cpu": cpu_guarantee})

# (pool_path, integral_guarantees)
pool_params = [
    ("example", {"resource_flow": {"cpu": 1.0}}),
    ("example/integral", {
        "guarantee_type": "relaxed",
        "resource_flow": {"cpu": 1.0},
    }),
]

# Configure integral guarantees.
yt.create("scheduler_pool", attributes={"name": "integral", "parent_name": "example", "pool_tree": "default"})
for pool_path, integral_guarantees in pool_params:
    yt.set("//sys/pool_trees/default/" + pool_path + "/@integral_guarantees", integral_guarantees)
    
# Track accumulated resource volume.
pools_orchid_path = "//sys/scheduler/orchid/scheduler/pool_trees/default/pools"
while not ("integral" in yt.list(pools_orchid_path) and yt.exists(pools_orchid_path + "/integral/accumulated_resource_volume/cpu")):
    time.sleep(0.1)
    
def get_accumulated_cpu_volume():
    return yt.get(pools_orchid_path + "/integral/accumulated_resource_volume/cpu")

start_time = time.time()
for i in range(10):
    time.sleep(1.0)
    elapsed = time.time() - start_time
    print("Seconds elapsed: {}, accumulated CPU volume: {}".format(elapsed, get_accumulated_cpu_volume()))

Это полезно, например, для регулярных процессов, которые запускаются раз в день и завершаются достаточно быстро. Рассмотрим пул, в котором исполняются подобные расчёты. Интегральное потребление ресурсов в данном пуле может быть достаточно низким, но в момент запуска расчётов в пике может требоваться много ресурсов. Используя обычные гарантии, такому пулу пришлось бы выделить ресурсы согласно пиковой нагрузке, а интегральные гарантии позволяют действовать более гибко и выделять ровно тот объём ресурсов в виде CPU‑часов, который необходим.

Заключение

В этой статье мы разобрались, как устроена compute‑подсистема YTsaurus, поговорили о планировщике, распределении ресурсов между операциями и как на него можно повлиять.

  • Fair Share — это ключевая величина, которой оперирует планировщик. Она означает целевое количество ресурсов, полагающееся пулу или операции. После вычисления FS планировщик старается запускать джобы таким образом, чтобы реальное потребление операций совпадало с целевым.

  • Управлять распределением FS можно с помощью редактирования структуры дерева пулов, регулирования весов, гарантий и лимитов.

  • Планировщик поддерживает несколько типов ресурсов, включая CPU, RAM и GPU, причём гарантии на эти ресурсы можно настраивать в разных пропорциях для разных пулов.

  • Помимо базовой функциональности, планировщик предоставляет разные полезные фичи, такие как FIFO‑пулы и интегральные гарантии.

На этом всё. Все возможности compute‑подсистемы доступны в опенсорс‑версии YTsaurus. А если у вас возникнут вопросы, то смело задавайте их в нашем community‑чате, а также делитесь своим опытом и предложениями в комментариях!

Комментарии (0)