Единый фреймворк асинхронности и параллелизма, возможность делить вычислительные ресурсы разными кодовыми базами без сложной интеграции, простота использования и гибкость - все это то, чего в C++ никогда не было.

В конце июня стандартный комитет одобрил включение std::execution (P2300) в C++26. Это пропозал, который призван решить вышеуказанные проблемы. Давайте разбираться!

Начинаем издалека

Представьте ситуацию - вы разрабатывается сервис, который перекладывает жсоны делает какие-либо полезные вычисления. Например, это может быть компрессия картинок - относительно тяжелая операция с точки зрения потребления CPU. В ядре вашего сервиса находится thread pool, на котором выполняются все вычисления. В какой-то момент вы подверглись АИ хайпу и решили добавить классификацию объекта на картинке в этот же сервис. Для этого вы подключили стороннюю библиотеку инференса (прогон новых данных через обученную модель) к проекту. Начав тестировать новую версию сервиса, вы заметили серьезную просадку производительности, даже с поправкой на то, что теперь выполняется больше работы. Разобравшись, оказалось, что в новой библиотеке есть свой thread pool, и теперь при работе сервиса у вас постоянная борьба за ограниченное количество ядер CPU, контекст свитчи и cache thrashing.

Какие есть варианты решения проблемы

(Для простоты примера считаем, что каждая отдельная работа компрессии и инференса выполняется в 1 потоке)

  1. Уменьшить размеры обоих тред пулов, чтобы не было борьбы за ядра - решает проблему борьбы за ресурсы, но ядра будут простаивать часть времени, например когда есть задачи на инференс, но нет задач на компрессию. Так же этот вариант подразумевает, что автор библиотеки инференса продумал интерфейс доступа к внутреннему тред пулу и его конфигурации, что бывает далеко не всегда.

  2. Интегрировать распределение работы нашего сервиса с библиотекой таким образом, чтобы использовался общий тред пул - тогда мы получим оптимальную утилизацию ресурсов. Однако скорее всего ни наш сервис, ни библиотека не готова к подобной интеграции, ведь нету единого стандартного фреймворка (или хотя бы набора паттернов), которому все следует. На практике подобная интеграция потребует, как минимум, дорогого рефакторинга сервиса, а вполне вероятно и форка с рефакторингом сторонней библиотеки. Не стоит забывать, что ваши проекты могут использовать более 1 библиотеки, и с каждой из них интеграция будет чем-то отличаться. Все это очень дорого и долго. А если начать думать о таких вещах, как ранняя отмена выполнения, то становится еще хуже.

Если автор статьи не забыл (или намеренно не утаил!) какую-то очевидную более хорошую альтернативу, то оба варианта неприятные.

И что же делать?

std::execution предлагает стандартную модель асинхронности, которая закрывает вопросы совместимости асинхронных интерфейсов, деления вычислительных ресурсов, а так же предоставляет набор базовых алгоритмов, позволяющих строить сложные асинхронные вычислительные графы.

В нашем примере, если автор сторонней библиотеки предоставляет интерфейс, совместимый с std::execution, то интеграция вычислительных ресурсов будет практически бесплатной.

Базовый пример использования

Пробуйте на годболте

(Код взят из пропозала, но немного подправлен, чтобы работать на библиотеке stdexec - это одна из имплементаций пропозала; все кроме thread_pool так же как в стандарте)

scheduler auto sch = thread_pool.get_scheduler();

sender auto begin = schedule(sch);
sender auto hi = then(begin, []{ 
    std::cout << "Hello world! Have an int.";
    return 13;
});
sender auto add_42 = then(hi, [](int arg) { return arg + 42; });

auto [i] = sync_wait(add_42).value();

Происходит тут следующее:

  1. Запрашивается scheduler (планировщик), ассоциированный с определенным вычислительным ресурсом - в данном случае ресурс это обычный тред пул. Тред пул не является частью стандарта, в отличии от интерфейса scheduler. Авторам тред пулов придется писать свои планировщики.

  2. schedule(scheduler) - эта функция вернет sender (отсюда неформальное название пропозала - Senders / Receivers) - единицу асинхронной работы, с которого можно начать строить вычислительный граф, исполняемый на ресурсе, ассоциированном с переданным планировщиком. Подробнее про sender будет далее.

  3. then - принимает sender и функтор. Когда переданный sender завершит выполнение, с его результатом будет вызван переданный функтор.

  4. then, да и остальные фабрики sender-ов, можно компоновать друг с другом в любом порядке, главное чтобы совпадали типы входных и выходных данных. В нашем примере, когда будет выполнен 'hi', его результат - 13 - будет передан в add_42, который в свою очередь вернет (13 + 42).

  5. Наверное самое важное, что надо понять в этом примере, это то что никакая работа не выполняется, пока граф не будет явно запущен. Вызовы schedule и then только задают структуру графа. Чтобы работа начала подаваться в планировщик, ее нужно инициировать.

  6. sync_wait - запуск асинхронного графа, синхронное ожидание его выполнения (да, смысла это не имеет в таком виде, но для примера удобно) и считывание результата.

Ключевые абстракции

Вычислительный ресурс - нечто, на чем можно запускать вычисления. Например тред пул, GPU, или просто текущий поток.

Scheduler - по сути стандартизированный интерфейс для доступа к какому-либо вычислительному ресурсу. Планировщик так же может содержать какую-либо стратегию распределения работы. Например, стратегией может быть наличие affinity треда пушащего работу с каким-либо тредом внутри тредпула.

Sender - объект, описывающий какие-либо вычисления. Думаю, большинству (включая меня) не очевиден выбор названия. Идея в том, что sender "отправляет" значения, полученные по результатам вычисления. Сендеры можно компоновать и строить из них вычислительные графы. Сендер может быть одно- и многоразовым, его можно форкать (подключать более одного сендера) и даже отменять выполнение!

Receiver - колбэк, который поддерживает 3 варианта вызова: успешный с результатом операции, неудачный с информацией об ошибке, или раннего завершения, что может произойти при отмене выполнения операции. С ресиверами взаимодействовать разработчику не обязательно - это для авторов библиотек.

Отмена выполнения

Возвращаемся к примеру. Наш сервис и библиотека были переписан на std::execution с достижением хорошей утилизации вычислительных ресурсов. Однако в процессе эксплуатации мы заметили, что иногда сохранение ужатой картинки в удаленное хранилище объектов зависает. Для того, чтобы проблемы с сетью не останавливали остальную работу, решено добавить отмену ожидания запроса по таймауту.

//  Старый вариант
sender auto result = 
    just(image)
    | store_to_s3();

// С таймаутом
sender auto result = 
    just(image)
    | timeout(
        store_to_s3(),
        5s);

Если store_to_s3 не отправит результат в течении 5 секунд, то его выполнение будет отменено. timeout не является частью стандарта. Суть не в конкретной имплементации, а в том, что интерфейс остановки асинхронной работы теперь стандартизирован. Больше не надо будет прокидывать глобальные переменные вида "bool stop_running", или колбэки - все нужные механизмы уже продуманы, единственное что надо сделать, это корректно обрабатывать запрос на отмену в конкретном алгоритме (сендере).

just - это сендер, отправляющий переданное ему в конструкторе значение.

Взаимодействие с корутинами

All (почти) awaitables are senders - иначе говоря, мы можем использовать корутины в алгоритмах, работающих с сендерами:

task<void> some_coro(int);

// when_all - сендер, конкурентно запускающий N переданных сендеров, 
// и завершающий выполнение, когда завершится выполнение последнего 
// переданного сендера.
sync_wait(
    when_all(
        some_coro(1),
        some_coro(2)
    )
);

Наоборот - вызов сендера из корутины - тоже работает, но не со всеми сендерами. Проблема в том, что интерфейс сендера фундаментально более выразительный, чем у корутины. В стандарте будет execution::with_awaitable_senders, который позволяет относительно легко встроить в ваш корутиновый класс (task) поддержку сендеров, отправляющих (возвращающих в контексте корутин) только 1 значение. Если вы это сделаете, то сможете делать co_await вашего сендера.

(Пример взят из пропозала)

// SingleValueSender - нестандартный концепт, 
// проверяющий что сендер отправляет только 1 значение 
template<class S>
  requires SingleValueSender<S&> 
task<SenderReturnType<S>::type> retry(S sender) {
    for (;;) {
        try {
            co_return co_await sender;
        } catch(...) {
        }
    }
}

Для остальных сендеров придется либо писать обвязку самостоятельно, либо использовать execution::into_variant.

template<class S>
task<void> retry(S sender) {
    for (;;) {
        try {
            // если sender отправляет (int, float), то типом some_variant
            // будет variant<tuple<int, float>>
            auto some_variant = co_await execution::into_variant(sender);
          
            do_something(some_variant);
            co_return;
        } catch(...) {
        }
    }
}

Большой пример

Показывает, как мог бы выглядеть асинхронный пайплайн обработки какого-нибудь http запроса.

(Взято из stdexec)

struct http_request;
struct http_response;
struct classification_result;
struct image;

enum class obj_type {
    human,
    dog,
    cat,
    /* другие варианты объектов... */
    general_error, 
    cancelled,    
};

// Получение картинки из тела запроса
image extract_image(http_request req);
// Классификация объекта на картинке
obj_type run_classifier(image);
// Генерация картинки, закодированной в base64
string generate_body_with_image();

// Запуск классификации
classification_result do_classify(image img) {
    obj_type result = run_classifier(img);
    return {result, 0};
}

// Обработчик возникших ошибок; переводит исключение к общему типу с do_classify
classification_result on_classification_error(std::exception_ptr) {
    return {obj_type::general_error, 100, {}};
}

// Обработчик отмены; производит тот же тип, что и do_classify
classification_result on_classification_cancelled() {
    return {obj_type::cancelled, 100};
}

// Общий обработчик, вызываемый для всех 3 вариантов завершения do_classify
// Переводит результат работы к ответу на запрос
http_response to_response(classification_result res) {
    switch(res.type_) {
        case(obj_type::general_error):
            return {500, res.details_};
        case(obj_type::cancelled):
            return {503, "cancelled"};
        default:
            return {200, to_string(res.type_)};
    }
}

// Обработчик http_request
stdexec::sender auto handle_classify_request(const http_request& req) {
    return
        // just "отправляет" http_request
        stdexec::just(req)
        // Чтение картинки из http_request
        | stdexec::then(extract_image)
        // Классификация картинки
        | stdexec::then(do_classify)
        // Обработчик ошибки
        | stdexec::upon_error(on_classification_error)
        // Обработчик отмены
        | stdexec::upon_stopped(on_classification_cancelled)
        // Перевод результата к http_response
        | stdexec::then(to_response);
}

int main() {
    exec::async_scope scope;
    exec::static_thread_pool pool{4};
    stdexec::scheduler auto sched = pool.get_scheduler();

    // Генерируем фейковые запросы
    for (int i = 0; i < 12; i++) {
        http_request req{"/classify", {}, generate_body_with_image()};

        // Сендер обрабатывающий http_request и отправляющий http_response
        stdexec::sender auto snd = handle_classify_request(req);

        // "Ответ" на запрос
        stdexec::sender auto action = 
        std::move(snd) //
            | stdexec::then([](http_response resp) {
                std::cout << std::format(
                    "Sending response: {} / {}\n", 
                    resp.status_code_, 
                    resp.body_);
            });
        scope.spawn(stdexec::on(sched, std::move(action)));
    }

    stdexec::sync_wait(scope.on_empty());
    pool.request_stop();
}

Обратите внимание на использование async_scope - это пока еще нестандартное (но говорят, что пропозал P3149 тоже должен попасть в C++26) расширение. Суть компоненты в том, что она позволяет перевести динамическое (не известное в момент компиляции) количество единиц работы (сендеров) в 1 сендер, и корректно завершить работу. Концептуально схоже с when_all, но when_all работает на этапе компиляции.

Что по имплементациям

Есть две основные:

  • libunifex от Meta

  • stdexec от Nvidia (является референсом и более активно разрабатываемой версией после перехода Eric Niebler в Nvidia)

Заключение

В статье рассматривалась только та часть нововведений, которую увидят пользователи библиотек, написанных на std::execution. Под капотом там еще много всего интересного, и вам придется с этим разбираться, если вы захотите написать свой собственный сендер, или планировщик.

По моему мнению, главным минусом std::execution является непомерная сложность. Если имплементация корутин в C++20 казалась мне сложной, то std::execution, как будто бы, поставил цель ее переплюнуть. За раз вводится огромное количество новых абстракций, взаимодействие которых не очевидно, особенно если вы не следите за этой областью последние 10 лет. Порог вхождения даже по меркам плюсов очень высокий.

Но с другой стороны, предлагаемая std::execution модель уже зарекомендовала себя в продах топовых компаний (как минимум Meta & NVIDIA), как производительная и достаточно универсальная. Очень интересно будет посмотреть на уровень использования в комьюнити лет через 10.

Заинтересовало?

Чтобы быть в курсе последних новостей и интересных разработок в мире C++, подписывайтесь на мой телеграм канал.


Комментарии (3)


  1. Kelbon
    11.09.2024 08:22

    // SingleValueSender - нестандартный концепт, 
    // проверяющий что сендер отправляет только 1 значение 
    template<class S>
      requires SingleValueSender<S&> 
    task<SenderReturnType<S>::type> retry(S sender) {
        for (;;) {
            try {
                co_return co_await sender;
            } catch(...) {
            }
        }
    }

    Это конечно очень красиво (неочень), но это просто не может работать. После запуска корутина уже другая, нежели была до первого запуска. Подобный retry просто невозможен


    1. krestovii_podhod Автор
      11.09.2024 08:22

      Вот демонстрация: https://godbolt.org/z/P1oYeGn8W : )


      1. Kelbon
        11.09.2024 08:22

        это предполагает, что сендер это не корутина, плюс он не меняет состояние из-за старта, т.е. фундаментально неэффективно, так как зачастую операции не готовятся к тому, что их перезапустят и например переиспользуют память, либо просто после запуска удаляют ненужные теперь ресурсы