Асинхронный Rust в трех частях
Часть вторая: Задачи
Часть третья: IO
Async/await, или «асинхронный ввод‑вывод», — это относительно новая функция языка, которая позволяет программам выполнять несколько задач одновременно. Это своего рода альтернатива многопоточности, хотя программы на Rust часто используют и то и другое. Асинхронный ввод‑вывод особенно популярен в веб‑сервисах и сетевых приложениях, работающих с большим числом подключений одновременно.
В данном случае «большое число» обычно означает десять тысяч или более подключений, что иногда называют «проблемой C10K», то есть задачей обработки 10 000 клиентов или соединений. Использование большого количества «futures» или «задач» оказывается более эффективным, чем запуск множества потоков.
Эта серия статей представляет собой введение в «futures», задачи и асинхронный ввод‑вывод в Rust. Наша цель — понять основные механизмы, чтобы асинхронный код не казался магией. Мы начнем с преобразования (так называемой «рассахаризации») асинхронных примеров в обычный Rust и постепенно создадим собственную асинхронную «среду выполнения». На данном этапе под «средой выполнения» мы понимаем библиотеку или фреймворк, которые используются для написания асинхронных программ.
Создание собственных «futures», задач и механизма ввода‑вывода позволит понять, что именно делает для нас среда выполнения. Предполагается, что вы уже немного писали на Rust и читали The Rust Programming Language \или аналогичный источник.
Давайте начнем с выполнения нескольких задач одновременно с использованием потоков. Сначала все пойдет хорошо, но затем, с увеличением количества потоков, начнутся проблемы. Затем мы добьемся того же, используя async/await. В этой части мы разберем примеры, а в следующей мы начнем углубляться в них.
Потоки
Ниже пример функции foo
, которая выполняется за одну секунду:
fn foo(n: u64) {
println!("start {n}");
thread::sleep(Duration::from_secs(1));
println!("end {n}");
}
Если мы хотим одновременно вызвать несколько функций foo
, мы можем создать по отдельному потоку для каждого вызова. Сделать это можно следующим образом:
fn main() {
let mut thread_handles = Vec::new();
for n in 1..=10 {
thread_handles.push(thread::spawn(move || foo(n)));
}
for handle in thread_handles {
handle.join().unwrap();
}
}
Обратите внимание, что метод join
здесь означает «дождаться завершения потока». Потоки начинают выполняться в фоновом режиме, как только мы вызываем spawn
, так что все они отрабатывают, пока мы ждём завершения первого, и остальные вызовы join
возвращаются быстро.
Мы можем увеличить количество потоков до ста, и все будет работать нормально. Но если мы попытаемся запустить тысячу потоков, то у нас возникнут проблемы:
thread 'main' panicked at /rustc/3f5fd8dd41153bc5fdca9427e9e05...
failed to spawn thread: Os { code: 11, kind: WouldBlock, message:
"Resource temporarily unavailable" }
Каждый поток использует много памяти, поэтому существует ограничение на количество потоков, которые мы можем создать. Хотя на Playground это не так заметно, мы также можем вызвать проблемы с производительностью, переключаясь между множеством потоков одновременно. Потоки хорошо подходят для выполнения нескольких задач параллельно, или даже нескольких сотен, но по различным причинам они плохо масштабируются в других ситуациях. Если мы хотим запустить тысячи задач, нам нужно что‑то другое.
Async
Давайте попробуем то же самое с использованием async/await
. Пока мы просто напишем код и запустим его в Playground, не разбирая его. Асинхронная функция foo
выглядит так:
async fn foo(n: u64) {
println!("start {n}");
tokio::time::sleep(Duration::from_secs(1)).await;
println!("end {n}");
}
Выполнение нескольких вызовов функции foo
по одному выглядит так:
#[tokio::main]
async fn main() {
foo(1).await;
foo(2).await;
foo(3).await;
}
Первое, что отличает асинхронные функции, это то, что мы объявляем их с помощью ключевого слова async
, а при их вызове используем .await
. Логично.
Выполнение нескольких вызовов функции foo
одновременно выглядит так:
#[tokio::main]
async fn main() {
let mut futures = Vec::new();
for n in 1..=10 {
futures.push(foo(n));
}
let joined_future = future::join_all(futures);
joined_future.await;
}
Обратите внимание, что мы не используем .await
для каждого вызова foo
в этот раз. Каждый вызов foo
, не помеченный await
, возвращает «future», который мы собираем в Vec
, что‑то вроде Vec
дескрипторов потоков выше. Но join_all
очень отличается от метода join
, который мы использовали с потоками. Ранее join
означало ожидание чего‑то, но здесь это означает как‑то объединить несколько «futures» вместе. Мы подробнее разберем это в следующей части, но сейчас мы можем добавить ещё несколько выводов, чтобы увидеть, что join_all
не занимает времени, и ни один из вызовов foo
не начинается, пока мы не применим .await
к объединенному «future».
В отличие от примера с потоками выше, async/await
работает даже если мы увеличим количество «futures» до тысячи. На самом деле, если мы закомментируем выводы и соберём в релизный билд, мы сможем запустить миллион «futures» одновременно. Именно поэтому асинхронное программирование так популярно.
Важные ошибки
Мы можем получить некоторые подсказки о том, как работает асинхронный код, если начнем делать ошибки. Сначала давайте попробуем использовать thread::sleep
вместо tokio::time::sleep
в нашей асинхронной функции:
async fn foo(n: u64) {
println!("start {n}");
thread::sleep(Duration::from_secs(1)); // Oops!
println!("end {n}");
}
(Playground отрабатывает…)
О нет! Все снова выполняется последовательно. К сожалению, это распространенная ошибка. В следующей части станет ясно, как thread::sleep
нарушает асинхронность. На данный момент мы можем предположить, что функции foo, которые выполняются «одновременно», на самом деле работают все на одном потоке.
Мы также можем попробовать дожидаться завершения каждого «future» в цикле, так же как мы изначально ожидали завершения потоков в цикле:
#[tokio::main]
async fn main() {
let mut futures = Vec::new();
for n in 1..=10 {
futures.push(foo(n));
}
for future in futures {
future.await; // Oops!
}
}
Это тоже не работает! Мы видим, что «futures» не выполняют работу «в фоновом режиме». Вместо этого они отрабатывают, когда мы выполняем await. Если мы ждем их по одному, они выполняют свою работу по одному. Но каким‑то образом join_all
позволяет нам выполнить await
для всех одновременно.
Хорошо, у нас здесь много загадок. Давайте начнем их решать.
rukhi7
зашел посмотреть какую же такую сложную задачу решил распараллелить автор. И автор не подвел, выбрал самую сложную задачу
tokio::time::sleep
то есть "спать"!
С такой задачей такие достижения:
поражают воображение.
tbl
Если прочитать оригинал полностью, то будет понятно, что корректно имплементировать "спать" в мире асинхронных функций не так уж и просто: "как не заблокироваться?", "кто будет будить?", "как избежать ненужного поллинга?", "как реализовать отмену?", "как не дать протечь абстракции?", "как по дороге сохранить и не расплескать контекст исполнения?" и прочие тому подобные нюансы