Асинхронный Rust в трех частях

Конечно, async/await были придуманы не для сна. Нашей целью с самого начала был ввод‑вывод (I/O), а в особенности сетевой ввод‑вывод. Вооружившись futures и задачами, теперь мы можем перейти к реальным примерам..

Давайте вернемся на минуту к обычному, не асинхронному Rust. Начнем с простого сервера и клиента, который с ним взаимодействует. Используя потоки, объединим сервер и несколько клиентов в один пример, который можно будет запустить в Playground. Когда эта комбинация заработает, мы переведем ее на async, используя основной цикл, который мы написали во второй части.

Вот наш простой сервер:

fn main() -> io::Result<()> {
    let listener = TcpListener::bind("0.0.0.0:8000")?;
    let mut n = 1;
    loop {
        let (mut socket, _) = listener.accept()?;
        let start_msg = format!("start {n}\n");
        socket.write_all(start_msg.as_bytes())?;
        thread::sleep(Duration::from_secs(1));
        let end_msg = format!("end {n}\n");
        socket.write_all(end_msg.as_bytes())?;
        n += 1;
    }
}

Playground #1

Сервер начинает прослушивать порт 8000. Для каждого соединения он записывает сообщение о начале, делает паузу на одну секунду и записывает сообщение о завершении.

Ниже клиент для нашего простого сервера:

fn main() -> io::Result<()> {
    let mut socket = TcpStream::connect("localhost:8000")?;
    io::copy(&mut socket, &mut io::stdout())?;
    Ok(())
}

Playground #2

Клиент открывает соединение с сервером и копирует все полученные байты в стандартный вывод по мере их поступления. Он явно не делает паузу, но выполнение все равно занимает секунду, потому что сервер завершает ответ через секунду. Под капотом: io::copy — это удобная обертка вокруг стандартных методов Read::read и Write::write, и read блокируется до тех пор, пока не поступят данные.

Эти программы не смогут взаимодействовать друг с другом на Playground. Возможно, вам стоит запустить их на своем компьютере, а еще лучше — на двух разных устройствах в вашей сети WiFi. Если вы раньше этого не делали, то посмотреть на их работу в реальной сети может быть довольно интересно. Также может быть полезно просмотреть проект веб‑сервера из 20-й главы книги.

Потоки

Давайте запустим код на Playground, объединив клиент и сервер в одной программе. Поскольку оба процесса блокируются, нам нужно будет запустить их в отдельных потоках. Мы переименуем их функции main в client_main и server_main, и заодно запустим десять клиентов одновременно:

fn main() -> io::Result<()> {
    // Avoid a race between bind and connect by binding before spawn.
    let listener = TcpListener::bind("0.0.0.0:8000")?;
    // Start the server on a background thread.
    thread::spawn(|| server_main(listener).unwrap());
    // Run ten clients on ten different threads.
    let mut client_handles = Vec::new();
    for _ in 1..=10 {
        client_handles.push(thread::spawn(client_main));
    }
    for handle in client_handles {
        handle.join().unwrap()?;
    }
    Ok(())
}

Playground #3

Код работает на Playground, но занимает десять секунд. Хотя клиенты работают параллельно, сервер обрабатывает их по одному за раз. Давайте сделаем так, чтобы сервер создавал новый поток для каждого входящего запроса:

fn one_response(mut socket: TcpStream, n: u64) -> io::Result<()> {
    let start_msg = format!("start {n}\n");
    socket.write_all(start_msg.as_bytes())?;
    thread::sleep(Duration::from_secs(1));
    let end_msg = format!("end {n}\n");
    socket.write_all(end_msg.as_bytes())?;
    Ok(())
}

fn server_main(listener: TcpListener) -> io::Result<()> {
    let mut n = 1;
    loop {
        let (socket, _) = listener.accept()?;
        thread::spawn(move || one_response(socket, n).unwrap());
        n += 1;
    }
}

Playground #4

Код все еще работает, и теперь выполняется всего за секунду. Именно этого мы и добивались. Теперь мы готовы к нашему финальному проекту: расширению основного цикла из второй части и переводу этого примера в асинхронный режим.

Перед нами две основные задачи. Во‑первых, нам нужны функции ввода‑вывода, которые возвращают управление сразу, не блокируясь, даже если входные данные еще не поступили, чтобы мы могли использовать их в Future::poll. Во‑вторых, когда все наши задачи ждут ввода, нам нужно «усыпить» программу вместо активного ожидания, и нужен способ проснуться, когда поступят какие‑либо данные.

Non-blocking

В стандартной библиотеке есть решение для первой проблемы. У типов TcpListener и TcpStream есть методы set_nonblocking, которые дают возможность accept, read и write возвращать ErrorKind::WouldBlock вместо блокировки.

Технически, самого по себе set_nonblocking достаточно для работы с асинхронным вводом‑выводом. Не решив вторую проблему, мы будем использовать 100% ЦП в «занятом цикле» до завершения работы, но наш вывод все равно будет корректным, и мы сможем провести немного подготовительной работы, прежде чем перейдем к более сложной части.

Когда мы писали Foo, JoinAll и Sleep в первой части, каждая из них требовала определения структуры, функции poll и функции конструктора. Чтобы сократить количество шаблонного кода на этот раз, мы используем std::future::poll_fn, который принимает самописную функцию poll и генерирует остальную часть future.

Есть четыре потенциально блокирующих операции, которые нам нужно сделать асинхронными. Это accept и write на стороне сервера, а также connect и read на стороне клиента. Начнем с accept:

async fn accept(
    listener: &mut TcpListener,
) -> io::Result<(TcpStream, SocketAddr)> {
    std::future::poll_fn(|context| match listener.accept() {
        Ok((stream, addr)) => {
            stream.set_nonblocking(true)?;
            Poll::Ready(Ok((stream, addr)))
        }
        Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
            // TODO: This is a busy loop.
            context.waker().wake_by_ref();
            Poll::Pending
        }
        Err(e) => Poll::Ready(Err(e)),
    }).await
}

Playground #5

Ключевым моментом здесь является обработка ошибок WouldBlock, преобразуя их в Pending. Вызов wake_by_ref всякий раз, когда мы возвращаем Pending, как мы делали во второй версии Sleep из первой части, приводит к «занятому циклу». Мы исправим это в следующем разделе. Мы предполагаем, что TcpListener уже находится в неблокирующем режиме, и мы также переводим возвращаемый TcpStream в неблокирующий режим, чтобы подготовиться к асинхронной записи.

Теперь давайте реализуем запись. Если бы мы хотели скопировать Tokio, нам бы пришлось реализовать трейт AsyncWrite и сделать все дженериками, но это потребовало бы много кода. Вместо этого давайте оставим код небольшим и захардкодим то, что мы пишем в TcpStream:

async fn write_all(
    mut buf: &[u8],
    stream: &mut TcpStream,
) -> io::Result<()> {
    std::future::poll_fn(|context| {
        while !buf.is_empty() {
            match stream.write(&buf) {
                Ok(0) => {
                    let e = io::Error::from(io::ErrorKind::WriteZero);
                    return Poll::Ready(Err(e));
                }
                Ok(n) => buf = &buf[n..],
                Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
                    // TODO: This is a busy loop.
                    context.waker().wake_by_ref();
                    return Poll::Pending;
                }
                Err(e) => return Poll::Ready(Err(e)),
            }
        }
        Poll::Ready(Ok(()))
    }).await
}

Playground #5

TcpStream::write не гарантирует обработку всего buf, поэтому нам нужно вызывать его в цикле, сдвигая buf вперед каждый раз. Маловероятно, что мы увидим Ok(0) от TcpStream, но если это произойдет, лучше, чтобы это была ошибка, чем бесконечный цикл. Условие цикла также означает, что мы не будем вызывать write, если buf изначально пуст, что соответствует поведению по умолчанию для Write::write_all.

Теперь мы можем написать асинхронную версию server_main:

async fn one_response(mut socket: TcpStream, n: u64) -> io::Result<()> {
    let start_msg = format!("start {n}\n");
    write_all(start_msg.as_bytes(), &mut socket).await?;
    sleep(Duration::from_secs(1)).await;
    let end_msg = format!("end {n}\n");
    write_all(end_msg.as_bytes(), &mut socket).await?;
    Ok(())
}

async fn server_main(mut listener: TcpListener) -> io::Result<()> {
    let mut n = 1;
    loop {
        let (socket, _) = accept(&mut listener).await?;
        spawn(async move { one_response(socket, n).await.unwrap() });
        n += 1;
    }
}

Playground #5

Аналогично примеру с потоками в начале, мы никогда не объединяем задачи сервера, поэтому используем unwrap, чтобы вывести информацию об ошибках в stderr, если они произойдут. Ранее мы делали это внутри замыкания, а здесь — внутри асинхронного блока, который работает как анонимная асинхронная функция без аргументов.

Надеюсь, это сработает, но нам нужно переписать клиент, прежде чем мы сможем протестировать его.

Мы только что реализовали асинхронную запись, так что давайте сделаем асинхронное чтение. Противоположностью Write::write_all является Read::read_to_end, но это не совсем то, что нам нужно. Мы хотим выводить данные сразу после их получения, а не собирать их в Vec и выводить всё в конце. Давайте снова упростим задачу и захардкодим вывод. Назовем это print_all:

async fn print_all(stream: &mut TcpStream) -> io::Result<()> {
    std::future::poll_fn(|context| {
        loop {
            let mut buf = [0; 1024];
            match stream.read(&mut buf) {
                Ok(0) => return Poll::Ready(Ok(())), // EOF
                // Assume that printing doesn't block.
                Ok(n) => io::stdout().write_all(&buf[..n])?,
                Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
                    // TODO: This is a busy loop.
                    context.waker().wake_by_ref();
                    return Poll::Pending;
                }
                Err(e) => return Poll::Ready(Err(e)),
            }
        }
    }).await
}

Playground #5

Ok(0) при чтении означает конец файла, но в остальном реализация похожа на write_all, описанный выше.

Другим асинхронной частью, которая нам нужна для нашего клиента, является connect, но с этим есть несколько проблем. Во‑первых, TcpStream::connect создает новый поток, и у нас нет возможности вызвать set_nonblocking для этого потока до того, как connect начнет взаимодействовать с сетью. Во‑вторых, connect может включать поиск DNS, и асинхронный DNS — это целая проблема. Решение этих проблем здесь потребовало бы много усилий без особой пользы… поэтому мы собираемся схитрить и просто предположить, что connect не блокирует.

С одной настоящей асинхронной частью и одной откровенной ложью мы можем написать client_main:

async fn client_main() -> io::Result<()> {
    // XXX: Assume that connect() returns quickly.
    let mut socket = TcpStream::connect("localhost:8000")?;
    socket.set_nonblocking(true)?;
    print_all(&mut socket).await?;
    Ok(())
}

Playground #5

И, наконец, async_main:

async fn async_main() -> io::Result<()> {
    // Avoid a race between bind and connect by binding before spawn.
    let listener = TcpListener::bind("0.0.0.0:8000")?;
    listener.set_nonblocking(true)?;
    // Start the server on a background task.
    spawn(async { server_main(listener).await.unwrap() });
    // Run ten clients as ten different tasks.
    let mut task_handles = Vec::new();
    for _ in 1..=10 {
        task_handles.push(spawn(client_main()));
    }
    for handle in task_handles {
        handle.await?;
    }
    Ok(())
}

Playground #5

Работает! Он постоянно работает в цикле и потребляет 100% CPU, но работает.

Poll

Вторая большая проблема, которую нам нужно решить, — это приостановка основного цикла до тех пор, пока не поступят входные данные. Мы не можем сделать это сами, и нам нужна помощь от операционной системы. Для этого мы будем использовать системный вызов poll, который доступен на всех Unix‑подобных операционных системах, включая Linux и macOS. Мы вызовем его, используя функцию стандартной библиотеки C libc::poll, которая выглядит так в Rust:

pub unsafe extern "C" fn poll(
    fds: *mut pollfd,
    nfds: nfds_t,
    timeout: c_int,
) -> c_int

libc::poll принимает список «файловых дескрипторов для опроса» и таймаут в миллисекундах. Таймаут позволит нам пробуждаться для сна, помимо ввода‑вывода, заменяя thread::sleep в нашем основном цикле. Каждый pollfd выглядит следующим образом:

struct pollfd {
    fd: c_int,
    events: c_short,
    revents: c_short,
}

Поле fd — это «файловый дескриптор», или в терминах Rust — «сырой» файловый дескриптор. Это идентификатор, который операционные системы на основе Unix используют для отслеживания открытых ресурсов, таких как файлы и сокеты. Мы можем получить дескриптор от TcpListener или TcpStream, вызвав .as_raw_fd(), который возвращает RawFd, псевдоним для c_int.

Поле events — это коллекция битовых флагов, указывающих, что мы ждем. Наиболее распространенные события — это POLLIN, что означает, что доступен ввод, и POLLOUT, что означает, что в буферах вывода есть свободное место. Мы будем ждать POLLIN, когда получим WouldBlock от чтения, и будем ждать POLLOUT, когда получим WouldBlock от записи.

Поле revents («возвращенные события») похоже, но используется для вывода, а не для ввода. После того как poll возвращается, биты в этом поле указывают, был ли соответствующий дескриптор одним из тех, которые вызвали пробуждение. Мы могли бы использовать это для опроса только конкретных задач, для которых было пробуждение, но для простоты мы проигнорируем это поле и будем опрашивать каждую задачу каждый раз, когда пробуждаемся.

Нашим функциям асинхронного ввода‑вывода, таким как accept, write_all и print_all, нужно будет отправить pollfds и Wakers обратно в main, чтобы main мог вызвать libc::poll. Мы добавим еще пару глобальных Vec для этого, плюс вспомогательную функцию для их заполнения:

static POLL_FDS: Mutex<Vec<libc::pollfd>> = Mutex::new(Vec::new());
static POLL_WAKERS: Mutex<Vec<Waker>> = Mutex::new(Vec::new());

fn register_pollfd(
    context: &mut Context,
    fd: &impl AsRawFd,
    events: libc::c_short,
) {
    let mut poll_fds = POLL_FDS.lock().unwrap();
    let mut poll_wakers = POLL_WAKERS.lock().unwrap();
    poll_fds.push(libc::pollfd {
        fd: fd.as_raw_fd(),
        events,
        revents: 0,
    });
    poll_wakers.push(context.waker().clone());
}

Playground #6

Теперь наши функции асинхронного ввода‑вывода могут вызывать register_pollfd вместо wake_by_ref. accept и print_all — это операции чтения, поэтому они обрабатывают WouldBlock, устанавливая POLLIN:

Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
    register_pollfd(context, listener, libc::POLLIN);
    Poll::Pending
}

Playground #6

write_all обрабатывает WouldBlock, устанавливая POLLOUT:

Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
    register_pollfd(context, stream, libc::POLLOUT);
    return Poll::Pending;
}

Playground #6

Наконец, main. Начнем с подготовки аргумента тайм‑аута для libc::poll. Он аналогичен тому, как мы всё это время вычисляли следующее время пробуждения, за исключением того, что теперь мы не гарантируем его наличие, и нам нужно преобразовать его в миллисекунды:

// Some tasks might wake other tasks. Re-poll if the AwakeFlag has been
// set. Polling futures that aren't ready yet is inefficient but allowed.
if awake_flag.check_and_clear() {
    continue;
}
// All tasks are either sleeping or blocked on IO. Use libc::poll to wait
// for IO on any of the POLL_FDS. If there are any WAKE_TIMES, use the
// earliest as a timeout.
let mut wake_times = WAKE_TIMES.lock().unwrap();
let timeout_ms = if let Some(time) = wake_times.keys().next() {
    let duration = time.saturating_duration_since(Instant::now());
    duration.as_millis() as libc::c_int
} else {
    -1 // infinite timeout
};

Playground #6

После всей этой подготовки мы можем заменить thread::sleep на libc::poll в основном цикле. Это «внешняя» функция, поэтому её вызов является небезопасным:

let mut poll_fds = POLL_FDS.lock().unwrap();
let mut poll_wakers = POLL_WAKERS.lock().unwrap();
let poll_error_code = unsafe {
    libc::poll(
        poll_fds.as_mut_ptr(),
        poll_fds.len() as libc::nfds_t,
        timeout_ms,
    )
};
if poll_error_code < 0 {
    return Err(io::Error::last_os_error());
}

Playground #6

И, наконец, когда мы просыпаемся и libc::poll возвращает управление, нам нужно очистить POLL_FDS и вызвать все POLL_WAKERS. Основной цикл по‑прежнему проверяет все задачи каждый раз, и задачи, которые не находятся в состоянии Ready, зарегистрируются в POLL_FDS перед следующим циклом сна.

poll_fds.clear();
poll_wakers.drain(..).for_each(Waker::wake);
// Invoke Wakers from WAKE_TIMES if their time has come.
while let Some(entry) = wake_times.first_entry() {
    …

Playground #6

Работает!

Вот и всё. Мы сделали это. Наш основной цикл наконец стал циклом событий.

Надеюсь, это небольшое приключение помогло вам сделать асинхронный Rust и асинхронный ввод‑вывод в целом менее загадочными. Удачи! :)

Комментарии (5)


  1. rukhi7
    31.10.2024 18:43

    сервер с абсолютно такой же логикой можно написать на С или С++ на 2-х (двух!) потоках и быть абсолютно уверенным что он использует только 2 потока не зависимо от количества клиентов и от заскоков компилятора. При этом решение будет абсолютно универсальным-переносимым на любую платформу, которая поддерживает соответственно С или С++ и потоки. А можно еще и сделать чтобы количество потоков точно определялось количеством ядер процессора после запуска программы-сервера. Кода будет не намного больше, вряд ли в 2 раза больше.


    1. Boneyan
      31.10.2024 18:43

      Универсальный переносимый сервер на двух потоках можно и на Rust написать.
      Это мне кажется больше обучающая статья про async, поэтому некоторые примеры могут быть ходульные (что в целом распространённая проблема обучающих статей). Главное что показывает как async работает.


      1. rukhi7
        31.10.2024 18:43

        Универсальный переносимый сервер на двух потоках можно и на Rust написать.

        Да, я не корректно выразился! У меня просто среды разработки под Rust нет. Универсальность, конечно, не в языке (Rust, С, С++, ...) который надо выбрать для реализации сервера, универсальность в том что потоки и сокеты это объекты системы и их поведение не зависит от языка (если конечно язык не ограничивает доступ к некоторым функциям этих объектов). А вот с asynch/await все намного сложнее так как это синтаксис или конструкции языка, и в разных языках они могут быть реализованы по разному, используют разные типы (классы) для оформления своей работы.

        Если бы кто-то попробовал сравнить грамотную реализацию на потоках с реализацией через asynch/await это действительно было бы очень поучительно, только я боюсь не в пользу asynch/await.


        1. mayorovp
          31.10.2024 18:43

          Что в вашем понимании есть "грамотная реализация на потоках"?


    1. mayorovp
      31.10.2024 18:43

      сервер с абсолютно такой же логикой можно написать на С или С++ на 2-х (двух!) потоках и быть абсолютно уверенным что он использует только 2 потока не зависимо от количества клиентов и от заскоков компилятора.

      А почему вообще "заскоки компилятора" попали в этот список?