Асинхронный Rust в трех частях

Async/await, или «асинхронный ввод‑вывод», — это относительно новая функция языка, которая позволяет программам выполнять несколько задач одновременно. Это своего рода альтернатива многопоточности, хотя программы на Rust часто используют и то и другое. Асинхронный ввод‑вывод особенно популярен в веб‑сервисах и сетевых приложениях, работающих с большим числом подключений одновременно.

В данном случае «большое число» обычно означает десять тысяч или более подключений, что иногда называют «проблемой C10K», то есть задачей обработки 10 000 клиентов или соединений. Использование большого количества «futures» или «задач» оказывается более эффективным, чем запуск множества потоков.

Эта серия статей представляет собой введение в «futures», задачи и асинхронный ввод‑вывод в Rust. Наша цель — понять основные механизмы, чтобы асинхронный код не казался магией. Мы начнем с преобразования (так называемой «рассахаризации») асинхронных примеров в обычный Rust и постепенно создадим собственную асинхронную «среду выполнения». На данном этапе под «средой выполнения» мы понимаем библиотеку или фреймворк, которые используются для написания асинхронных программ.

Создание собственных «futures», задач и механизма ввода‑вывода позволит понять, что именно делает для нас среда выполнения. Предполагается, что вы уже немного писали на Rust и читали The Rust Programming Language \или аналогичный источник.

Давайте начнем с выполнения нескольких задач одновременно с использованием потоков. Сначала все пойдет хорошо, но затем, с увеличением количества потоков, начнутся проблемы. Затем мы добьемся того же, используя async/await. В этой части мы разберем примеры, а в следующей мы начнем углубляться в них.

Потоки

Ниже пример функции foo, которая выполняется за одну секунду:

fn foo(n: u64) {
    println!("start {n}");
    thread::sleep(Duration::from_secs(1));
    println!("end {n}");
}

Playground #1

Если мы хотим одновременно вызвать несколько функций foo, мы можем создать по отдельному потоку для каждого вызова. Сделать это можно следующим образом:

fn main() {
    let mut thread_handles = Vec::new();
    for n in 1..=10 {
        thread_handles.push(thread::spawn(move || foo(n)));
    }
    for handle in thread_handles {
        handle.join().unwrap();
    }
}

Playground #1

Обратите внимание, что метод join здесь означает «дождаться завершения потока». Потоки начинают выполняться в фоновом режиме, как только мы вызываем spawn, так что все они отрабатывают, пока мы ждём завершения первого, и остальные вызовы join возвращаются быстро.

Мы можем увеличить количество потоков до ста, и все будет работать нормально. Но если мы попытаемся запустить тысячу потоков, то у нас возникнут проблемы:

thread 'main' panicked at /rustc/3f5fd8dd41153bc5fdca9427e9e05...
failed to spawn thread: Os { code: 11, kind: WouldBlock, message:
"Resource temporarily unavailable" }

Playground #2

Каждый поток использует много памяти, поэтому существует ограничение на количество потоков, которые мы можем создать. Хотя на Playground это не так заметно, мы также можем вызвать проблемы с производительностью, переключаясь между множеством потоков одновременно. Потоки хорошо подходят для выполнения нескольких задач параллельно, или даже нескольких сотен, но по различным причинам они плохо масштабируются в других ситуациях. Если мы хотим запустить тысячи задач, нам нужно что‑то другое.

Async

Давайте попробуем то же самое с использованием async/await. Пока мы просто напишем код и запустим его в Playground, не разбирая его. Асинхронная функция foo выглядит так:​

async fn foo(n: u64) {
    println!("start {n}");
    tokio::time::sleep(Duration::from_secs(1)).await;
    println!("end {n}");
}

Playground #3

Выполнение нескольких вызовов функции foo по одному выглядит так:​

#[tokio::main]
async fn main() {
    foo(1).await;
    foo(2).await;
    foo(3).await;
}

Playground #3

Первое, что отличает асинхронные функции, это то, что мы объявляем их с помощью ключевого слова async, а при их вызове используем .await. Логично.

Выполнение нескольких вызовов функции foo одновременно выглядит так:​

#[tokio::main]
async fn main() {
    let mut futures = Vec::new();
    for n in 1..=10 {
        futures.push(foo(n));
    }
    let joined_future = future::join_all(futures);
    joined_future.await;
}

Playground #4

Обратите внимание, что мы не используем .await для каждого вызова foo в этот раз. Каждый вызов foo, не помеченный await, возвращает «future», который мы собираем в Vec, что‑то вроде Vec дескрипторов потоков выше. Но join_all очень отличается от метода join, который мы использовали с потоками. Ранее join означало ожидание чего‑то, но здесь это означает как‑то объединить несколько «futures» вместе. Мы подробнее разберем это в следующей части, но сейчас мы можем добавить ещё несколько выводов, чтобы увидеть, что join_all не занимает времени, и ни один из вызовов foo не начинается, пока мы не применим .await к объединенному «future».

В отличие от примера с потоками выше, async/await работает даже если мы увеличим количество «futures» до тысячи. На самом деле, если мы закомментируем выводы и соберём в релизный билд, мы сможем запустить миллион «futures» одновременно. Именно поэтому асинхронное программирование так популярно.

Важные ошибки

Мы можем получить некоторые подсказки о том, как работает асинхронный код, если начнем делать ошибки. Сначала давайте попробуем использовать thread::sleep вместо tokio::time::sleep в нашей асинхронной функции:

async fn foo(n: u64) {
    println!("start {n}");
    thread::sleep(Duration::from_secs(1)); // Oops!
    println!("end {n}");
}

Playground #5

(Playground отрабатывает…)

О нет! Все снова выполняется последовательно. К сожалению, это распространенная ошибка. В следующей части станет ясно, как thread::sleep нарушает асинхронность. На данный момент мы можем предположить, что функции foo, которые выполняются «одновременно», на самом деле работают все на одном потоке.

Мы также можем попробовать дожидаться завершения каждого «future» в цикле, так же как мы изначально ожидали завершения потоков в цикле:

#[tokio::main]
async fn main() {
    let mut futures = Vec::new();
    for n in 1..=10 {
        futures.push(foo(n));
    }
    for future in futures {
        future.await; // Oops!
    }
}

Playground #6

Это тоже не работает! Мы видим, что «futures» не выполняют работу «в фоновом режиме». Вместо этого они отрабатывают, когда мы выполняем await. Если мы ждем их по одному, они выполняют свою работу по одному. Но каким‑то образом join_all позволяет нам выполнить await для всех одновременно.

Хорошо, у нас здесь много загадок. Давайте начнем их решать.

Комментарии (2)


  1. rukhi7
    25.10.2024 12:17

    зашел посмотреть какую же такую сложную задачу решил распараллелить автор. И автор не подвел, выбрал самую сложную задачу

    tokio::time::sleep

    то есть "спать"!

    С такой задачей такие достижения:

    Мы можем увеличить количество потоков до ста, и все будет работать нормально.

    поражают воображение.


    1. tbl
      25.10.2024 12:17

      Если прочитать оригинал полностью, то будет понятно, что корректно имплементировать "спать" в мире асинхронных функций не так уж и просто: "как не заблокироваться?", "кто будет будить?", "как избежать ненужного поллинга?", "как реализовать отмену?", "как не дать протечь абстракции?", "как по дороге сохранить и не расплескать контекст исполнения?" и прочие тому подобные нюансы