Недавно передо мной встала задача — есть 250 тестов, каждый из которых занимает от 5 до 30 минут, а иногда и часы. Надо запустить их в параллель на одной машине, но не больше 16 одновременно. Ограничение связано с некоторым лимитированным ресурсом, а также количеством ядер CPU. Реальное число одновременно запущенных задач нужно вычислять динамически, то есть пойти простым путем и зарегистрировать имя или лэйбл ресурса заранее с помощью плагина Lockable Resource не получится.
В поисках решения я прошерстил интернет и нашел несколько вариантов, но… Они не понравились мне плохой утилизацией ресурсов или невыразительной визуализацией пайплайна. И я написал свой вариант, точнее, несколько. Ниже я расскажу об известных мне подходах, кратко опишу минусы и плюсы каждого, поделюсь своим решением и тем, как пришел к нему. Возможно, вы наткнетесь на этот текст, решая похожую задачу. Мне бы он помог.
Выполнение задач «группами» (и почему его можно отбросить сразу)
Первое, что приходит на ум и легко гуглится — разделить тесты на группы по N штук в каждой и запускать эти группы в цикле через parallel. Однако этот подход уместен только когда задачи выполняются за примерно одинаковое количество времени, чтобы ресурсы были постоянно загружены.
Возможно, код в статьи покажется кому-то не совсем Groovysh.
Я больше Python-программист и в основном использую циклы, а функции наподобие each или collect применяю только там, где тело простое и помещается в 1 строчку.
Код решения:
node {
List<Closure> groups = []
Integer groupsCount = 3
Integer taskCount = 12
(1..groupsCount).each { groups.add([:]) }
for (int i in 1..taskCount) {
groups[i % groupsCount][i] = runTest("${i}")
}
for (group in groups) {
parallel group
}
}
def runTest(name) {
return {
stage(name) {
println(name)
}
}
}
В нашем же случае, напомню, тест может длиться от нескольких минут до нескольких часов. Решение не подходит.
Плюсы:
Легкая реализация
Не требует плагинов
Минусы:
Плохая утилизация ресурсов. Следующая группа тестов ждет, когда завершится последний тест из предыдущей группы. При этом чаще всего тесты выполняются за разное количество времени.
Ищем дальше!
Мой первый подход: скрипт на Python с использованием Queue и threading
Следующей в голову пришла мысль использовать Python-скрипт: создать с помощью threading нужное количество воркеров, а они будут брать задачи из очереди Queue. Помимо Python, подход потребует дополнительного репозитория со скриптом или дополнительной функции обёртки в Jenkins shared library, где можно разместить и сам скрипт.
Вот как выглядит код подобного решения через функцию в Jenkins shared library, которая выкачивает скрипт и запускает его с необходимыми параметрами:
runTasksInParallel.groovy:
def call(String testSuitePath, Integer threads) {
String scripDir = "${env.WORKSPACE}/runTestSuite"
String scriptName = "run_test_suite_in_parallel.py"
if(! fileExists("${scripDir}/${scriptName}")) {
sh(label: "create script directory", script: "mkdir -p ${scripDir}")
writeFile(file: "${scripDir}/${scriptName}", text: libraryResource(scriptName))
}
dir(scripDir) {
try {
String output = sh(
label: "running script via python",
script: """
python3 ${scriptName} \
--test-suite "${env.WORKSPACE}/${testSuitePath}" \
--threads ${threads} \
2>&1
""",
returnStdout: true
)
return output
} finally {
deleteDir()
}
}
}
run_test_suite_in_parallel.py:
import logging
import argparse
import queue
import threading
from pathlib import Path
def run_test(q: object):
while True:
test_path = q.get()
try:
print(f"Running test from {test_path}")
except Exception as err:
logging.error(err, stack_info=True)
finally:
q.task_done()
if __name__ == "__main__":
parser = argparse.ArgumentParser(description="Script for running tests in parallel")
parser.add_argument("--test-suite", dest="test_suite", action="store", required=True,
help="path to test suite directory")
parser.add_argument("--threads", dest="threads", action="store", required=True, type=int,
help="limit of concurrent threads")
args = parser.parse_args()
q = queue.Queue()
for _ in range(args.threads):
worker = threading.Thread(target=run_test,
args=(q),
daemon=True)
worker.start()
test_suite = list(Path(args.test_suite).glob("**/*.test"))
for test in test_suite:
q.put(test)
q.join()
Плюсы:
Легкая реализация, не требующая какой-то определённой версии Python или нестандартных модулей.
Хорошая утилизация ресурсов. Как только воркер заканчивает выполнять одну задачу, он сразу получает следующую.
Минусы:
Плохая визуализация. Всё скрыто внутри одного стэйджа. Прогресс выполнения задач никак не визуализируется, остается разве что читать вывод питоновского скрипта сплошным текстом.
Часть кода отделена от пайплайна и даже может находиться в других репозиториях.
Основное, что не устроило меня в этом подходе — визуализация. Хотелось видеть, какие задачи выполняются в текущий момент времени, какие прошли, а какие упали. Понимать, во сколько потоков выполняется пайплайн. И чтобы логи находились в разных стэйджах.
Я попробовал улучшить визуализацию, генерируя фэйковые стэйджи после выполнения скрипта. И это привело меня к решению без Python-скрипта, о котором я расскажу ниже. Но параллельно…
Использование семафора из плагина Concurrent Step (но оно сомнительно для прода)
Параллельно с этим я нашел плагин, который позволяет использовать семафоры в пайплайне. Решение заключается в следующем. Вначале мы создаем семафор с ограниченным количеством допусков. И запускаем через parallel наш список задач. А в каждой задаче проверяем семафор.
@groovy.transform.Field semaphore = null
node {
semaphore = createSemaphore(permit: 3)
Map tasks = [:]
Integer taskCount = 12
for (int i in 1..taskCount) {
tasks[i] = runTest("${i}")
}
parallel tasks
}
def runTest(name) {
return {
stage(name) {
acquireSemaphore (semaphore) {
println(name)
}
}
}
}
Плюсы:
Легкая и понятная реализация. Семафоры присутствуют во многих языках и многим известны.
Хорошая утилизация ресурсов. Как только семафор разблокируется, сразу стартует следующая задача.
Минусы:
Сам плагин давно не обновлялся и есть вероятность, что его перестали поддерживать: в случае конфликта с другими плагинами в будущем он может не иметь обновления.
Также коллеги скинули комментарий от одного из мейнтейнеров Jenkins
Jesse Glick: From a brief glance at https://github.com/jenkinsci/concurrent-step-plugin I would say that it is designed incorrectly (confuses “native” Java threads with “virtual” CPS VM threads) and should not be used. Most or all of its steps probably could be reimplemented correctly while using the same Pipeline script interface.
Плохая визуализация. Все задачи как будто выполняются одновременно. Мы получаем длинный вертикальный столбик стэйджей в графе и не понимаем, во сколько потоков всё это выполняется и сколько ресурсов заняло.
Мой второй подход на основе опыта с Python-скриптом (и с замыканиями)
Генерируя фэйковые стэйджи для улучшения визуализации первого подхода, я понял, что с parallel можно попробовать тот же подход с очередью и воркерами, что я применял в Python-скрипте. Для этого помимо списка задач выполнения тестов, создадим ещё список воркеров, которые представляют из себя тоже задачи. Но их задачей будет не выполнение тестов, а вытаскивание из очереди задач с тестами и их запуск. Список воркеров как раз и будем передавать parallel. Для потокобезопасной работы со списком задач я решил использовать плагин Lockable Resource (он уже был у нас установлен), при этом никаких ресурсов заранее создавать не нужно.
Пример решения в виде функции в Jenkins shared library:
limitedParallel.groovy:
def call(Map params) {
Integer workerCount = params.workerCount ?: 1
String resourceName = params.resourceName ?: "cores"
List<Closure> queue = params.tasks ? params.tasks.clone() : []
Boolean failFast = params.failFast ?: false
Map workers = ["failFast": failFast]
for(int i in (0..workersCount-1).toList()) { // toList to make function serializable
workers["${resourceName}_${i}"] = getTaskFromQueue(queue)
}
parallel workers
}
def getTaskFromQueue(List<Closure> queue) {
return {
Closure task = null
while(true) {
lock("Thread safe ${env.BUILD_TAG}") { // Resource name should be unique among multiple concurrent job runs
task = queue ? queue.pop() : null
}
if (task) {
task()
} else {
break // queue is empty
}
}
}
}
Плюсы:
Хорошая визуализация. Каждый стэйдж соответствует отдельной задаче. Наглядно видно, во сколько потоков выполняется пайплайн, какие задачи упали, какие прошли, и какие задачи идут в данный момент.
Хорошая утилизация ресурсов. Как только заканчивается одна задача, сразу же начинается следующая, и так пока очередь не иссякнет.
Минусы:
Если количество задач равно количеству потоков или меньше него, граф в плагине BlueOcean не отображает имена задачек, их место занимают имена потоков.
На этом подходе я и остановился. И хочу поделиться примером его использования.
Чтобы было видно, как тесты выполняются за разное количество времени, а разные потоки успевают вытащить из очереди разное количество задачек, я добавил sleep на время от 1 до 10 секунд.
node {
List<Closure> tasks = []
Integer taskCount = 12
for (int i in 1..taskCount) {
tasks.add(runTest("${i}"))
}
limitedParallel(tasks: tasks, workersCount: 4, resourceName: "core", failFast: true)
}
def runTest(String name) {
return {
stage(name) {
Integer t = Math.abs( new Random().nextInt() % (10 - 1) ) + 1
sh("sleep ${t}")
}
}
}
Также эту идею можно реализовать через колбэк-функцию вместо заранее определенной очереди. Но мне больше нравится данное решение через очередь. Также для наглядности можно прикрутить счётчик, который будет показывать в поле "Описание" сколько задач прошло и сколько осталось.
p.s. Если вы видели или знаете другие варианты решения задачи при неизвестном заранее количестве ресурсов, делитесь в комментариях, это сделает материал полезнее!
mgromw
Допустим, задачи выполняются за сильно разное количество времени, тогда, если мы вдруг имеем решение с параллельностью и ограничением количества тредов, где задачи просто берутся из кучи в неотрегулированной последовательности, то всё равно, велика вероятность, что n-1 задач, будучи раскиданы по тредам, выполнятся примерно за одинаковое количество времени, а последняя окажется очень долгая и её надо будет ждать одну.
Мне кажется, если уж говорить о полном решении с параллельностью, то нужно заранее знать, за сколько единиц времени выполняется каждая задача из списка, и уже на основе этой информации раскидывать по тредам. Или, если пытаться в какую-то автоматизацию, то новый плагин (или доработка старого), который будет сам смотреть время выполнения стейджей в предыдущих запусках конкретной джобы и уже на этом строить наполнение потоков.
K_Leshik Автор
Согласен. Балансировка никакой здесь нет. Задачи берутся просто по очереди. На практике иногда так и получается, что в конце остаются 4-5 долгих задач, которые неплохо было бы запустить пораньше. Задача по балансировка есть, и скорее всего алгоритм будет улучшен в будущем. Но даже такое решение отлично подходит для решения нашей задачи.
mgromw
Достаточно простым расширением функции, мне кажется, выглядит - взять продолжительность (среднее или что-то другое) каждой из стейджей на основе N предыдущих билдов (вот тут пример, как можно сделать https://stackoverflow.com/questions/52322290/capture-time-taken-by-each-pipeline-stage-in-jenkins/64215781#64215781) и, уже на основе этих данных, раскидать по потокам.
Будут моменты, которые надо будет решить, как что делать с новыми стейджами или если какой-то стейдж только падал... Но тут достаточно, мне кажется, просто "придумать" правило, какое время ему давать для передачи в функцию распределения по тредам.