Когда я начал работать с Asio и изучал документацию библиотеки, прочитал мнение, что доку писали «для роботов». Автор имел в виду, что описание каждого концепта, функции или особенности приводится лишь однажды, без перекрестных ссылок и других удобных для разработчика деталей. Документация составлена так, что понять ее может разве что машина, «просканировав» текст целиком.

Я подумал, что было бы здорово написать статью, которая служила бы введением в библиотеку. Статью, которая помогла бы начать пользоваться Asio, даже если раньше вы с ней не работали. Я — старший инженер по разработке ПО для систем хранения данных в YADRO, и Asio помогает мне улучшать работу наших СХД.


Рассказ об Asio я начну с небольшого погружения в доменную область — устройство системы хранения данных. Если сильно упрощать, СХД — это сервер, который общается с дисками памяти на языке протоколов. Протоколы же состоят из команд. Обычный жесткий диск — SATA-диск с ATA-командами. В СХД используют SAS-диски, и общаются они по протоколу SCSI, то есть посылают SCSI-команды.

На иллюстрации выше изображена Read-команда. Диск ее получает и обрабатывает. Если все хорошо, возвращает данные. А если нехорошо, то дальше ситуация может разворачиваться по-разному. Диски выходят из строя абсолютно уникальными способами.

Конечно, хочется, чтобы софт, который мы разрабатываем, вел себя адекватно, обрабатывал каждую ситуацию с предсказуемым результатом. Чтобы этого добиться, нужно научится воспроизводить ошибки в лабораторных условиях и тестировать софт. Для этого мы «встанем» между диском и сервером. Будем получать команды, обрабатывать их и имитировать ошибки.

Имитируем ошибки в диске

Сначала опишем команды, которые принимает диск:

struct Command {
	virtual void execute() = 0;
	virtual ~Command() = default;
	/* ... */
};

Подумаем: а что сделать с командами, чтобы они завершались с ошибками? Для этого обернем оригинальную команду и выставим ошибку, при этом саму команду выполнять даже не будем:

class ErrorCommand : public CommandDelegate {
	void setError();

public:
	void execute() override {
		setError();
		// skip actual command execution
	}
	/* ... */
};

Также мы «испортим» данные. В реальности это не очень приятная история, но сымитировать эту ситуацию нам важно:

class ReadCorruptCommand : public ReadCommandDelegate {
	void corruptData();

public:
	void execute() override {
		// Do actual read
		command_->execute();
		// Corrupt data
		corruptData();
	}
	/* ... */
};

Мы читаем из диска с помощью Read-команды, и после переворачиваем любой бит, чтобы протестировать, обнаружил ли софт «порчу» данных.

Еще одна частая ошибка — когда команды завершаются через достаточно долгое время после отправки в диск. Если реализовать такую ошибку «в лоб», получится что-то такое:

class DelayCommand : public CommandDelegate {
	std::chrono::milliseconds delay_;
public:
	void execute() override {
		std::this_thread::sleep_for(delay_);
		command_->execute();
	}
	/* ... */
};

Думая об имплементации, первое, что приходит в голову, это взять и сказать: «Ну, иди — спи». Но, кажется, это совсем не эффективно с точки зрения ресурсов.

Когда у нас появляется много дисков и команд в очереди, система начинает тратить ресурсы на ненужные операции, переключение контекстов — ее производительность быстро падает. Очевидно, что такое решение не подходит. Стоит подумать в сторону асинхронной обработки. Хорошо, что у нас есть Boost — библиотека, которая уже подключена к проекту. А еще наш компилятор поддерживает стандарт C++20, в котором появились корутины. Воспользуемся ими.

А почему вообще корутины? 

Когда появились корутины, они произвели настоящий фурор — многие сразу захотели их попробовать. Дело в том, что для программиста корутины выглядят как обычные синхронные вызовы функций: код кажется последовательным и понятным, хотя он работает асинхронно.

Сейчас уже много статей и докладов объясняют, как работают корутины в C++ 20, как их реализовать и что происходит под капотом. Но большинство разработчиков, которые пишут бизнес-логику, не хотят тратить время на создание собственного асинхронного движка с корутинным интерфейсом. Поэтому мы взяли готовые решения — корутины из библиотеки Asio.

Внедрить корутины в проект оказалось достаточно просто. Единственное, что нужно изменить в интерфейсе, — возвращаемое значение. В теле функции теперь изменяется возвращаемое значение, и все, что внутри корутины, требует co_await. Корутина, согласно стандарту, — это функция, которая использует одно из трех ключевых слов: co_yieldco_return или co_await. И если корутина ничего не делает, никого не ждет, то нам приходится добавлять co_return.

Если пробежаться по дифу, который появился, когда мы внедрили корутины в проект, увидим, что самое большое изменение — это запуск команды:

15c16,20
< void execute(Command& command) { command.execute(); }
---
> void execute(Command& command) {
> 	auto ctx = io_context{1};
> 	co_spawn(ctx.get_executor(), command.execute(), detached);
> 	ctx.run();
> }

Дело в том, что до этого мы могли сделать функцию, которая вызывает у команды execute. И execute выполняется в том же потоке, в котором мы эту функцию вызвали. Логично. Тут ничего нового не придумали.

Запускаем корутину

Чтобы запустить корутину с Asio, впрочем, придется сделать чуть больше приседаний. Тут главное отметить, что у нас появился co_spawn, который нужен, чтобы запустить эту корутину. Этот co_spawn принимает в себя либо execution_context, либо executor. В примере выше это:

co_spawn(ctx.get_executor(), ...);
/* Or */
co_spawn(ctx, ...);

Command::execute создает новый объект —awaitable, который предоставляет Asio.  Передаем вторым параметром awaitable, и передаем Completion token — действие, которое выполнится после завершения асинхронной операции. В этом примере используется Completion Token detached. Он говорит о том, что нас не интересует, чем закончится асинхронная операция, то есть корутина.

Отлично! Превратили все методы execute в корутины, теперь можно и тесты запускать. Скажу честно: если бы они все были зеленые, я бы не поверил. Этого и не случилось. Дело в том, что у awaitable, который предоставляет Asio, есть конструктор по умолчанию. И этот конструктор создает объект, не ассоциированный ни с какой корутиной. В итоге следующее выражение приведет в лучшем случае к Segmentation Fault:

co_await awaitable<void>{};

Встретить выражение выше достаточно сложно в обычной кодовой базе, но этот пример тут только для наглядности. К примеру, конструирование по умолчанию происходит при использовании библиотеки gmock. Вот так:

EXPECT_CALL(*commandMock, execute()).Times(1);

Мок-метод будет создавать возвращаемый объект с использованием конструктора по умолчанию. Чтобы не наткнуться на это, можно использовать другой матчер для мока:

EXPECT_CALL(*commandMock, execute()).WillOnce([] -> awaitable<void> { co_return; });

Или выставить фабрику для этого типа:

DefaultValue<awaitable<void>>::SetFactory([] -> awaitable<void> {
	co_return;
});

Использование корутин с захватом тоже может быть опасным, подробнее можно почитать здесь.

Что такое executor и context

Первый Segmentation Fault за плечами. Теперь расскажу подробнее про параметры, которые необходимо передать в co_spawnexecutor и execution_context, к сожалению, не так близки к тем executor, которые попадут в 26-й стандарт, если нам повезет. Можно сказать, что в контексте этих экзекьюторов Asio выполняет всю работу. Так, в их контексте выполняется код корутин и код Completion handler'ов. Я бы выделил три основных execution_context, которые идут в Asio из коробки:

  • io_context

  • thread_pool

  • system_context

Теперь о каждом по порядку.

io_context

Мне удобно думать о нем как об event loop. На самом деле это он и есть. Как только вы вызовите io_context::run, он будет крутиться в том потоке, в котором вы его вызвали. Пока в нем есть события (ивенты).

thread_pool

Набор потоков, в которых будут выполняться асинхронные операции. Количество потоков можно задать при создании пула. Чтобы его остановить, надо вызвать stop и затем join.

system_context

Это синглтон. Доступ к нему должен осуществляться через system_executor::get_executor. Создается при первом обращении. Можно сказать, что это глобальный тредпул. Но есть нюанс, к которому мы вернемся позже в статье. Если вы видите асинхронную операцию, которая позволяет не передавать в нее executor, значит, она будет использовать system_executor.

Вот и все, что нужно для того, чтобы запустить корутины в Asio.

А что же там было с DelayCommand?

Мы начали с того, что хотели избавиться от блокирования потока на определенное время. Пользователь может сказать: хочу, чтобы команда не выполнялась пять минут. И у нас в течение пяти минут один поток программы будет стоять, ждать и ничего не делать. С использованием корутин DelayCommand будет выглядеть следующим образом:

awaitable<void> execute() override {
	using namespace boost::asio;

	auto ex = co_await this_coro::executor;
	auto timer = steady_timer{ex, delay_};
	co_await timer.async_wait(use_awaitable);

	co_await command_->execute();
}

Пройдусь по коду построчно :

  1. Получаем executor из контекста корутины. Не стоит переживать, что корутина заснет в этот момент. promise_type для awaitable специализирует await_transform для this_coro::executor и всегда возвращает true. Такой интерфейс предоставляет Asio для работы с контекстом корутины. К примеру, точно так же можно настраивать cancellation для корутины.

  2. Создаем io object steady_timer. Это один из многих io-объектов, которые Asio предоставляет из коробки.

  3. Вызываем асинхронную операцию над таймером и передаем в нее Completion token. Тут появляется новый Completion token —use_awaitable. Он специализирует асинхронную операцию так, что она возвращает новый awaitable, на котором мы можем сделать co_await, чтобы асинхронно дождаться завершения таймера.

Дисклеймер: В статье я использую Completion token и Completion handler как синонимы. Это не совсем так, но для упрощения статьи я сделал такое допущение.

await_transform для promise_type для awaitable на самом деле специализирован для всех типов. Из-за этого корутины в Asio можно назвать «закрытыми». Это значит, что в Asio-корутинах нельзя использовать ничего, кроме примитивов Asio. К примеру, выражение co_await std::suspend_never{} попросту не скомпилируется, потому что для него не найдется ни одной подходящей специализации await_transform. А свою не добавить.

Чтобы внедрить свою асинхронность к Asio, лучше воспользоваться async_compose или asyn_initiate, но это тема для отдельной статьи.

Внимательный читатель может задать вопрос: «А какой тип у ex?» И это отличный вопрос для того, чтобы перейти к следующей теме!

В статье я уже упомянул один executor — system_executor. Кроме него, собственный executor есть и у io_context, и у thread_pool. Их можно получить, вызвав метод get_executor у этих контекстов. Ни у одного из них нет общего базового класса, это обычные классы со схожими методами, соответствующие концепту executor.

Мы, конечно, знаем, что корутины стартовали из io_context. Вот только корутины не бывают шаблонными, и тип executor не передать как шаблонный параметр. А вот возращаемое значение как раз шаблонное, и в него можно передать второй шаблонный параметр, коим и является Executor. То есть, когда мы объявляем метод, он выглядит так:

awaitable<void, any_io_executor> execute() override;

Конечно, мы можем передать в него io_context, но по умолчанию в нем содержится any_io_executor. Что же это такое? any_io_executor — это не настоящий экзекьютор, а обертка. Он предоставляет полиморфный интерфейс для всех экзекьюторов, которые есть в Asio. При его создании происходит type erasure, и информация о конкретном типе стирается.

Если мы все же хотим получить доступ к настоящему экзекьютору, для этого есть шаблонный метод target, в который мы можем передать интересующий нас тип экзекьютора. Если угадали, вернет сырой указатель на этот экзекьютор, если нет — вернет nullptr:

EXPECT_EQ(thisCoroExecutor.target<system_executor>(), nullptr);
EXPECT_NE(thisCoroExecutor.target<io_context::executor_type>(), nullptr);

Можно сказать, что вся работа с Asio сводится к композиции асинхронных операций над асинхронными объектами. Теперь еще и корутинный интерфейс завезли. Чтобы определить, в каком контексте будут выполняться асинхронные операции, используются экзекьюторы. Они либо передаются на старте асинхронных операций, либо композируются с io-объектами.

Зачем нужен полиморфный executor

На самом деле полиморфный executor можно использовать не только ради удобного интерфейса. С помощью него можно писать алгоритмы, абстрагированные от среды выполнения. Опять же, давайте посмотрим на примере:

class Worker {
	virtual std::shared_ptr<Command> getNextCommand() = 0;

	/* ... */
public:
	void work(any_io_executor ex, std::stop_token token) {
		while (!token.stop_requested()) {
			auto command = getNextCommand();
			auto a = command->execute();
			co_spawn(ex, std::move(a), detached);
		}
	}
};

Пока не попросят остановиться, мы ждем следующую команду, создаем awaitable и запускаем новую корутину. Передавая в метод work, мы можем эффективно использовать ресурсы. К примеру, мы можем запускать какие-то воркеры на thread_pool::executor'ах, а другие — на особо выделенных io_context::executor'ах. И таким образом «играться» с производительностью.

Теперь возникает ситуация: до внедрения корутин команды выполнялись друг за другом. Нас это устраивало до тех пор, пока не появилось требование ждать. Теперь же, если передать в алгоритм выше thread_pool, команды начнут выполняться параллельно. Это потенциально может привести к гонкам и прочим неожиданным вещам. Вот схематичное изображение выполнения команд:

Если мы передадим в метод work io_context и запустим run только в одном потоке, то все будет работать. Не хочется писать алгоритм, который будет нестабильно работать, если передать ему неправильный параметр (с учетом, что тип параметра этого позволяет). Как же быть?

Для начала визуализирую то, к чему хотелось бы прийти:

Что происходит на изображении? Появляется команда к обработке. Worker берет эту команду и запускает — сразу же появляется следующая команда. И вместо того, чтобы начать ее выполнение параллельно, она кладется в очередь к выполнению. Как только первая команда сообщила, что хочет отдать управление, внутреннее устройство Asio достает из очереди вторую команду и запускает ее. В итоге в один момент времени может выполнятся только одна команда, начатая одним и тем же воркером. Однако если этот же thread_pool используется другим воркером, то команды, начатые разными воркерами, спокойно могут выполняться параллельно.

Чтобы достичь такого результата, Asio предоставляет функцию make_strand. Эта функция создает еще одну executor-обертку. Но теперь задача этой обертки — сериализовать выполнение асинхронных операций. Так будет выглядеть обновленный метод work:

void work(any_io_executor ex, std::stop_token token) {
	auto samPorterBridges = make_strand(ex);
	while (!token.stop_requested()) {
		auto command = getNextCommand();
		auto a = command->execute();
		co_spawn(samPorterBridges, std::move(a), detached);
	}
}

Что еще можно делать с executor в Asio

Вспомним DelayCommand и представим ситуацию, в которой все, что мы сделили методом execute, просто привело его к корутинному интерфейсу:

awaitable<void> execute() override {
	std::this_thread::sleep_for(delay_);
	co_await command_->execute();
}

Также допустим, что мы еще не добавили make_strand в прошлом примере. То есть все команды начинают выполняться сразу, как только попали в Worker:

void work(any_io_executor ex, std::stop_token token) {
	while (!token.stop_requested()) {
		auto command = getNextCommand();
		auto a = command->execute();
		co_spawn(ex, std::move(a), detached);
	}
}

Пример синтетический, но достаточно простой для демонстрации. С такой реализацией метода execute можно попробовать оценить, сколько времени займет выполнение трех таких команд в Worker:

worker.work(pool.get_executor(), std::move(stopToken));
//
worker.work(system_executor{}, std::move(stopToken));

Допустим, что stopToken останавливает Worker, как только три команды выполнятся.

Учитывая созданное окружение, кажется, что все три команды должны начать выполнение параллельно. И если каждая команда выполняет задержку в 10 мс, они должны все завершиться примерно за 10 мс. Однако на деле так может и не произойти. Когда я проводил тесты на своем компьютере, work, в который я передал poo.get_executor, выполнился, как и ожидалось, за 10 мс. А work, в который я передал system_executor, выполнился за 30 мс!

Как такое возможно? 30 мс — это три раза по 10 мс, поэтому можно предположить, что команды начали выполняться последовательно. Так и произошло! Если распечатать thread id из команд, он будет совпадать с thread id, в котором был запущен метод work. Это говорит о том, что команды были выполнены в одном потоке друг за другом. Единственное место, где это могло произойти, — в вызове co_spawn.

Как этого избежать или хотя бы узнать, что это прозойдет? Тут на сцену выходят свойства executor — executor properties. Чтобы узнать, попытается ли Asio выполнить асинхронную задачу в текущем потоке, можно воспользоваться функцией query и свойством execution::blocking. Последнее как раз и говорит о том, заблокируется ли текущий вызов асинхронной задачи или нет.

query(system_executor{}, execution::blocking) == execution::blocking.possibly;

Кроме blocking.possibly, есть еще blocking.always и blocking.neverblocking.possibly говорит о том, что вызов асинхронной операции может начать выполнение этой операции сразу в текущем потоке. А может и не начать. blocking.never говорит о том, что задача точно не заблокирует текущий поток, а blocking.always, наоборот, сообщает, что асинхронная операция обязательно заблокирует текущий поток.

Помимоexecution::blocking, у executor есть и другие свойства. Например:

  • execution::allocator, которая позволяет получить аллокатор ассоциированный с экзекьютором,

  • execution::context , с помощью которой можно достать execution_context ,

  • execution::occupancy — количество потоков, доступное этому экзекьютору.

Как узнать о свойствах executor, я рассказал. Теперь о том, как их поменять. Для этого Asio предоставляет функцию prefer и requireprefer позволяет игнорировать вашу просьбу, в то время как require строго задает указанное свойство. В методе work это будет выглядеть так:

void work(any_io_executor ex, std::stop_token token) {
	auto neverBlocks = require(ex, execution::blocking.never);
	while (!token.stop_requested()) {
		auto command = getNextCommand();
		auto a = command->execute();
		co_spawn(neverBlocks, std::move(a), detached);
	}
}

Так мы привели метод work к более ожидаемому поведению.

Суммируя работу над методом work, получаем следующее:

void work(any_io_executor ex, std::stop_token token) {
	auto neverBlocks = require(ex, execution::blocking.never);
	auto samPorterBridges = make_strand(neverBlocks);
	while (!token.stop_requested()) {
		auto command = getNextCommand();
		auto a = command->execute();
		co_spawn(samPorterBridges, std::move(a), detached);
	}
}

Прекращаем игнорировать результат асинхронных операций

Когда приносишь изменения, важно их протестировать. Примеры, приведенные в статье, не исключение. Приведу выдержку из теста:

auto command1 = std::make_shared<DelayCommand>(commandMock, command1Delay);
EXPECT_CALL(worker, getNextCommand())
	.WillOnce(Return(command1))

Не долго думая, я как C++ программист с пониманием, что дальше по коду command1 использоваться не будет, решил добавить std::move в ожидании мока :

auto command1 = std::make_shared<DelayCommand>(commandMock, command1Delay);
EXPECT_CALL(worker, getNextCommand())
	.WillOnce(Return(std::move(command1)))

За это я получил Segmentation Fault. Что пошло не так?

Если проследить за временем жизни команды, можно увидеть, что ее объект разрушается до того, как завершилось выполнение метода execute. Какой же чудесный корутинный мир перед нами открывается! Попробуем изобразить все это через схему:

Команда попала в тело цикла метода work. Мы создали awaitable, который отправили в co_spawn. Даже если команда начала выполняться в том же потоке, она доходит до старта таймера, где возвращает поток управления вызывающей функции. Цикл осуществляет новую итерацию, и последняя ссылка на объект команды сбрасывается. Объект освобождается, и, когда Asio возобновляет корутину и начинает обращаться к полям освобожденного объекта (в данном случае — полю command_), она обращается к уже освобожденной памяти.

Проблему описал, теперь — к решению. Очевидно, что нам нужно сохранить ссылку на объект команды, но где? Asio подумал и об этом, предоставив адаптеры для Completion token. К примеру, consign. Все, что он делает, — создает структуру, в которой поля — это значения, переданные в него, и оригинальный Completion token. В очередной раз дополняем код метода work:

void work(any_io_executor ex, std::stop_token token) {
	auto neverBlocks = require(ex, execution::blocking.never);
	auto samPorterBridges = make_strand(neverBlocks);
	while (!token.stop_requested()) {
		auto command = getNextCommand();
		auto a = command->execute();
		auto token = consign(detached, command);
		co_spawn(samPorterBridges, std::move(a), std::move(token));
	}
}

С этой проблемой разобрались, едем дальше. Что если команда выбросит исключение? Как образом мы об этом узнаем?

EXPECT_CALL(*commandMock, execute())
	.WillOnce([] -> awaitable<void> {
		throw std::runtime_error{"Command1 error"};
		co_return;
	})

Пока что никаким, потому что все это время мы передавали Completion token detached, когда начинали команду. Пора это изменить. Что мы можем туда передать? Обычная функция, которая принимает как аргументы Completion signature асинхронной операции, тоже может являться Completion token.

На этом примере можно попытаться объяснить разница между Compltion token и Completion handler. В случае с функцией Completion token и Completion handler это одно и то же — сама функция.

Однако в случае, к примеру, с detached все немного иначе. Completion- токеном является глобальная переменная detached, которая имеет тип detached_t . А вот Completion handler — это внутренний объект Asio, который конструируется, когда в инициирующую функцию передается Completion token detached.

То есть мы можем сделать следующее:

co_spawn(ex, std::move(a), [] (std::exception_ptr eptr) { /* handle eptr */ )});

В таком случае мы снова теряем референс на команду. Однако ее можно положить в захват.

co_spawn(ex, std::move(a), [command = std::move(command)] (std::exception_ptr eptr) { /* handle eptr */ )});

А если не нравятся лямбды с захватом, можно использовать append или prepend. Таким образом команда будет ассоциирована с Completion token и передастся как дополнительный параметр:

co_spawn(ex, std::move(a), append([] (std::exception_ptr eptr, auto cmd) { /* handle eptr */ )}, command));

Заключение

Главное, что я вынес из своего опыта внедрения корутин в проекты, — важно внимательно следить за временем жизни объекта, если сделал один из его методов корутиной.

Asio — это просто композиция асинхронных операций, которые выполняются в контексте экзекьюторов. Поведение последних можно менять. Интерфейс асинхронных операций можно менять с помощью Completion-токенов. Подробнее про них я писал в предыдущей статье.

Если у вас есть вопросы по работе с библиотекой Asio, задавайте их в комментариях. С радостью отвечу!

Комментарии (2)


  1. Kelbon
    26.08.2025 16:04

    Чтобы внедрить свою асинхронность к Asio, лучше

    написать свои awaiters над нормальными калбеками asio, забыв что там существуют корутины


  1. Jijiki
    26.08.2025 16:04

    есть библиотека aio - aio.h тоже интересная, ... epoll, да селект тоже интересно. просто вы не пишите какая ОС у вас

    команды запускаются от запросов как я понимаю, на сервере каждая команда имеет своё окружение(на чтение запись), таким образом команды(дексрипторы или номера процессов) в пуле команд от этого задача становится только интереснее, зачем тащить буст