Вводная часть


Часто возникает необходимость управления задачами на множестве вычислительных узлов. Если выполнение задачи можно автоматизировать, написав скрипт, тогда все еще остается необходимость запуска, контроля за исполнением, остановкой этого скрипта в кластере. Задача может быть какой угодно: получить файл через wget, создать дамп локальной БД, запустить нагрузочный тест, выполнить архивацию старых файлов и т.д.

Существует множество автоматизированных систем управления исполнением задач в кластере. К таким системам можно отнести централизованные планировщики пакетных задач в распределенных вычислительных системах, такие как: htcondor, torque, slurm и другие. Централизованные планировщики обычно состоят из одного или нескольких процессов мастера, который управляет множеством процессов воркеров (worker), запущенных на каждом из хостов кластера, на которых и происходит запуск задач. Такие системы прижились в области high-throughput computing (HTC), high-performance computing (HPC), но редко используются в ежедневных задачах серверного администрирования и в относительно небольших кластерах во многом из-за возникающей сложности в установке, использовании и поддержке таких систем.

Общее описание планировщика Prun


Prun предоставляет контроль за распределенным исполнением пакетных задач в UNIX-подобных ОС. Prun можно отнести к централизованным планировщикам, однако он имеет упрощенный интерфейс для описания и управления исполнением задач. Мастер prun предоставляет механизм очередей задач, планирования исполнения с учетом приоритетов задач и вычислительных ресурсов (CPU, RAM), механизм обеспечения отказоустойчивости исполняемых задач, а также планирование задач по расписанию (аналог cron). Процесс мастера (pmaster) запускается на одной из машин в кластере.

Весь процесс управления мастером происходит с использованием утилиты для администрирования (prun). На каждом из хостов должен быть запущен экземпляр воркера. Воркер состоит из двух процессов: собственно процесс воркера (pworker), который принимает команды мастера, отвечает на периодические heartbeat сигналы, сохраняет статус выполненной задачи; процесс prexec, который всегда исполняется под непривилегированным пользователем и запускает задачи (используя fork+exec для порождения процесса), одновременно отслеживая статус исполнения процесса.

Создание и запуск «Hello, world!» задачи


Для запуска задачи с помощью prun нужно создать файл скрипта на одном из поддерживаемых ЯП (shell, python, ruby, js, java) и .job файл с описанием этой задачи. Вот примеры простейшего shell-скрипта hello_world.sh и файла с описанием задачи hello_world.job:

#!/bin/sh
echo "Hello, Shell!"
echo "taskId="$taskId", numTasks="$numTasks, "jobId="$jobId

{
    "script" : "/home/nobody/jobs/hello_world.sh",
    "language" : "shell",
    "send_script" : true,
    "priority" : 4,
    "job_timeout" : 120,
    "queue_timeout" : 60,
    "task_timeout" : 15,
    "max_failed_nodes" : 10,
    "num_execution" : -1,
    "exec_unit_type" : "host"
    "max_cluster_instances" : -1,
    "max_worker_instances" : 1,
    "exclusive" : false,
    "no_reschedule" : false
}

Job файл представляет собой множество пар ключ-значение в формате JSON. Описание некоторых из свойств задачи:
  • script — путь к файлу скрипта. Если файл находится в файловой системе мастера, тогда при установленном значении «send_script» файл скрипта будет отправляться каждому из воркеров на исполнение.
  • language — язык скрипта. Определяет, какой интерпретатор (python, ruby, js, java, shell) использовать для выполнения скрипта.
  • priority — целочисленное значение приоритета задачи. Чем меньше значение, тем выше приоритет задачи.
  • job_timeout — значение таймаута для выполнения задачи на всем кластере от момента первого исполнения на одном из воркеров до последнего запланированного исполнения.
  • queue_timeout — максимально допустимое время ожидания в очереди задач.
  • task_timeout — значение таймаута для одной запущенной задачи на воркере.
  • max_failed_nodes — максимальное количество задач, исполнение которых может завершиться с ошибкой и/или машин воркеров, ставших недоступными во время исполнения этой задачи. При достижении этого лимита мастер останавливает исполнение всех экземпляров этой задачи на кластере.
  • num_execution — число запланированных запусков экземпляров задачи (тасков). Если значение отрицательное и значение exec_unit_type==host, тогда число тасков будет равняться числу всех доступных воркеров в кластере.
  • max_cluster_instances — лимит одновременно запущенного числа тасков на всем кластере.
  • max_worker_instances — лимит одновременно запущенного числа тасков на одной машине воркера.
  • exclusive — если установлено в true, то на время исполнения таска на воркере другие задачи не будут планироваться к исполнению на том же воркере одновременно с этой задачей.

Есть и другие свойства в описании задачи: расписание как в crontab, уникальное имя задачи, черный и белый список воркеров и другие.
В терминологии prun задача (job) условно разбивается на множество тасков (task), где количество тасков определяет количество запусков скрипта этой задачи в кластере. Каждый таск однозначно определяется парой (job_id, task_id), при этом у каждой задачи свой уникальный job_id, а значение task_id лежит в полуинтервале [0, num_execution). Последнее свойство можно использовать, чтобы понять какую часть задачи нужно выполнять, что аналогично рангу процесса в терминах MPI.

Запуск задачи, а точнее постановка задачи в очередь, можно произвести командой:
./prun -c "run /home/nobody/jobs/hello_world.job"

Если воркер запущен в терминале не в режиме демона, тогда в результате запуска задачи в стандартный вывод напечатается:
Hello, shell!
taskId=0 numTasks=1 jobId=0

Конфигурация мастера


Каждый воркер относится к одной и только одной группе воркеров. Файл groups (/etc/pmaster/groups) содержит список групп воркеров, где название группы должно совпадать с именем файла, который содержит список адресов хостов, на которых запущены воркеры.

К примеру, если имеется 8 машин, которые нужно разместить в две различные группы, тогда следует создать 2 файла (my_group1 и my_group2) и записать в них названия хостов (или ip'шники). Затем записать в файл groups две строчки my_group1 и my_group2.

Также группы можно создавать, удалять, а также добавлять и удалять воркеры из группы в любой момент после запуска мастера.

Конфиги мастера и воркера находятся в файлах master.cfg (/etc/pmaster/master.cfg) и worker.cfg (/etc/pworker/worker.cfg) соответственно.

Пример создания cron задачи


Процесс создания задачи с исполнением по расписанию не отличается от описанного выше примера «Hello, world!». Для этого обязательно нужно добавить параметры name и cron. Имя задачи должно быть уникальным, поскольку служит идентификатором cron задачи. Формат описания расписания соответствует формату из crontab (man 5 crontab). К примеру, чтобы задача запускалась каждое воскресенье в 3 часа после полуночи, значением ключа cron должно быть "* 3 * * 0".

Пример задачи с расписанием можно посмотреть в jobs/cron.job в репозитории проекта. Из описания этой задачи следует, что она будет запускаться на каждом воркере кластера ровно один раз каждую минуту.

Пример создания метазадачи, представленной DAG-графом зависимостей между задачами


Иногда возникает требование выполнять некоторую задачу A только после выполнения одной или нескольких других задач (скажем, B и C). Такие зависимости образуют ориентированный граф без циклов. В терминологии prun такие задачи называются метазадачами или группой задач, а граф зависимостей между задачами описывается в .meta файле. Зависимости между задачами описываются в .meta файле списком смежности. Пример метазадачи, которая должна выполняться каждую минуту по расписанию, можно посмотреть в jobs/cron.meta в репозитории проекта.

Управление исполнением задач


Кроме добавления задачи с помощью команды run, есть следующий набор команд:
  • stop <job_id> — остановка задачи с уникальным идентификатором job_id.
  • stop <job_name> — остановка задачи по ее уникальному имени.
  • stopg <group_id> — остановка группы задач (метазадачи).
  • stopall — остановка всех задач в кластере и очистка очередей ожидающих задач.
  • add [<host_name> <group_name>]* — добавление воркера к группе воркеров.
  • delete <host_name> — удаление воркера с принудительным завершением всех запущенных на нем задач.
  • addg <path_to_group_file> — добавление группы воркеров.
  • deleteg <group_name> — удаление группы воркеров.
  • info <job_id> — отображает статистику исполнения по конкретной задаче.
  • stat — общая статистика по всему кластеру.
  • ls — краткая статистика по каждому из воркеров.
  • cron — показывает запланированные cron задачи.

Отказоустойчивость


Мастер периодически рассылает heartbeat сообщения всем воркерам. Если воркер не ответил на сообщение определенное количество раз, то он считается недоступным, и все выполняемые на нем задачи планируются на другие доступные воркеры. Аналогично задачи, которые вернули код ошибки или аварийно завершились, планируются на другие доступные воркеры.

Мастер может сохранять свое состояние по выполняемым и поставленным в очередь задачам во внешнюю БД. Сейчас существует возможность сохранять состояние в одну из следующих БД: LevelDB, Cassandra, Elliptics. В случае перезапуска мастера, мастер восстанавливает все поставленные в очередь задачи и задачи, которые не успели полностью завершиться на момент предыдущей остановки мастера, а также останавливает все ранее запущенные задачи на воркерах.

Заключение


Основными требованиями на этапе разработки prun были: высокая производительность, минимальное количество зависимостей от внешних библиотек, удобство для ежедневного использования и надежность всех компонент. Prun написан на C++ и единственной необходимой зависимостью является библиотека Boost.

Должен упомянуть об использовании prun в реальных задачах. Сейчас prun используется в организации нагрузочного тестирования на небольшом кластере из 20 хостов. Суть задачи нагрузочного тестирования заключается в развертывании приложения нагрузочного тестирования, его настройке и запуске тестов по расписанию одновременно на всем кластере.

Максимальный кластер, на котором проверялась работа планировщика, состоял из 140 хостов в AWS, причем хосты были различной мощности (несколько large инстансов, остальные — micro).

В дальнейшем планируется вести статистику загруженности воркеров по многим параметрам в реальном времени для оптимизации планирования задач мастером, поскольку на данный момент используется всего 2 статистики: отношение количества запущенных задач к числу ядер/процессоров и общий объем RAM. Есть множество направлений для дальнейшего развития планировщика.

Напоминаю, что предоставленный в этой статье проект открыт и расположен на GitHub: github.com/abudnik/prun

Комментарии (9)


  1. bebop
    24.08.2015 16:05

    А почему не подошли уже существующие планировщики: CONDOR, DIET итп?

    Вот, например, хорошая
    дискуссия на серверфолт на эту тему.

    UPD: извиняюсь — в статье есть объяснение почему не подошли.


  1. youROCK
    24.08.2015 18:18

    Мастер периодически рассылает heartbeat сообщения всем воркерам. Если воркер не ответил на сообщение определенное количество раз, то он считается недоступным, и все выполняемые на нем задачи планируются на другие доступные воркеры. Аналогично задачи, которые вернули код ошибки или аварийно завершились, планируются на другие доступные воркеры.


    Представьте себе ситуацию, когда вы на время (например, в течение нескольких минут) исчерпали все доступные ресурсы по CPU или по сети в кластере, и ваши воркеры начали таймаутиться. После этого, если я правильно понял алгоритм, все задания будут запускаться во втором, третьем и т.д. экземплярах на всё ещё живых нодах, пока их тоже не вырубит?

    Такой сценарий, насколько мне известно, называется каскадным отключением, и далеко не все системы проектируются с его учётом.


    1. abudnik
      24.08.2015 19:12
      +1

      Можно воспользоваться ручкой max_failed_nodes для ограничения длины такой эстафеты затаймаученых задач.

      Если пытаться решать эту проблему, тогда нужно динамически менять таймаут в сторону увеличения. Для этого нужна обратная связь от запущенного процесса. Как понять процесс завис в infinite loop или уперся в железо? А если увеличить таймаут, то возникает вопрос: нужно ли уменьшать таймаут после того, когда ресурсы освободятся? Не совсем ясно как решать проблему с каскадным отключением автоматически в общем случае.


      1. youROCK
        24.08.2015 19:44

        В нашей системе (которую мы, в отдаленной перспективе, планировали тоже заопенсорсить) мы не перезапускаем процесс, пока не вышло максимальное время его исполнения. Ну и также, если хост живой, мы периодически сверяем список запущенных процессов на сервере и список запущенных задач в планировщике. У нас есть отдельный механизм, который гарантирует, что задание будет убито спустя указанный промежуток времени, и основывается он на системном вызове alarm() и на запрете перехвата сигналов процессом. Наши приложения представляют из себя PHP, поэтому это работает. Для других языков, я думаю, можно придумать что-нибудь похожее.

        На самом деле, проблема очень сложная для решения, потому что фактически невозможно отличить недоступность хоста и недоступность, к примеру, сети до хоста. То есть, практически невозможно убедиться, что 2 экземпляра программы не запущено одновременно. Для нас это было критично, поэтому мы используем описанное выше решение.


        1. bebop
          24.08.2015 20:10

          У нас есть отдельный механизм, который гарантирует, что задание будет убито спустя указанный промежуток времени, и основывается он на системном вызове alarm() и на запрете перехвата сигналов процессом.

          А когда вы убиваете процесс, что вы делаете с дочерними процессами, которые мог породить убиваемый процесс? Или в unix'е они самоубьются? Боролся с этим на windows и никакой приличной стратегии не нашёл, кроме как не порождать дочерних процессов из процесса под шедулером :)


          1. youROCK
            24.08.2015 20:19

            Все верно — у нас запрещен fork(). То есть, если нужно запустить еще процессов, то разработчики должны использовать API по добавлению новых заданий, которые, при правильных настройках, могут попасть и на тот же хост.


  1. maxxcny
    26.08.2015 15:10

    А есть ли информация по производительности описанного планировщика?
    Сколько job'ов он может одновременно обрабатывать/отслеживать?


    1. abudnik
      26.08.2015 16:00

      На воркере каждый запуск задачи порождает процесс. Пара fork+exec занимает порядка 10 мс. Т.е. приблизительно 100 запусков hello-world задачи с одного ядра. Такие ограничения для воркера.

      Есть синтетический нагрузочный тест ptest_load, который проверяет отдельно алгоритм планирования мастера. Этот тест симулирует планирование 100k задач на 10k воркеров. Это однопоточный тест и на моем железе планировалось где-то 4.5k задач в секунду.


  1. abudnik
    26.08.2015 15:59

    del