Асинхронный Rust в трех частях

Во введении мы посмотрели на пример асинхронного Rust без какого‑либо объяснения, как он работает. Это дало нам несколько вопросов для размышления: Что такое асинхронные функции и возвращаемые ими «future»? Что делает join_all? Чем отличается tokio::time::sleep от std::thread::sleep?

Чтобы ответить на эти вопросы нам потребуется преобразовать каждую из частей в обычный не асинхронный код Rust. Вскоре мы обнаружим, что воспроизвести foo и join_all достаточно просто, а вот со sleep ситуация чуть сложнее. Начнем же.

Foo

Напомню, что асинхронно функция foo выглядела вот так:

async fn foo(n: u64) {
    println!("start {n}");
    tokio::time::sleep(Duration::from_secs(1)).await;
    println!("end {n}");
}

Playground #1

А вот так выглядит foo в формате обычной, не асинхронной функции. Ниже будет код всей функции, после чего мы разобьем ее на части и разберем каждую из них. Это полная замена прошлой функции, функцию main изменять не требуется. Вы можете выполнить ее, перейдя в Playground.

fn foo(n: u64) -> Foo {
    let started = false;
    let duration = Duration::from_secs(1);
    let sleep = Box::pin(tokio::time::sleep(duration));
    Foo { n, started, sleep }
}

struct Foo {
    n: u64,
    started: bool,
    sleep: Pin<Box<tokio::time::Sleep>>,
}

impl Future for Foo {
    type Output = ();

    fn poll(mut self: Pin<&mut Self>, context: &mut Context) -> Poll<()> {
        if !self.started {
            println!("start {}", self.n);
            self.started = true;
        }
        if self.sleep.as_mut().poll(context).is_pending() {
            return Poll::Pending;
        }
        println!("end {}", self.n);
        Poll::Ready(())
    }
}

Playground #2

Пойдем сверху вниз. Функция foo — обычная функция и возвращает структуру Foo. Она вызывает tokio::time::sleep, но не применяет .await к future Sleep, возвращаемому sleep. Вместо этого future сохраняется в структуре Foo. Мы поговорим о Box::pin и Pin<Box<_>> чуть дальше.

Foo становится future благодаря реализации трейта Future. Ниже код трейта из стандартной библиотеки:

pub trait Future {
    type Output;

    fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output>;
}

Сам трейт предствляет собой пару строк кода, но содержит 3 новых типа данных: Pin, Context и Poll. Мы сфокусируемся на Poll, так что сначала пару слов о Context и Pin, после чего оставим их на потом.

Каждый вызов Future::poll получает Context из вызывающей функции. Когда одна функция poll вызывает другую, например когда Foo::poll вызывает Sleep::poll, она передает ей Context. На данный момент это все, что нам нужно знать, пока мы не доберемся до раздела Wake ниже.

Pin это обертка, используемая для оборачивания указателей. Пока что, если позволите, я сделаю вид что Pin ничего не делает. Я сделаю вид, что Box::pin это просто Box::new, Pin<Box<T>> это Box<T>, Pin<&mut T> это &mut T, а Pin<Box<T>>::as_mut это просто Box::as_mut. Pin на самом деле решает крайне важную задачу в асинхронном Rust, но она будет более понятна после того, как мы попрактикуемся в написании future. Мы вернемся к этому в разделе Pin.

Итак, сфокусируемся на Poll. Это enum и выглядит он вот так:

pub enum Poll<T> {
    Ready(T),
    Pending,
}

Первая задача функции poll - возвращать либо Poll::Ready, либо Poll::Pending. Возвращение Ready значит, что future закончил работу и включает значение Output, если оно есть. В таком случае poll не будет вызываться повторно. Возвращение Pending означает, что future на завершил работу и poll будет вызвана повторно.

У вас мог возникнуть вопрос: а когда она будет вызвана повторно? Если коротко - нужно готовиться к любой ситуации. Функция poll может вызываться раз за разом в “нагруженном цикле” и нам нужно, чтобы она работала корректно. Длинный ответ на этот вопрос будет в разделе Wake.

Давайте взглянем на реализацию трейта Future у Foo и функцию poll. Код:

impl Future for Foo {
    type Output = ();

    fn poll(mut self: Pin<&mut Self>, context: &mut Context) -> Poll<()> {
        if !self.started {
            println!("start {}", self.n);
            self.started = true;
        }
        if self.sleep.as_mut().poll(context).is_pending() {
            return Poll::Pending;
        }
        println!("end {}", self.n);
        Poll::Ready(())
    }
}

Playground #2

Мы видели, что первой задачей poll было возвращение Ready или Pending и теперь можем наблюдать, что у poll есть вторая задача, а именно сама задача future. Задача Foo — вывод нескольких сообщений на экран и уход в сон, что и происходит в функции poll.

Тут есть важный компромисс: poll должна выполнять все, что можно сделать быстро, но не должна заставлять вызывающую функцию ждать ответа. Она должна сразу возвращать либо Ready, либо Pending. Этот компромисс и позволяет обрабатывать больше одного future за раз. Это дает им работать, не блокируя друг друга.

Чтобы следовать этому правилу, Foo::poll приходится полагаться на то, что Sleep::poll быстро вернет ответ. Если мы добавим отслеживание времени и вывод, то сможем отследить происходящее. В «важной ошибке», которую мы совершили во введении, thread::sleep нарушила это правило, что привело к последовательному выполнению наших future. Если мы сделаем то же самое в Foo::poll, мы увидим тот же результат. Использование блокирующего сна в poll заставляет вызывающую функцию ждать ответа, блокируя все остальные future.

Foo использует флаг started, чтобы выводить сообщение о начале лишь раз вне зависимости от того, сколько раз вызывалась функция poll. Флаг ended в то же время не требуется, поскольку poll не будет вызвана повторно после возвращения Ready. Флаг started превращает Foo в машину состояния с двумя возможными состояниями. В целом асинхронная функция требует какого‑то начального состояния, а также еще по состоянию на каждую из точек .await, чтобы функция poll могла определить, с какой точки «продолжить выполнение». Если бы у нас было больше двух состояний, мы могли бы использовать enum вместо bool. При написании async fn компилятор делает это за нас и подобное удобство это основная причина наличия async/await как фичи языка.

Join

Теперь, когда мы разобрались с тем, как реализовать простой future, взглянем на join_all. Может сложиться впечатление, что join_all за кулисами использует какую‑то магию по сравнению с foo, но на самом деле оказывается, что у нас есть все нужное для ее реализации. Вот join_all в качестве обычной не асинхронной функции:

fn join_all<F: Future>(futures: Vec<F>) -> JoinAll<F> {
    JoinAll {
        futures: futures.into_iter().map(Box::pin).collect(),
    }
}

struct JoinAll<F> {
    futures: Vec<Pin<Box<F>>>,
}

impl<F: Future> Future for JoinAll<F> {
    type Output = ();

    fn poll(mut self: Pin<&mut Self>, context: &mut Context) -> Poll<()> {
        let is_pending = |future: &mut Pin<Box<F>>| {
            future.as_mut().poll(context).is_pending()
        };
        self.futures.retain_mut(is_pending);
        if self.futures.is_empty() {
            Poll::Ready(())
        } else {
            Poll::Pending
        }
    }
}

Playground #3

Опять же, наша не асинхронная join_all возвращает структуру, реализующую трейт Future, где работа функции происходит в Future::poll. Здесь опять есть Box::pin, но давайте продолжим ее игнорировать.

Внутри poll всю сложную работу делает Vec::retain_mut. Это обычный метод Vec, который принимает замыкание в качестве аргумента, вызывает его для каждого элемента и убирает элементы, возвращающие false. Это убирает future, вернувшие Ready, следуя правилу «не вызывать их повторно».

Здесь нет ничего нового. Со стороны запуск всех future одновременно выглядел как магия, но по сути, все что происходит внутри это вызов poll для всех элементов Vec. Это обратная сторона компромисса, упомянутого ранее. Если мы можем доверять, что каждый вызов poll быстро вернет ответ, мы можем обрабатывать много future одним циклом.

Обратите внимание, что здесь есть шорткат — мы игнорируем возвращаемое значение вызванных future. В данном случае это работает, потому что мы используем join_all только с foo, не возвращающей чего‑либо. Настоящая join_all возвращает Vec<F::Output>, что требует больше действий для отслеживания состояния. Оставим это в качестве упражнения для читателя, как говорится

Sleep

Мы на верном пути! Кажется, у нас уже есть все, чтобы реализовать свой собственный sleep:

fn sleep(duration: Duration) -> Sleep {
    let wake_time = Instant::now() + duration;
    Sleep { wake_time }
}

struct Sleep {
    wake_time: Instant,
}

impl Future for Sleep {
    type Output = ();

    fn poll(self: Pin<&mut Self>, _: &mut Context) -> Poll<()> {
        if Instant::now() >= self.wake_time {
            Poll::Ready(())
        } else {
            Poll::Pending
        }
    }
}

Playground #4

Хм. Код компилируется без ошибок, и логика в функции poll выглядит верно, но при запуске он выводит сообщение «start» и затем зависает. Если добавить дополнительные операторы вывода, можно увидеть, что каждый Sleep опрашивается один раз в начале и больше никогда. Что мы упускаем?

Оказывается, у poll три задачи, и пока мы разобрались только с двумя. Во‑первых, poll выполняет всю возможную работу, не блокируясь. Затем, poll возвращает Ready, если он завершен, или Pending, если нет. Но, наконец, каждый раз, когда poll собирается вернуть Pending, ему нужно «запланировать пробуждение». Ах, вот что мы забыли.

Причина, по которой мы раньше не сталкивались с этим, заключается в том, что Foo и JoinAll возвращают Pending только тогда, когда другой «future» уже вернул им Pending, что означает, что пробуждение уже запланировано. Но Sleep — это то, что мы называем «leaf future». У него нет других future ниже по иерархии, и ему нужно пробудить себя.

Wake

Пора более внимательно рассмотреть Context. Если мы вызовем context.waker(), он вернет Waker. Вызывая любой из методов waker.wake() или waker.wake_by_ref() функция может попросить опросить себя повторно. Эти два метода делают одно и тоже, и мы будем использовать тот, который удобнее.

Самое простое, что мы можем попробовать, это запрашивать повторный опрос при возврате Pending каждый раз, когда возвращаем Pending:

fn poll(self: Pin<&mut Self>, context: &mut Context) -> Poll<()> {
    if Instant::now() >= self.wake_time {
        Poll::Ready(())
    } else {
        context.waker().wake_by_ref();
        Poll::Pending
    }
}

Playground #5

Этот код выводит правильный результат и завершается в нужное время, поэтому проблема с «бесконечным сном» решена, но мы заменили ее на проблему «занятого цикла». Этот код вызывает poll снова и снова так быстро, как может, прожигая при этом 100% CPU пока не завершится. Мы можем увидеть это косвенно, подсчитывая количество вызовов poll или можем измерить напрямую с помощью инструментов, например perf в Linux.

Нам необходимо вызывать Waker позже, когда наступит время проснуться, но мы не можем использовать thread::sleep в poll. Одно из возможных решений — запустить другой поток, который будет выполнять thread::sleep за нас и затем вызывать wake. Если бы мы делали это в каждом вызове poll, мы столкнулись бы с проблемой слишком большого количества потоков, о которой говорилось во введении. Но мы могли бы обойти это, запустив общий поток и используя канал для передачи Wakers в него. Это действительно жизнеспособная реализация, но в ней кое‑что не так. Главный поток нашей программы уже проводит большую часть времени в состоянии сна. Почему нам нужно два спящих потока? Почему нет способа пробудить наш главный поток в определенное время?

Что ж, справедливости ради, такой способ есть — именно для этого и существует tokio::time::sleep. Но если мы действительно хотим написать свой собственный sleep и не хотим создавать дополнительный поток для его него, то нам нужно также написать свой собственный main.

Main

Для вызова poll из main нам понадобится Context для передачи в него. Мы можем создать его с помощью Context::from_waker, но для этого нам нужен Waker. Существует несколько способов создать его, но пока нам нужна лишь заглушка, поэтому мы используем вспомогательную функцию под названием noop_waker («Noop», «no‑op» и «nop» — это сокращения от «no operation»). Как только мы создадим Context, мы сможем вызывать poll в цикле:

fn main() {
    let mut futures = Vec::new();
    for n in 1..=10 {
        futures.push(foo(n));
    }
    let mut joined_future = Box::pin(future::join_all(futures));
    let waker = futures::task::noop_waker();
    let mut context = Context::from_waker(&waker);
    while joined_future.as_mut().poll(&mut context).is_pending() {
        // Busy loop!
    }
}

Playground #6

Работает! Хотя у нас все еще остается проблема «занятого цикла», как и выше. Но прежде чем решать ее, нам нужно совершить еще одну важную ошибку:

Поскольку эта версия нашего главного цикла никогда не прекращает опрос, и поскольку наш Waker ничего не делает, может возникнуть вопрос: нужно ли вообще вызывать Waker в Sleep::poll? Удивительно, но это действительно необходимо. Если удалить эту строку, сначала код будет работать нормально. Но если увеличить количество заданий с десяти до ста, наши «future» никогда не «проснутся». Мы видим, что, хотя наш Waker ничего не делает, в программе есть другие Waker. Когда futures::future::JoinAll имеет много дочерних «future» (в версии futures v0.3.30 точный порог равен 31), он создает свои собственные Waker для того, чтобы избегать повторного опроса дочерних элементов, которые не запросили «пробуждения». Это более эффективно, чем каждый раз опрашивать их все, но также означает, что дочерние элементы, которые никогда не вызывают свой собственный Waker, больше не будут опрашиваться. Это и является причиной, почему «future», находящийся в состоянии Pending, всегда должен организовать вызов своего Waker.

Хорошо, возвращаемся к main. Исправим проблему «занятого цикла». Нам нужно, чтобы main использовал thread::sleep до следующего времени пробуждения, что означает, что нам нужен способ, чтобы Sleep::poll передавал Waker и время пробуждения в main. Мы воспользуемся глобальной переменной и обернем ее в Mutex, чтобы безопасный код мог ее модифицировать.

static WAKE_TIMES: Mutex<BTreeMap<Instant, Vec<Waker>>> =
    Mutex::new(BTreeMap::new());

Playground #7

Это отсортированный map от времени пробуждения до Waker. Обратите внимание, что тип значения здесь — Vec<Waker>, а не просто Waker, потому что для данного времени Instant может быть несколько Waker. Это маловероятно на Linux и macOS, где разрешение Instant::now() измеряется в наносекундах, но на Windows разрешение составляет 15,6 миллисекунд.

Sleep::poll может вставить свой Waker в этот map, используя BTreeMap::entry:

fn poll(self: Pin<&mut Self>, context: &mut Context) -> Poll<()> {
    if Instant::now() >= self.wake_time {
        Poll::Ready(())
    } else {
        let mut wake_times = WAKE_TIMES.lock().unwrap();
        let wakers_vec = wake_times.entry(self.wake_time).or_default();
        wakers_vec.push(context.waker().clone());
        Poll::Pending
    }
}

Playground #7

После опроса наш главный цикл может считать первый ключ из map, чтобы получить ближайшее время пробуждения. Затем он может выполнить thread::sleep до этого времени, устраняя проблему «занятого цикла». Далее он вызывает все Waker, время пробуждения которых наступило, прежде чем повторно запустить цикл и опросить снова:

fn main() {
    let mut futures = Vec::new();
    for n in 1..=10 {
        futures.push(foo(n));
    }
    let mut joined_future = Box::pin(future::join_all(futures));
    let waker = futures::task::noop_waker();
    let mut context = Context::from_waker(&waker);
    while joined_future.as_mut().poll(&mut context).is_pending() {
        // The joined future is Pending. Sleep until the next wake time.
        let mut wake_times = WAKE_TIMES.lock().unwrap();
        let next_wake = wake_times.keys().next().expect("sleep forever?");
        thread::sleep(next_wake.saturating_duration_since(Instant::now()));
        // We just woke up. Invoke all the Wakers whose time has come.
        while let Some(entry) = wake_times.first_entry() {
            if *entry.key() <= Instant::now() {
                entry.remove().into_iter().for_each(Waker::wake);
            } else {
                break;
            }
        }
        // Loop and poll again.
    }
}

Playground #7

Работает! Мы решили проблему «занятого цикла» и не понадобилось создавать дополнительные потоки. Это то, что нужно, чтобы написать собственный sleep.

Это своего рода завершение первой части. Во второй части мы расширим наш главный цикл для реализации «задач». Теперь, когда мы понимаем, как работают «future», у нас есть возможность рассмотреть несколько дополнительных тем, которые пригодятся для следующих частей, хотя они не являются обязательными для их понимания.

Бонус: Pin

Теперь, когда мы знаем, как преобразовать async fn в структуру Future, мы можем немного больше рассказать о Pin и о проблеме, которую он решает. Представим, что наша async fn foo по какой-то причине использует ссылку внутри:

async fn foo(n: u64) {
    let n_ref = &n;
    println!("start {n_ref}");
    tokio::time::sleep(Duration::from_secs(1)).await;
    println!("end {n_ref}");
}

Playground #8

Этот код компилируется и выполняется нормально, и выглядит как вполне обычный код на Rust. Но как бы выглядело то же самое изменение в нашей структуре future Foo?

struct Foo {
    n: u64,
    n_ref: &u64,
    started: bool,
    sleep: Pin<Box<tokio::time::Sleep>>,
}

Playground #9

Код не компилируется:

error[E0106]: missing lifetime specifier
 --> src/main.rs:3:12
  |
3 |     n_ref: &u64,
  |            ^ expected named lifetime parameter

Каким должно быть время жизни n_ref? Короткий ответ: хорошего ответа нет. Само‑ссылающиеся заимствования в целом не разрешены в структурах Rust, и для того, что пытается сделать n_ref, нет синтаксиса. Если бы он был, то нам пришлось бы ответить на несколько сложных вопросов о том, когда мы можем изменять n и когда можем перемещать Foo.

Но тогда, какой тип Future сгенерировал компилятор для async fn foo выше? Оказывается, Rust выполняет некоторые небезопасные вещи внутри, чтобы устранить невыраженные лайфтаймы, как в случае с n_ref. Задача обертки‑указателя Pin состоит в том, чтобы инкапсулировать эту небезопасность, чтобы мы могли писать пользовательские futures, такие как JoinAll, в безопасном коде. Структура Pin работает с авто‑трейтом Unpin, который реализован для большинства конкретных типов, но не для сгенерированных компилятором futures, возвращаемых async функциями. Операции, которые могут позволить нам перемещать зафиксированные объекты либо ограничены Unpin (как DerefMut), либо помечены как unsafe (например get_unchecked_mut). Оказывается, наше широкое использование Box::pin в примерах выше означало, что все наши futures автоматически были Unpin, так что DerefMut работал для наших ссылок Pin<&mut Self>, и мы могли изменять поля, такие как self.started и self.futures, не задумываясь об этом.

На этом мы закончим обсуждение Pin, так как доскональные детали не нужны для задач (вторая часть) или ввода‑вывода (третья часть). Но если вы хотите узнать все подробности, начните с этого поста от автора Pin, а затем прочтите официальную документацию Pin.

Бонус: Отмена

Асинхронные функции выглядят и ощущаются как обычные функции, но с особыми суперсилами. Есть еще одна суперсила, о которой мы не упомянули.

Эта суперсила — отмена. Когда мы вызываем обычную функцию в блокирующем коде, у нас нет какого‑то способа отменить вызов через некоторое время. Но мы можем отменить любой future просто…не вызывая его повторно. Для этого в tokio есть tokio::time::timeout, а также у нас есть все нужное, чтобы сделать собственную версию:

struct Timeout<F> {
    sleep: Pin<Box<tokio::time::Sleep>>,
    inner: Pin<Box<F>>,
}

impl<F: Future> Future for Timeout<F> {
    type Output = Option<F::Output>;

    fn poll(
        mut self: Pin<&mut Self>,
        context: &mut Context,
    ) -> Poll<Self::Output> {
        // Check whether the inner future is finished.
        if let Poll::Ready(output) = self.inner.as_mut().poll(context) {
            return Poll::Ready(Some(output));
        }
        // Check whether time is up.
        if self.sleep.as_mut().poll(context).is_ready() {
            return Poll::Ready(None);
        }
        // Still waiting.
        Poll::Pending
    }
}

fn timeout<F: Future>(duration: Duration, inner: F) -> Timeout<F> {
    Timeout {
        sleep: Box::pin(tokio::time::sleep(duration)),
        inner: Box::pin(inner),
    }
}

Playground #10

Эта обертка работает с любой асинхронной функцией. У обычных функций подобного функционала нет.

Бонус: Рекурсия

Единственная суперсила, которой нам не хватает — рекурсия. При попытке вызвать саму себя в асинхронной функции:

async fn factorial(n: u64) -> u64 {
    if n == 0 {
        1
    } else {
        n * factorial(n - 1).await
    }
}

Playground #11

Мы получим ошибку компиляции:

error[E0733]: recursion in an async fn requires boxing
 --> recursion.rs:1:1
  |
1 | async fn factorial(n: u64) -> u64 {
  | ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
…
5 |         n * factorial(n - 1).await
  |             ---------------------- recursive call here
  |
  = note: a recursive `async fn` must introduce indirection such as `Box::pin` to avoid an infinitely sized future

Когда обычные функции вызывают друг друга, они динамически выделяют память на стеке вызовов. Но когда асинхронные функции выполняют .await друг друга, они компилируются в структуры, содержащие другие структуры, размеры которых статичны. Если асинхронная функция вызывает саму себя, ей приходится помещать рекуррентный future в Box перед вызовом .await:

async fn factorial(n: u64) -> u64 {
    if n == 0 {
        1
    } else {
        let recurse = Box::pin(factorial(n - 1));
        n * recurse.await
    }
}

Это работает, но требует выделения памяти на куче.

Итак, на этом закончим и перейдем к «задачам».

Комментарии (3)


  1. tbl
    26.10.2024 11:07

    С появлением чатжпт теперь редко теперь на хабре увидишь такие качественные и грамотные переводы. Спасибо за проделанную работу


    1. MrPizzly Автор
      26.10.2024 11:07

      Спасибо!

      чатжпт помогает конечно, иногда, с переводом основной структуры, но без вычитки и редактуры никуда не годится по качеству :)


  1. blind_oracle
    26.10.2024 11:07

    Кто-то писал что понять Pin было сложнее чем выучить какой-нибудь язык типа Go :)