Этот пост является заключительным в моей мини-серии из трех постов о cppcoro. cppcoro — это библиотека абстракций корутин от Льюиса Бейкера (Lewis Baker). Сегодня я покажу вам пулы потоков (thread pools).

 

Чтобы получить максимум пользы от этого поста, вам следовало бы ознакомиться с двумя моими предыдущим постами про cppcoro.

В дополнение к функции cppcoro::sync_wait, которую можно использовать для ожидания указанный Awaitable завершается, cppcoro предлагает довольно интересную функцию cppcoro::when_all.

when_all

  • when_all: создает Awaitable, который ожидает все свои Input-Awaitable и возвращает совокупность их результатов.

Я упростил определение функции cpporo::when_all для этой статьи. Следующий пример должен помочь вам понять что к чему.

// cppcoroWhenAll.cpp

#include <chrono>
#include <iostream>
#include <thread>

#include <cppcoro/sync_wait.hpp>
#include <cppcoro/task.hpp>
#include <cppcoro/when_all.hpp>

using namespace std::chrono_literals;

cppcoro::task<std::string> getFirst() {
    std::this_thread::sleep_for(1s);                       // (3)
    co_return "First";
}

cppcoro::task<std::string> getSecond() {
     std::this_thread::sleep_for(1s);                      // (3)
    co_return "Second";
}

cppcoro::task<std::string> getThird() {
     std::this_thread::sleep_for(1s);                     // (3)
    co_return "Third";
}


cppcoro::task<> runAll() {
                                                          // (2)
    auto[fir, sec, thi] = co_await cppcoro::when_all(getFirst(), getSecond(), getThird());
    
    std::cout << fir << " " << sec << " " << thi << std::endl;
    
}

int main() {
    
    std::cout << std::endl;
    
    auto start = std::chrono::steady_clock::now();
    
    cppcoro::sync_wait(runAll());                          // (1)
    
    std::cout << std::endl;
    
    auto end = std::chrono::high_resolution_clock::now();
    std::chrono::duration<double> elapsed = end - start;   // (4)
    std::cout << "Execution time " << elapsed.count() << " seconds." << std::endl;
    
    std::cout << std::endl;

}

Задача верхнего уровня cppcoro::sync_wait(runAll()) (строка 1) ожидает Awaitable runAll. runAll ожидает Awaitable getFirst, getSecond и getThird (строка 2). Awaitable runAll, getFirst, getSecond и getThird являются корутирами. Каждая из функций get засыпает на одну секунду (строка 3). Три раза по одной секунде составляет три секунды. Это и будет временем, в течение которого вызов cppcoro::sync_wait(runAll()) ожидает корутины. Строка 4 отображает это время.

Теперь, когда мы познакомились с функцией cppcoro::when_all, позвольте мне добавить к ней пулы потоков.

static_thread_pool

  • static_thead_pool: управление работой пула потоков фиксированного размера.

cppcoro::static_thread_pool можно вызывать с числовым параметром или без. Это число обозначает количество созданных потоков. Если вы не укажете число, то вместо него будет использована функция C++11 std::thread::hardware_concurrency(). std::thread::hardware_concurrency предоставляет вам информацию о количестве аппаратных потоков, поддерживаемых вашей системой. Это может быть количество процессоров или ядер, которые находятся в вашем распоряжении.

Позвольте мне продемонстрировать это на практике. Следующий пример я вляется модификацией предыдущего — он одновременно выполняет корутины getFirst, getSecond и getThird .

// cppcoroWhenAllOnThreadPool.cpp

#include <chrono>
#include <iostream>
#include <thread>

#include <cppcoro/sync_wait.hpp>
#include <cppcoro/task.hpp>
#include <cppcoro/static_thread_pool.hpp>
#include <cppcoro/when_all.hpp>


using namespace std::chrono_literals;

cppcoro::task<std::string> getFirst() {
    std::this_thread::sleep_for(1s);
    co_return "First";
}

cppcoro::task<std::string> getSecond() {
    std::this_thread::sleep_for(1s);
    co_return "Second";
}

cppcoro::task<std::string> getThird() {
    std::this_thread::sleep_for(1s);
    co_return "Third";
}

template <typename Func>
cppcoro::task<std::string> runOnThreadPool(cppcoro::static_thread_pool& tp, Func func) {
    co_await tp.schedule();
    auto res = co_await func();
    co_return res;
}

cppcoro::task<> runAll(cppcoro::static_thread_pool& tp) {
    
    auto[fir, sec, thi] = co_await cppcoro::when_all(    // (3)
        runOnThreadPool(tp, getFirst),
        runOnThreadPool(tp, getSecond), 
        runOnThreadPool(tp, getThird));
    
    std::cout << fir << " " << sec << " " << thi << std::endl;
    
}
    
int main() {
    
    std::cout << std::endl;
    
    auto start = std::chrono::steady_clock::now();

    cppcoro::static_thread_pool tp;                         // (1)
    cppcoro::sync_wait(runAll(tp));                         // (2)
    
    std::cout << std::endl;
    
    auto end = std::chrono::high_resolution_clock::now();
    std::chrono::duration<double> elapsed = end - start;    // (4)
    std::cout << "Execution time " << elapsed.count() << " seconds." << std::endl;
    
    std::cout << std::endl;

}

Сейчас я объясню основные отличия от предыдущей программы cppcoroWhenAll.cpp. Я создал в строке (1) пул потоков tp и использовал его в качестве аргумента для функции runAll(tp) (строка 2). Функция runAll использует пул потоков для одновременного запуска корутин. Благодаря структурным привязкам (строка 3) значения каждой корутины можно легко агрегировать и присвоить переменной. В итоге функция main выполняется за одну, а не за три секунды, как раньше.

Возможно, вы знаете, что мы получаем с защелками и барьерами C++20. Защелки (latches) и барьеры (barrier) — это механизмы синхронизации потоков, которые позволяют некоторым потокам блокироваться до тех пор, пока счетчик не станет равным нулю. cppcoro также поддерживает защелки и барьеры.

async_latch

  • async_latch: позволяет заставить корутину асинхронно ожидать, пока счетчик не станет равным нулю

Следующая программа cppcoroLatch.cpp демонстрирует синхронизацию потоков с помощью cppcoro::async_latch.

// cppcoroLatch.cpp

#include <chrono>
#include <iostream>
#include <future>

#include <cppcoro/sync_wait.hpp>
#include <cppcoro/async_latch.hpp>
#include <cppcoro/task.hpp>

using namespace std::chrono_literals; 

cppcoro::task<> waitFor(cppcoro::async_latch& latch) {
    std::cout << "Before co_await" << std::endl;
    co_await latch;                              // (3)
    std::cout << "After co_await" << std::endl;
}

int main() {
    
    std::cout << std::endl;

    cppcoro::async_latch latch(3);              // (1)

                                                // (2)
    auto waiter = std::async([&latch]{ cppcoro::sync_wait(waitFor(latch)); }); 

    auto counter1 = std::async([&latch] {       // (2)
        std::this_thread::sleep_for(2s);
        std::cout << "counter1: latch.count_down() " << std::endl;
        latch.count_down();
    });
        
    auto counter2 = std::async([&latch] {      // (2)
        std::this_thread::sleep_for(1s);
        std::cout << "counter2: latch.count_down(2) " << std::endl;
        latch.count_down(2);
    });

    waiter.get(), counter1.get(), counter2.get();
    
    std::cout << std::endl;

}

В строке (1) я создаю cppcoro::asynch_latch и инициализирую счетчик равным 3. На этот раз я использую std::async (строка (2)) для одновременного запуска трех корутин. Каждый std::async получает latch для каждой ссылки. В строке (3) корутина waitFor ожидает, пока счетчик не станет равным нулю. Корутина counter1 засыпает на 2 секунды, а затем отсчитывает 1 событие. Напротив, counter2 засыпает на 1 секунду и отсчитает 2 события. На скриншоте показано это чередование потоков.

Что дальше?

На данный момент я успел написать о трех из большой четверки C++20: концептах, диапазонах и корутинах. В моем путешествии по большой четверке все еще отсутствуют модули. Они и станут темой моих следующих постов.

Кстати, если кто-то хочет написать пост о возможностях C++20, о которых я тоже планирую писать в будущем, свяжитесь со мной. Я с удовольствием опубликую ее и переведу на английский/немецкий язык, если это необходимо.

Программа наставничества

Моя новая программа наставничества "Fundamentals for C++ Professionals" стартует в апреле. Получить дополнительную информацию можно здесь.


Всех желающих приглашаем на открытый урок в OTUS «Умные указатели», на котором разберем, что такое умные указатели и зачем они нужны. Проведем обзор умных указателей входящих в stl, unique_ptr, Shared_ptr, weak_ptr.

Комментарии (1)


  1. kovserg
    03.03.2022 15:46

    Что дальше?

    А дальше OpenMP, SYCL, OpenCL, Cuda и ComputeShaders