Введение
Данная статья является продолжением данной статьи.
Бесконечный поток данных при помощи co_yield
Код ниже реализует бесконечный поток данных. Корутина getNext
использует co_yield
для создания потока данных который начинается со start
и выдает по запросу каждое новое значение с шагом step
.
//infiniteDataStream.cpp
#include <coroutine>
#include <memory>
#include <iostream>
template <typename T>
struct Generator {
struct promise_type;
using handle_type = std::coroutine_handle<promise_type>;
Generator(handle_type h) : coro(h) {} // (3)
handle_type coro;
std::shared_ptr<T> value;
~Generator() {
if (coro) {
coro.destroy();
}
}
Generator(const Generator &) = delete;
Generator& operator=(const Generator &) = delete;
Generator(Generator &&other) : coro(other.coro) {
other.coro = nullptr;
}
Generator& operator=(Generator &&other) {
coro = other.coro;
other.coro = nullptr;
return *this;
}
T getValue() {
return coro.promise().current_value;
}
bool next() { // (5)
coro.resume();
return not coro.done();
}
struct promise_type {
promise_type() = default; // (1)
~promise_type() = default;
auto initial_suspend() { // (4)
return std::suspend_always{};
}
auto final_suspend() {
return std::suspend_always{};
}
auto get_return_object() { // (2)
return Generator{handle_type::from_promise(*this)};
}
auto return_void() {
return std::suspend_never{};
}
auto yield_value(T value) { // (6)
current_value = value;
return std::suspend_always{};
}
void unhandled_exception() {
std::exit(1);
}
T current_value;
};
};
Generator <int> getNext(int start = 0, int step = 1) {
auto value = start;
for (int i = 0; ; ++i) {
co_yield value;
value += step;
}
}
int main() {
std::cout << "getNext():";
auto gen = getNext();
for (int i = 0; i <= 10; ++i) {
gen.next();
std::cout << " " << gen.getValue(); // (7)
}
std::cout << "\ngetNext(100, -10):";
auto gen2 = getNext(100, -10);
for (int i = 0; i <= 20; ++i) {
gen2.next();
std::cout << " " << gen2.getValue();
}
std::cout << std::endl;
}
Примечание переводчика: сборку осуществлял командой g++ -fcoroutines infiniteDataStream.cpp
В функции main
создается 2 корутины. Первая, gen
, возвращает значения от 0 до 10. Вторая, gen2
, — от 100 до -100 с шагом 10. Вывод программы:
$ ./infDS
getNext(): 0 1 2 3 4 5 6 7 8 9 10
getNext(100, -10): 100 90 80 70 60 50 40 30 20 10 0 -10 -20 -30 -40 -50 -60 -70 -80 -90 -100
Метки с числами в комментариях в программе infiniteDataStream.cpp
описывают первую итерацию в следующей последовательности:
- Создание promise объекта
- Вызов
promise.get_return_object()
и сохранение результата в локальной переменной - Создание генератора
- Вызов
promise.initial_suspend()
, т.к. генератор "ленивый", следовательно,suspend_always
- Запрос следующего значения и возврат флага, если генератор исчерпал себя
- Действие на
co_yield
, после чего будет доступно следующее значение - Получение следующего значения
В последующих итерациях выполняются только шаги 5 и 6.
Синхронизация потоков посредством co_await
Для синхронизации потоков рекомендуется использовать co_await
. Пока один поток подготавливает обрабатываемый пакет, другой — ожидает таковой. Условные переменные (condition variables), promises и futures, а так же атомарные флаги могут быть использованы для реализации модели отправитель-получатель. Благодаря корутинам достаточно легко синхронизировать потоки избегая присущие условным переменным риски, как ложные срабатывания (spurious wakeups) и игнорирование пробуждения (lost wakeups).
// senderReceiver.cpp
#include <coroutine>
#include <chrono>
#include <iostream>
#include <functional>
#include <string>
#include <stdexcept>
#include <atomic>
#include <thread>
class Event {
public:
Event() = default;
Event(const Event &) = delete;
Event(Event &&) = delete;
Event& operator=(const Event &) = delete;
Event& operator=(Event &&) = delete;
class Awaiter;
Awaiter operator co_await() const;
void notify();
private:
friend class Awaiter;
mutable std::atomic<void *> suspendedWaiter{nullptr};
mutable std::atomic<bool> notified{false};
};
class Event::Awaiter {
public:
Awaiter(const Event &e) : event(e) {}
bool await_ready() const;
bool await_suspend(std::coroutine_handle<> ch);
void await_resume() {}
private:
friend class Event;
const Event &event;
std::coroutine_handle<> coroutineHandle;
};
bool Event::Awaiter::await_ready() const {
if (event.suspendedWaiter.load() != nullptr) {
throw std::runtime_error("More than one waiter is not valid");
}
return event.notified; // true - корутина выполняется как обычная функция, false - корутина приостановлена
}
bool Event::Awaiter::await_suspend(std::coroutine_handle<> ch) {
coroutineHandle = ch;
if (event.notified) {
return false;
}
// сохранить waiter для последующего уведомления
event.suspendedWaiter.store(this);
return true;
}
void Event::notify() {
notified = true;
// попытка загрузить waiter
auto *waiter = static_cast<Awaiter *>(suspendedWaiter.load());
// проверка доступен ли waiter
if (waiter != nullptr) {
// возобновить работу корутины
waiter->coroutineHandle.resume();
}
}
Event::Awaiter Event::operator co_await() const {
return Awaiter{*this};
}
struct Task {
struct promise_type {
Task get_return_object() { return {}; }
std::suspend_never initial_suspend() { return {}; }
std::suspend_never final_suspend() { return {}; }
void return_void() {}
void unhandled_exception() {}
};
};
Task receiver(Event &event) {
auto start = std::chrono::high_resolution_clock::now();
co_await event;
std::cout << "Got the notification!" << std::endl;
auto end = std::chrono::high_resolution_clock::now();
std::chrono::duration<double> elapsed = end - start;
std::cout << "Waited " << elapsed.count() << " seconds." << std::endl;
}
int main() {
std::cout << "Notification before waiting" << std::endl;
Event event1{};
auto senderThread1 = std::thread([&event1] { event1.notify(); });
auto receiverThread1 = std::thread(receiver, std::ref(event1));
receiverThread1.join();
senderThread1.join();
std::cout << "\nNotification after 2 seconds waiting" << std::endl;
Event event2{};
auto receiverThread2 = std::thread(receiver, std::ref(event2));
auto senderThread2 = std::thread([&event2] {
using namespace std::chrono_literals;
std::this_thread::sleep_for(2s);
event2.notify();
});
receiverThread2.join();
senderThread2.join();
}
Примечание переводчика: сборку осуществлял командой g++ -pthread -fcoroutines senderReceiver.cpp
С точки зрения пользователя, синхронизация потоков посредством корутин достаточно проста. Стоит заметить, что в примере senderReceiver.cpp
поток senderThread1
и senderThread2
используют событие event
для отправки уведомлений (eventN.notify()
). Функция обработки уведомлений receiver
представляет собой корутину, которая выполняется в потоках receiverThread1
и receiverThread2
. Внутри корутины осуществляется замер времени и вывод его на экран, что отображает как долго корутина осуществляла ожидание. Ниже представлен вывод программы.
Вывод программы senderReceiver
$ ./senderReceiver
Notification before waiting
Got the notification!
Waited 3.7006e-05 seconds.
Notification after 2 seconds waiting
Got the notification!
Waited 2.00056 seconds.
Если сравнить класс Generator
в примере с бесконечным потоком данных и класс Event
в предыдущем примере, то можно заметить некоторые различия. В первом случае, Generator
одновременно и awaitable и awaiter; Event
же использует operator co_await
для возврата awaiter. Такое разделение awaitable и awaiter позволяет улучшить структуру кода.
Из вывода можно сделать вывод, что вторая корутина выполняется чуть больше, чем 2 секунды. Причина заключается в том, что event1
посылает уведомление до того, как корутина была приостановлена, однако event2
посылает уведомление после того, как прошло 2 секунды.
Принцип работы корутины в примере senderReceiver.cpp
не так лёгок для понимания. Класс Event
имеет пару интересных членов: suspendedWaiter
и notified
. Первый содержит waiter для посылки сигнала, второй же содержит состояние уведомления.
Более детально, event1
посылает уведомление до того как receiverThread1
был запущен. Вызов even1.notify()
сначала устанавливает флаг notified
после чего загружает потенциального waiter. В данном случае waiter
является nullptr
т.к. не был установлен ранее, что означает, что последующий waiter->coroutineHandle.resume()
не будет выполнен. Впоследствии метод await_ready
проверяет был ли установлен waiter и, если был, бросает исключение std::runtime_error
. Наиболее важно тут обратить внимание на возвращаемое значение. Значение notified
было ранее установлено в true
в методе notify
, что означает, в данном случае, что корутина не была приостановлена и выполняется как обычная функция.
В случае с event2
вызов co_await event
выполняется до того, как посылается уведомление. Данный вызов инициирует выполнение await_ready
. Следует заметить ключевое различие, что в данном случае флаг event.notified
установлен в значение false
что обуславливает приостановку корутины. Технически, вызывается метод await_suspend
который получает handle корутины ch
и сохраняет его для последующего вызова в переменную corotineHandle
. Последующий вызов, в данном случае, означает возобновление работы. К тому же, waiter
сохраняется в переменной suspendedWaiter
. Когда затем срабатывает уведомление через event2.notify
начинает выполнение соответствующий метод notify
. Различие тут в том, что в условии где проверяется доступен ли waiter
таковой уже не будет nullptr
. В результате waiter
использует coroutineHandle
для возобновления работы корутины.
5oclock
Как говорится, стесняюсь спросить:
Для демонстрации корутин на плюсах действительно нужен такой монструозный код?
Как говорил Янычар из фильма "72 метра":
Тут же… можно сломать, пока до конца доберёшься!
Harrowmont Автор
Конкретно данные примеры рассматривают различные кейсы работы корутин. К более простым примерам в boost asio я уже отсылал в комментах к предыдущей статье, вот и сюда положу. Я потому и начал перевод главы из книги, что немного закипел после разбора этих примеров. Однако последние 4 абзаца, которые поясняют различия, расставили всё на места. По крайней мере для меня.