Введение


Данная статья является продолжением данной статьи.


Бесконечный поток данных при помощи co_yield


Код ниже реализует бесконечный поток данных. Корутина getNext использует co_yield для создания потока данных который начинается со start и выдает по запросу каждое новое значение с шагом step.


Бесконечный поток данных
//infiniteDataStream.cpp
#include <coroutine>
#include <memory>
#include <iostream>

template <typename T>
struct Generator {
    struct promise_type;
    using handle_type = std::coroutine_handle<promise_type>;
    Generator(handle_type h) : coro(h) {}                       // (3)
    handle_type coro;
    std::shared_ptr<T> value;
    ~Generator() {
        if (coro) {
            coro.destroy();
        }
    }
    Generator(const Generator &) = delete;
    Generator& operator=(const Generator &) = delete;
    Generator(Generator &&other) : coro(other.coro) {
        other.coro = nullptr;
    }
    Generator& operator=(Generator &&other) {
        coro = other.coro;
        other.coro = nullptr;
        return *this;
    }
    T getValue() {
        return coro.promise().current_value;
    }
    bool next() {                                               // (5)
        coro.resume();
        return not coro.done();
    }
    struct promise_type {
        promise_type() = default;                               // (1)
        ~promise_type() = default;
        auto initial_suspend() {                                // (4)
            return std::suspend_always{};
        }
        auto final_suspend() {
            return std::suspend_always{};
        }
        auto get_return_object() {                              // (2)
            return Generator{handle_type::from_promise(*this)};
        }
        auto return_void() {
            return std::suspend_never{};
        }
        auto yield_value(T value) {                             // (6)
            current_value = value;
            return std::suspend_always{};
        }
        void unhandled_exception() {
            std::exit(1);
        }
        T current_value;
    };
};
Generator <int> getNext(int start = 0, int step = 1) {
    auto value = start;
    for (int i = 0; ; ++i) {
        co_yield value;
        value += step;
    }
}
int main() {
    std::cout << "getNext():";
    auto gen = getNext();
    for (int i = 0; i <= 10; ++i) {
        gen.next();
        std::cout << " " << gen.getValue();                     // (7)
    }
    std::cout << "\ngetNext(100, -10):";
    auto gen2 = getNext(100, -10);
    for (int i = 0; i <= 20; ++i) {
        gen2.next();
        std::cout << " " << gen2.getValue();
    }
    std::cout << std::endl;
}

Примечание переводчика: сборку осуществлял командой g++ -fcoroutines infiniteDataStream.cpp
В функции main создается 2 корутины. Первая, gen, возвращает значения от 0 до 10. Вторая, gen2, — от 100 до -100 с шагом 10. Вывод программы:


$ ./infDS
getNext(): 0 1 2 3 4 5 6 7 8 9 10
getNext(100, -10): 100 90 80 70 60 50 40 30 20 10 0 -10 -20 -30 -40 -50 -60 -70 -80 -90 -100

Метки с числами в комментариях в программе infiniteDataStream.cpp описывают первую итерацию в следующей последовательности:


  1. Создание promise объекта
  2. Вызов promise.get_return_object() и сохранение результата в локальной переменной
  3. Создание генератора
  4. Вызов promise.initial_suspend(), т.к. генератор "ленивый", следовательно, suspend_always
  5. Запрос следующего значения и возврат флага, если генератор исчерпал себя
  6. Действие на co_yield, после чего будет доступно следующее значение
  7. Получение следующего значения

В последующих итерациях выполняются только шаги 5 и 6.


Синхронизация потоков посредством co_await


Для синхронизации потоков рекомендуется использовать co_await. Пока один поток подготавливает обрабатываемый пакет, другой — ожидает таковой. Условные переменные (condition variables), promises и futures, а так же атомарные флаги могут быть использованы для реализации модели отправитель-получатель. Благодаря корутинам достаточно легко синхронизировать потоки избегая присущие условным переменным риски, как ложные срабатывания (spurious wakeups) и игнорирование пробуждения (lost wakeups).


Синхронизация потоков
// senderReceiver.cpp
#include <coroutine>
#include <chrono>
#include <iostream>
#include <functional>
#include <string>
#include <stdexcept>
#include <atomic>
#include <thread>

class Event {
public:
    Event() = default;
    Event(const Event &) = delete;
    Event(Event &&) = delete;
    Event& operator=(const Event &) = delete;
    Event& operator=(Event &&) = delete;
    class Awaiter;
    Awaiter operator co_await() const;
    void notify();
private:
    friend class Awaiter;
    mutable std::atomic<void *> suspendedWaiter{nullptr};
    mutable std::atomic<bool> notified{false};
};

class Event::Awaiter {
public:
    Awaiter(const Event &e) : event(e) {}
    bool await_ready() const;
    bool await_suspend(std::coroutine_handle<> ch);
    void await_resume() {}
private:
    friend class Event;
    const Event &event;
    std::coroutine_handle<> coroutineHandle;
};

bool Event::Awaiter::await_ready() const {
    if (event.suspendedWaiter.load() != nullptr) {
        throw std::runtime_error("More than one waiter is not valid");
    }
    return event.notified; // true - корутина выполняется как обычная функция, false - корутина приостановлена
}

bool Event::Awaiter::await_suspend(std::coroutine_handle<> ch) {
    coroutineHandle = ch;
    if (event.notified) {
        return false;
    }
    // сохранить waiter для последующего уведомления
    event.suspendedWaiter.store(this);
    return true;
}

void Event::notify() {
    notified = true;
    // попытка загрузить waiter
    auto *waiter = static_cast<Awaiter *>(suspendedWaiter.load());
    // проверка доступен ли waiter
    if (waiter != nullptr) {
        // возобновить работу корутины
        waiter->coroutineHandle.resume();
    }
}

Event::Awaiter Event::operator co_await() const {
    return Awaiter{*this};
}

struct Task {
    struct promise_type {
        Task get_return_object() { return {}; }
        std::suspend_never initial_suspend() { return {}; }
        std::suspend_never final_suspend() { return {}; }
        void return_void() {}
        void unhandled_exception() {}
    };
};

Task receiver(Event &event) {
    auto start = std::chrono::high_resolution_clock::now();
    co_await event;
    std::cout << "Got the notification!" << std::endl;
    auto end = std::chrono::high_resolution_clock::now();
    std::chrono::duration<double> elapsed = end - start;
    std::cout << "Waited " << elapsed.count() << " seconds." << std::endl;
}

int main() {
    std::cout << "Notification before waiting" << std::endl;
    Event event1{};
    auto senderThread1 = std::thread([&event1] { event1.notify(); });
    auto receiverThread1 = std::thread(receiver, std::ref(event1));
    receiverThread1.join();
    senderThread1.join();

    std::cout << "\nNotification after 2 seconds waiting" << std::endl;
    Event event2{};
    auto receiverThread2 = std::thread(receiver, std::ref(event2));
    auto senderThread2 = std::thread([&event2] {
                                         using namespace std::chrono_literals;
                                         std::this_thread::sleep_for(2s);
                                         event2.notify();
                                     });
    receiverThread2.join();
    senderThread2.join();
}

Примечание переводчика: сборку осуществлял командой g++ -pthread -fcoroutines senderReceiver.cpp


С точки зрения пользователя, синхронизация потоков посредством корутин достаточно проста. Стоит заметить, что в примере senderReceiver.cpp поток senderThread1 и senderThread2 используют событие event для отправки уведомлений (eventN.notify()). Функция обработки уведомлений receiver представляет собой корутину, которая выполняется в потоках receiverThread1 и receiverThread2. Внутри корутины осуществляется замер времени и вывод его на экран, что отображает как долго корутина осуществляла ожидание. Ниже представлен вывод программы.


Вывод программы senderReceiver


$ ./senderReceiver
Notification before waiting
Got the notification!
Waited 3.7006e-05 seconds.

Notification after 2 seconds waiting
Got the notification!
Waited 2.00056 seconds.

Если сравнить класс Generator в примере с бесконечным потоком данных и класс Event в предыдущем примере, то можно заметить некоторые различия. В первом случае, Generator одновременно и awaitable и awaiter; Event же использует operator co_await для возврата awaiter. Такое разделение awaitable и awaiter позволяет улучшить структуру кода.


Из вывода можно сделать вывод, что вторая корутина выполняется чуть больше, чем 2 секунды. Причина заключается в том, что event1 посылает уведомление до того, как корутина была приостановлена, однако event2 посылает уведомление после того, как прошло 2 секунды.
Принцип работы корутины в примере senderReceiver.cpp не так лёгок для понимания. Класс Event имеет пару интересных членов: suspendedWaiter и notified. Первый содержит waiter для посылки сигнала, второй же содержит состояние уведомления.


Более детально, event1 посылает уведомление до того как receiverThread1 был запущен. Вызов even1.notify() сначала устанавливает флаг notified после чего загружает потенциального waiter. В данном случае waiter является nullptr т.к. не был установлен ранее, что означает, что последующий waiter->coroutineHandle.resume() не будет выполнен. Впоследствии метод await_ready проверяет был ли установлен waiter и, если был, бросает исключение std::runtime_error. Наиболее важно тут обратить внимание на возвращаемое значение. Значение notified было ранее установлено в true в методе notify, что означает, в данном случае, что корутина не была приостановлена и выполняется как обычная функция.


В случае с event2 вызов co_await event выполняется до того, как посылается уведомление. Данный вызов инициирует выполнение await_ready. Следует заметить ключевое различие, что в данном случае флаг event.notified установлен в значение false что обуславливает приостановку корутины. Технически, вызывается метод await_suspend который получает handle корутины ch и сохраняет его для последующего вызова в переменную corotineHandle. Последующий вызов, в данном случае, означает возобновление работы. К тому же, waiter сохраняется в переменной suspendedWaiter. Когда затем срабатывает уведомление через event2.notify начинает выполнение соответствующий метод notify. Различие тут в том, что в условии где проверяется доступен ли waiter таковой уже не будет nullptr. В результате waiter использует coroutineHandle для возобновления работы корутины.