Для кого статья?

Статья для тех, кто хочет разобраться в работе Thread Pool и написать наивную реализацию с использованием С++ 14 и С++ 17. Стоит упомянуть, что представленные реализации будут представлять решение учебной задачи и не подойдут для коммерческого использования.

Что нового я могу узнать из статьи?

  • Кто такой Thread Pool?

  • Зачем использовать Thread Pool?

  • Логика работы Thread Pool

  • Реализация (С++ 14)

  • Реализация (С++ 17)

  • Сравнение реализаций

Кто такой Thread Pool?

Это удобный в использовании паттерн, позволяющий выполнять множество задач используя ресурсы множества потоков. Thread Pool состоит обычно из очереди задач и нескольких потоков, которые достают задачи из очереди и выполняют их параллельно. Есть и реализации с отдельной очередью на каждый поток, но мы их рассматривать не будем.

Зачем использовать Thread Pool?

  • Помогает увеличить производительность программы благодаря созданию потоков один раз (создание потока считается достаточно тяжёлой операцией).

  • Предоставляет удобный интерфейс для работы с многопоточностью.

Немного цифр

Протестировав 3 случая: запуск без дополнительных потоков, с созданием потоков через std::thread и с использованием thread_pool. На программе:

void test_func(int& res, const std::vector<int>& arr) {
    res = 0;
    for (int i = arr.size() - 1; i >= 0; --i) {
        for (int j = 0; j < arr.size(); ++j) {
            res += arr[i] + arr[j];
        }
    }
}

Получились следующие результаты:

Способ запуска

Время (миллисекунды)

Кол-во потоков

Без дополнительных потоков

83954

1

std::thread

62386

6

thread_pool

52474

6

У меня на компьютере можно создать максимум 8 потоков и запускался тестовый пример в Visual Studio на платформе Windows. Это значит, что фоновая работа сторонних приложений может создавать флуктуации и при каждом запуске мы будем получать разные времена. Код примеров можно посмотреть ТУТ функция run_test .

Почему 6 потоков не ускорили код в 6 раз?

Дело в том, что Thread Pool в моей реализации сделан не оптимально т.к. использует condition_variable, что сильно замедляет работу, так же присутствуют средства синхронизации (нужно помнить про закон Амдала). По мимо вышеперечисленных фактов тестирование проводилось на Windows, что не исключает разделение общих ресурсов ПК с другими приложениями.

Хорошо, но тогда как Thread Pool работает?

Thread Pool имеет очередь задач, из которой каждый поток достаёт новую задачу при условии, что очередь не пуста и поток свободен. Для более детального описания давайте рассмотрим работу Thread Pool на примере:

Начальная стадия: все потоки свободны, а в очереди присутствует 5 задач.

Стадия 1: Каждый из потоков взял задачу на исполнение, при этом на практике первый поток не обязательно берёт первую задачу, это зависит от того, кто первый получит доступ к общему ресурсу - очереди. В очереди остались только 4 и 5 задача (чёрным цветом обозначены задачи, которые остались в очереди).

Стадия 2: На 3 секунде второй поток завершает выполнение 2 задачи и берёт первую свободную задачу из очереди (4 задачу). В очереди остаётся только 5 задача.

Стадия 3: Третий поток завершил задачу 3 и взял последнюю задачу из очереди (5 задачу). Очередь стала пустой, но наша программа не должна завершать работу, сначала следует дождаться выполнения всех задач.

Стадия 4: Первый поток завершил выполнение своей задачи и не берёт новых задач (т. к. очередь пуста). Может показаться, что если у нас нет задач в очереди, то следующие задачи поток уже не выполняет. На самом деле это не так и как только придёт новая задача, свободный поток (но мы не знаем какой именно) сразу начнёт её исполнение.

Стадия 5: Третий поток закончил выполнять задачу 5.

Стадия 6: Второй поток закончил исполнение 4 задачи.

Итог: Thread Pool выполнил 5 задач за 11 секунд.

Что будет уметь делать наш Thread Pool?

Мы уже разобрались с общим механизмом работы Thread Pool, теперь подумаем о его функциональности.

В нашем Thread Pool будет 2 типа задач: блокирующие (будут тормозить основной поток до тех пор, пока команда не будет исполнена) и не блокирующие (гарантированно исполняются очень быстро и не тормозят основной поток).

Thread Pool будет иметь следующий интерфейс:

  • init(num_threads) - метод, создающий массив из num_threads потоков. В нашей реализации в качестве данного метода будет выступать конструктор.

  • add_task(task_func, args) - не блокирующий метод добавления новой задачи. Принимает функцию task_func и аргументы данной функции args и возвращает task_id (уникальный номер задачи).

  • wait(task_id) - блокирующий метод, ожидающий выполнения задачи с указанным task_id. В данной реализации мы не будем сохранять результат работы функции (мы исправим данный недостаток чуть позже), при этом функция обязательно должна возвращать void.

  • wait_all() - блокирующий метод, дожидающийся завершения всех задач.

  • calculated(task_id) - не блокирующий метод, проверяющий была ли выполнена задача с номером task_id.

  • shutdown() - блокирующий метод, дожидающийся завершения всех задач и завершающий работу Thread Pool. Для корректного завершения работы программы будем использовать деструктор (хотя можно дополнительно добавить и метод).

Реализация базовой версии (C++ 14)

Рассмотрим переменные, которые будут использоваться в нашем классе:

// очередь задач - хранит функцию(задачу), которую нужно исполнить и номер задачи
std::queue<std::pair<std::future<void>, int64_t>> q; 

std::mutex q_mtx;
std::condition_variable q_cv;

// помещаем в данный контейнер исполненные задачи
std::unordered_set<int64_t> completed_task_ids; 

std::condition_variable completed_task_ids_cv;
std::mutex completed_task_ids_mtx;

std::vector<std::thread> threads;

// флаг завершения работы thread_pool
std::atomic<bool> quite{ false };

// переменная хранящая id который будет выдан следующей задаче
std::atomic<int64_t> last_idx = 0;

Очередь хранит std::future<void> - объект, который в будущем вернёт тип void, использование std::future позволяет не сразу вычислять функцию, а отложить вызов до нужного нам момента, также можно использовать и std::function<void()> (такой способ тоже допустим).

thread_pool(uint32_t num_threads) {
    threads.reserve(num_threads);
    for (uint32_t i = 0; i < num_threads; ++i) {
        threads.emplace_back(&thread_pool::run, this);
    }
}

В конструкторе мы создаём указанное число потоков и каждый из потоков запускает единственный приватный метод run.

void run() {
    while (!quite) {
        std::unique_lock<std::mutex> lock(q_mtx);
        
        // если есть задачи, то берём задачу, иначе - засыпаем
        // если мы зашли в деструктор, то quite будет true и мы не будем 
        // ждать завершения всех задач и выйдем из цикла
        q_cv.wait(lock, [this]()->bool { return !q.empty() || quite; });

        if (!q.empty()) {
            auto elem = std::move(q.front());
            q.pop();
            lock.unlock();

			// вычисляем объект типа std::future (вычисляем функцию) 
            elem.first.get();

            std::lock_guard<std::mutex> lock(completed_task_ids_mtx);
            
            // добавляем номер выполненой задачи в список завершённых
            completed_task_ids.insert(elem.second);

            // делаем notify, чтобы разбудить потоки
            completed_task_ids_cv.notify_all();
        }
    }
}

condition_variable на методе wait (q_cv) захватывает мьютекс, проверяет условие, если условие верно, то мы идём дальше по коду, иначе - засыпаем, отпускаем мьютекс и ждём вызов notify из метода добавления задач (когда приходит notify процедура повторяется - захватываем мьютекс и проверяем условие). Таким образом мы берём задачи до тех пор, пока они не кончатся, а когда кончатся и придёт новая задача мы разбудим поток.

template <typename Func, typename ...Args>
int64_t add_task(const Func& task_func, Args&&... args) {
    // получаем значение индекса для новой задачи
    int64_t task_idx = last_idx++;

    std::lock_guard<std::mutex> q_lock(q_mtx);
    q.emplace(std::async(std::launch::deferred, task_func, args...), task_idx);
    
    // делаем notify_one, чтобы проснулся один спящий поток (если такой есть)
    // в методе run
    q_cv.notify_one();
    return task_idx;
}

std::async(std::launch::deferred, task_func, args...) данная функция не смотря на название async ничего не делает асинхронно благодаря параметру  std::launch::deferred. Мы просто запоминаем аргументы функции, как в случае с std::bind . Отличаем является лишь то, bind не требует заполнять все аргументы функции, в отличает от std::async.

void wait(int64_t task_id) {
    std::unique_lock<std::mutex> lock(completed_task_ids_mtx);
    
    // ожидаем вызова notify в функции run (сработает после завершения задачи)
    completed_task_ids_cv.wait(lock, [this, task_id]()->bool {
        return completed_task_ids.find(task_id) != completed_task_ids.end(); 
    });
}

void wait_all() {
    std::unique_lock<std::mutex> lock(q_mtx);
    
    // ожидаем вызова notify в функции run (сработает после завершения задачи)
    completed_task_ids_cv.wait(lock, [this]()->bool {
        std::lock_guard<std::mutex> task_lock(completed_task_ids_mtx);
        return q.empty() && last_idx == completed_task_ids.size();
    });
}

Обратите внимание, что wait_all внутри wait использует ещё одну блокировку для очереди для проверки на пустоту (мы должны блокировать каждый разделяемый ресурс, чтобы избежать data race).

Так же обратите внимание, что std::lock_guard стоит там, где нет wait для мьютекса и не нужно делать unlock (std::unique_lock в остальных случаях). Если вы будите придерживаться данного правила, то программисты, смотрящие ваш код скажут вам спасибо.

bool calculated(int64_t task_id) {
    std::lock_guard<std::mutex> lock(completed_task_ids_mtx);
    if (completed_task_ids.find(task_id) != completed_task_ids.end()) {
        return true;
    }
    return false;
}

Неблокирующий метод проверки задачи на завершённость возвращает true если задача с данным task_id уже посчитана, иначе - false.

~thread_pool() {
    // можно добавить wait_all() если нужно дождаться всех задачь перед удалением
    quite = true;
    for (uint32_t i = 0; i < threads.size(); ++i) {
        q_cv.notify_all();
        threads[i].join();
    }
}

Если экземпляр класса thread_pool удаляется, то мы дожидаемся завершения всех потоков в деструкторе. При этом, если в очереди есть ещё задачи, то каждый поток выполнит ещё одну задачу и завершит работу (это поведение можно поменять и, например, дожидаться выполнения всех задач перед завершением).

Полный код данной реализации можно посмотреть ТУТ.

Пример работы с Thread Pool

void sum(int& ans, std::vector<int>& arr) {
    for (int i = 0; i < arr.size(); ++i) {
        ans += arr[i];
    }
}

int main() {
    thread_pool tp(3);
    std::vector<int> s1 = { 1, 2, 3 };
    int ans1 = 0;
    
    std::vector<int> s2 = { 4, 5 };
    int ans2 = 0;
    
    std::vector<int> s3 = { 8, 9, 10 };
    int ans3 = 0;
		
    // добавляем в thread_pool выполняться 3 задачи
    auto id1 = tp.add_task(sum, std::ref(ans1), std::ref(s1));
    auto id2 = tp.add_task(sum, std::ref(ans2), std::ref(s2));
    auto id3 = tp.add_task(sum, std::ref(ans3), std::ref(s3));

    if (tp.calculated(id1)) {
        // если результат уже посчитан, то просто выводим ответ
        std::cout << ans1 << std::endl;
    }
    else {
        // если результат ещё не готов, то ждём его
        tp.wait(id1);
        std::cout << ans1 << std::endl;
    }
    tp.wait_all();

    std::cout << ans2 << std::endl;
    std::cout << ans3 << std::endl;
    return 0;
}

Стоит обратить внимание на std::ref благодаря ему будет передана ссылка, а не копия объекта (это особенность передачи аргумента в std::future).

Тут приведён достаточно простой пример работы с Thread Pool. Давайте посмотрим на этот небольшой фрагмент кода и подумаем что можно улучшить.

Недостаток

Последствия

1

Функция обязательно должна быть void

Придётся менять сигнатуру функции, если она возвращала какое-то значение

2

Приходится хранить дополнительно переменную для ответа

Если нам понадобиться несколько значений из thread_pool, то придётся с собой таскать все эти переменные. А если нам нужно 100 значений и больше ... ?

К сожалению, у меня не получилось решить эти проблемы средствами C++ 14, но зато C++ 17 позволил избавиться от приведённых выше недостатков.

Улучшаем Thread Pool с помощью C++ 17

Чтобы улучшить нашу версию нужно сначала понять в чём была основная проблема, а проблема была в том, чтобы узнать тип возвращаемого значения функции и при этом суметь положить результат вычислений (может быть разный возвращаемый тип) в 1 объект и тут на помощь приходит std::any.

Теперь мы можем хранить в нашей очереди std::function<std::any()> (запись std::future<std::any>не валидна) . Именно так я и сделал в своей первой попытке и получил очень красивый код, который не сильно отличался от изначальной реализации, но тут я столкнулся с проблемой, что std::any не может быть типа void . Тогда я решил создать класс Task, который бы смог хранить в одном случае std::function<std::any()> а в другом std::function<void()>. Рассмотрим его конструктор:

template <typename FuncRetType, typename ...Args, typename ...FuncTypes>
Task(FuncRetType(*func)(FuncTypes...), Args&&... args) :
    is_void{ std::is_void_v<FuncRetType> } {

    if constexpr (std::is_void_v<FuncRetType>) {
        void_func = std::bind(func, args...);
        any_func = []()->int { return 0; };
    }
    else {
        void_func = []()->void {};
        any_func = std::bind(func, args...);
    }
}

Мы используем if constexpr для компиляции только одной ветки условия. Если мы будем использовать обычный if, то при получении функции возвращающей void компилятор попробует преобразовать void в std::any и таким образом мы получим ошибку преобразования типа, не смотря на то, что этот каст будет происходить в другой ветке условия.

Используем typename ...Args и typename ...FuncTypes, чтобы был возможен неявный каст между std::referense_wrapper и ссылочным типом, тогда нам в функциях не придётся в сигнатуре явно прописывать std::referense_wrapper.

any_func = []()->int { return 0; }; и void_func = []()->void {}; функции-заглушки. Они позволяют избавиться от лишнего условия при вычислении значения:

void operator() () {
    void_func();
    any_func_result = any_func();
}

has_result проверяет вернёт ли функция значение или нет, а get_result получит его.

bool has_result() {
    return !is_void;
}

std::any get_result() const {
    assert(!is_void);
    assert(any_func_result.has_value());
    return any_func_result;
}

Ещё один вспомогательный класс: TaskInfo:

enum class TaskStatus {
    in_q,
    completed
};

struct TaskInfo {
    TaskStatus status = TaskStatus::in_q;
    std::any result;
};

Данная структура хранит информацию о задаче: статус и возможный результат. Если структура будет возвращать void, то поле result останется незаполненным.

Рассмотрим приватные поля класса thread_pool

std::vector<std::thread> threads;

// очередь с парой задача, номер задачи
std::queue<std::pair<Task, uint64_t>> q;

std::mutex q_mtx;
std::condition_variable q_cv;

// Будем создавать ключ как только пришла новая задача и изменять её статус
// при завершении
std::unordered_map<uint64_t, TaskInfo> tasks_info;

std::condition_variable tasks_info_cv;
std::mutex tasks_info_mtx;

std::condition_variable wait_all_cv;

std::atomic<bool> quite{ false };
std::atomic<uint64_t> last_idx{ 0 };

// переменная считающая кол-во выполненых задач
std::atomic<uint64_t> cnt_completed_tasks{ 0 };

В отличие от прошлой реализации нам понадобиться переменная cnt_completed_tasks (в прошлой реализации у нас был отдельный контейнер для завершённых задач и кол-во завершённых задач мы получали по размеру этого контейнера), для подсчёта кол-ва завершённых задач. Эта переменная будет использоваться в функции wait_all для определения того, что все задачи завершились.

Так же отдельно рассмотрим 3 разных функции ожидания результата:

void wait(const uint64_t task_id) {
    std::unique_lock<std::mutex> lock(tasks_info_mtx);
    tasks_info_cv.wait(lock, [this, task_id]()->bool {
        return task_id < last_idx && tasks_info[task_id].status == TaskStatus::completed;
    });
}

std::any wait_result(const uint64_t task_id) {
    wait(task_id);
    return tasks_info[task_id].result;
}

template<class T>
void wait_result(const uint64_t task_id, T& value) {
    wait(task_id);
    value = std::any_cast<T>(tasks_info[task_id].result);
}
  • void wait(const uint64_t task_id) - используется для ожидании задачи, которая возвращает void.

  • std::any wait_result(const uint64_t task_id) и void wait_result(const uint64_t task_id, T& value) разными способами возвращают результат.

std::any wait_result(const uint64_t task_id) вернёт std::any и пользователь сам должен будет сделать cast к нужному типу. Шаблонная функция void wait_result(const uint64_t task_id, T& value) принимает вторым аргументом ссылку на переменную, куда и будет положено новое значение и явный cast пользователь не должен будет делать.

В остальном код очень похож на предыдущую версию и код новой версии вы можете найти ТУТ.

Использование Thread Pool (С++ 17)

int int_sum(int a, int b) {
    return a + b;
}

void void_sum(int& c, int a, int b) {
    c = a + b;
}

void void_without_argument() {
    std::cout << "It's OK!" << std::endl;
}

int main() {
    thread_pool t(3);
    int c;
    t.add_task(int_sum, 2, 3);               // id = 0
    t.add_task(void_sum, std::ref(c), 4, 6); // id = 1
    t.add_task(void_without_argument);       // id = 2

    {
        // variant 1
        int res;
        t.wait_result(0, res);
        std::cout << res << std::endl;

        // variant 2
        std::cout << std::any_cast<int>(t.wait_result(0)) << std::endl;
    }

    t.wait(1);
    std::cout << c << std::endl;

    t.wait_all(); // waiting for task with id 2

    return 0;
}

В данном примере рассмотрены 2 способа получения значения через функцию wait_result. Мне лично больше нравится 2 вариант. Не смотря на то, что нужно делать каст, получается компактное решение, а так же можно поймать и отработать исключение в случае ошибки.

У нас действительно получилась версия лучше предыдущей?

И да, и нет. После анализа я получил следующие результаты:

Тип передаваемого аргумента при создании новой задачи

thread_pool c++ 14

thread_pool c++ 17

функция возвращающая void

+

+

функция возвращающая всё кроме void

+

-

std::bind

-

+

функтор

-

+

Пример с функтором и std::bind:

class Test {
public:
    void operator() () {
        std::cout << "Working with functors!\n";
    }
};

void sum(int a, int b) {
    std::cout << a + b << std::endl;
}

int main() {
    Test test;
    auto res = std::bind(sum, 2, 3);

    thread_pool t(3); // C++ 14
    t.add_task(test);
    t.add_task(res);
    t.wait_all();

    return 0;
}

А почему не получилось сделать лучше?

Изначально задумывалось реализовать Thread Pool, который сам сможет определять тип возвращаемого значения и исходя из этого типа формировать объект Task, но тип возвращаемого значения std::bind нельзя явно получить через std::invoke_result, поэтому пришлось пойти на некоторые уступки.

Итог

Мы получили 2 разные версии thread_pool. Сложно сказать какая из них лучше. Мне лично больше нравится версия с C++ 17. Она позволяет не таскать за собой много переменных как ссылки на результат, а хранит всё внутри себя. Да, эта версия уступает по функциональности, но использование функторов и std::bind не частая практика, поэтому именно это вариант я и считаю лучшим.

Комментарии (6)


  1. Sixshaman
    20.03.2022 10:29
    +3

    Если захочется оптимизировать пул потоков, то очень советую посмотреть видео-лекцию. В ней рассмотрены и очереди под каждый поток, и task stealing.


    1. FatMR Автор
      20.03.2022 11:47

      Спасибо за совет. Обязательно посмотрю


  1. tzlom
    20.03.2022 11:40
    +3

    Замерять скорость пула потоков задачей которая упирается в память- плохая идея. Мне кажется если переписать вашу задачу в однонаправленный обход массива то производительность будет еще выше. Конечно надо еще смотреть на то, что генерирует компилятор, возможно нужны будут какие нибудь флаги для включения AVX


    1. FatMR Автор
      20.03.2022 11:51

      Я пробовал однонаправленный проход, но там задачи выполнялись слишком быстро и thread pool не давал прироста производительности или он был очень маленьким. Хотелось привести более наглядный пример, но при этом не слишком сложный. Я планирую в следующей статье более подробно с этим разобраться и придумать более наглядный пример


      1. tzlom
        20.03.2022 11:58
        +3

        Берите любой алгоритм упирающийся в процессор а не память. Рекуррентные формулы например, там операнды влазят в регистры, так что память вообще не используется. Тогда увидите свой кратный прирост скорости.


        1. FatMR Автор
          20.03.2022 12:04

          Спасибо, попробую