Hello world!


Представляю вашему вниманию вторую часть туториала по Tokio.



Tokio — это асинхронная среда выполнения (runtime) кода Rust. Она предоставляет строительные блоки, необходимые для разработки сетевых приложений любого размера.


Содержание



Кадрирование


Применим то, что мы узнали о вводе-выводе, и реализуем уровень кадрирования (framing layer) Mini-Redis. Кадрирование (framing) — это процесс получения потока байтов и преобразования его в поток кадров. Кадр — это единица данных, передаваемая между двумя узлами (peers). Кадр протокола Redis определяется следующим образом:


use bytes::Bytes;

enum Frame {
    Simple(String),
    Error(String),
    Integer(u64),
    Bulk(Bytes),
    Null,
    Array(Vec<Frame>),
}

Обратите внимание, что кадр состоит только из данных без какой-либо семантики. Анализ и реализация команд происходят на более высоком уровне.


Для HTTP кадр может выглядеть так:


enum HttpFrame {
    RequestHead {
        method: Method,
        uri: Uri,
        version: Version,
        headers: HeaderMap,
    },
    ResponseHead {
        status: StatusCode,
        version: Version,
        headers: HeaderMap,
    },
    BodyChunk {
        chunk: Bytes,
    },
}

Реализуем структуру Connection, которая оборачивает TcpStream и читает/записывает значения mini_redis::Frame:


use tokio::net::TcpStream;
use mini_redis::{Frame, Result};

struct Connection {
    stream: TcpStream,
    // ...
}

impl Connection {
    /// Читает кадр из соединения.
    ///
    /// Возвращает `None` при достижении EOF
    pub async fn read_frame(&mut self)
        -> Result<Option<Frame>>
    {
        // TODO
    }

    /// Записывает кадр в соединение
    pub async fn write_frame(&mut self, frame: &Frame)
        -> Result<()>
    {
        // TODO
    }
}

Подробную информацию о протоколе Redis можно найти здесь. Полный код Connection можно найти здесь.


Буферизованное чтение


Метод read_frame ожидает получения всего кадра перед возвратом. Один вызов TcpStream::read() может вернуть произвольный объем данных. Он может содержать целый кадр, часть кадра или несколько кадров. Если получен частичный кадр, данные буферизуются, и из сокета считываются дополнительные данные. Если получено несколько кадров, возвращается первый, а остальные данные помещаются в буфер до следующего вызова read_frame().


Создаем новый файл:


touch src/connection.rs

Далее в Connection нужно добавить поле для буфера чтения (read buffer). Данные считываются из сокета в буфер чтения. При разборе кадра соответствующие данные удаляются из буфера.


Мы будем использовать BytesMut в качестве типа буфера. Это изменяемая версия Bytes.


use bytes::BytesMut;
use tokio::net::TcpStream;

pub struct Connection {
    stream: TcpStream,
    buffer: BytesMut,
}

impl Connection {
    pub fn new(stream: TcpStream) -> Connection {
        Connection {
            stream,
            // Выделяем буфер размером 4 КБ
            buffer: BytesMut::with_capacity(4096),
        }
    }
}

Реализуем метод read_frame:


use tokio::io::AsyncReadExt;
use bytes::Buf;
use mini_redis::Result;

pub async fn read_frame(&mut self)
    -> Result<Option<Frame>>
{
    loop {
        // Пытаемся разобрать кадр из буферизованных данных.
        // Если данных в буфере достаточно, возвращается кадр
        if let Some(frame) = self.parse_frame()? {
            return Ok(Some(frame));
        }

        // В буфере недостаточно данных для чтения кадра.
        // Пытаемся получить больше данных из сокета.
        //
        // При успехе возвращается количество байтов.
        // `0` - индикатор "конца потока"
        if 0 == self.stream.read_buf(&mut self.buffer).await? {
            // Другая сторона закрыла соединение. Для чистого закрытия
            // в буфере чтения не должно оставаться данных.
            // Если такие данные имеются, значит другая сторона
            // закрыла соединение во время передачи кадра
            if self.buffer.is_empty() {
                return Ok(None);
            } else {
                return Err("Connection reset by peer".into());
            }
        }
    }
}

Разберем этот код. Метод read_frame работает в цикле. Сначала вызывается self.parse_frame(). Этот метод пытается разобрать кадр Redis из self.buffer. Если данных достаточно, кадр возвращается вызывающей стороне. В противном случае, мы пытаемся прочитать больше данных из сокета. После считывания дополнительных данных снова вызывается parse_frame().


При чтении из потока возвращаемое значение 0 указывает, что данных от узла больше не будет. Если в буфере чтения все еще есть данные, это означает, что был получен частичный кадр и соединение прервано внезапно. Это состояние ошибки, поэтому возвращается Err.


Типаж Buf


При чтении из потока вызывается read_buf(). Эта версия функции чтения принимает значение, реализующее BufMut из крейта bytes.


Во-первых, подумайте, как мы могли бы реализовать тот же цикл чтения, используя read(). Вместо BytesMut можно использовать Vec<u8>:


use tokio::net::TcpStream;

pub struct Connection {
    stream: TcpStream,
    buffer: Vec<u8>,
    cursor: usize,
}

impl Connection {
    pub fn new(stream: TcpStream) -> Connection {
        Connection {
            stream,
            buffer: vec![0; 4096],
            cursor: 0,
        }
    }
}

Функция read_frame в Connection:


use mini_redis::{Frame, Result};

pub async fn read_frame(&mut self)
    -> Result<Option<Frame>>
{
    loop {
        if let Some(frame) = self.parse_frame()? {
            return Ok(Some(frame));
        }

        // Проверяем наличие свободного места в буфере
        if self.buffer.len() == self.cursor {
            // Увеличиваем размер буфера
            self.buffer.resize(self.cursor * 2, 0);
        }

        // Читаем в буфер, отслеживая количество прочитанных байт
        let n = self.stream.read(&mut self.buffer[self.cursor..]).await?;

        if 0 == n {
            if self.cursor == 0 {
                return Ok(None);
            } else {
                return Err("Connection reset by peer".into());
            }
        } else {
            // Обновляем курсор
            self.cursor += n;
        }
    }
}

При работе с байтовыми массивами и read(), мы должны поддерживать курсор, отслеживающий, какой объем данных был помещен в буфер. Мы должны обязательно передать в функцию read пустую часть буфера. В противном случае, мы перезапишем буферизованные данные. Если буфер заполняется, мы должны увеличить его, чтобы продолжить чтение. В parse_frame() (не входит в пример) нам нужно будет проанализировать данные, содержащиеся в self.buffer[..self.cursor].


Поскольку соединение массива байтов с курсором является очень распространенным, крейт bytes предоставляет абстракцию, представляющую массив байтов и курсор. Типаж Buf реализуется типами, из которых можно читать данные. Типаж BufMut реализуется типами, в которые можно записывать данные. При передаче T: BufMut в read_buf() внутренний курсор буфера автоматически обновляется. Благодаря этому в нашей версии read_frame() нам не нужно управлять собственным курсором.


Кроме того, при использовании Vec<u8> буфер необходимо инициализировать. vec![0; 4096] выделяет массив размером 4096 байт и записывает ноль в каждую ячейку. При изменении размера буфера новая емкость также должна быть инициализирована нулями. Процесс инициализации не является бесплатным. При работе с BytesMut и BufMut емкость не инициализируется. Абстракция BytesMut не позволяет нам читать неинициализированную память. Это позволяет нам избежать этапа инициализации.


Разбор


Теперь рассмотрим функцию parse_frame. Разбор выполняется в два этапа:


  1. Убеждаемся, что в буфере находится полный кадр, и находим конечный индекс кадра.
  2. Разбираем кадр.

Крейт mini-redis предоставляет нам функции для обоих этих шагов:


  1. Frame::check.
  2. Frame::parse.

Мы также будем повторно использовать абстракцию Buf. Buf передается в Frame::check(). Поскольку функция check перебирает переданный буфер, внутренний курсор перемещается вперед. Когда check() возвращается, внутренний курсор буфера указывает на конец кадра.


Для типа Buf мы будем использовать std::io::Cursor<&[u8]>:


use mini_redis::{Frame, Result};
use mini_redis::frame::Error::Incomplete;
use bytes::Buf;
use std::io::Cursor;

fn parse_frame(&mut self)
    -> Result<Option<Frame>>
{
    // Создаем тип `T: Buf`
    let mut buf = Cursor::new(&self.buffer[..]);

    // Проверяем, доступен ли целый кадр
    match Frame::check(&mut buf) {
        Ok(_) => {
            // Получаем длину кадра в байтах
            let len = buf.position() as usize;

            // Сбрасываем внутренний курсор для вызова `parse()`
            buf.set_position(0);

            // Разбираем кадр
            let frame = Frame::parse(&mut buf)?;

            // Удаляем кадр из буфера
            self.buffer.advance(len);

            // Возвращаем кадр
            Ok(Some(frame))
        }
        // В буфере содержится мало данных
        Err(Incomplete) => Ok(None),
        // Возникла ошибка
        Err(e) => Err(e.into()),
    }
}

Полный код функции Frame::check можно найти здесь.


Важно отметить, что используются API Buf в стиле "байтового итератора" (byte iterator). Речь идет об извлечении данных и перемещении внутреннего курсора. Например, чтобы определить тип кадра при его анализе, проверяется первый байт. Используемая функция — Buf::get_u8. Извлекается байт в текущей позиции курсора и курсор перемещается на единицу.


Типаж Buf предоставляет много полезных методов.


Буферизованная запись


Другая половина API кадрирования — это функция write_frame(frame). Эта функция записывает в сокет весь кадр. Чтобы свести к минимуму системные вызовы write(), запись буферизуется. Кадры кодируются в буфер записи (write buffer) перед записью в сокет. Однако, в отличие от read_frame(), весь кадр не всегда буферизуется в массив байтов перед записью в сокет.


Рассмотрим кадр массового (группового) потока (bulk stream frame). Записываемое значение — Frame::Bulk(Bytes). Формат передачи группового кадра — это заголовок кадра, который состоит из символа $, за которым следует длина данных в байтах. Большую часть кадра составляет содержимое значения Bytes. Если данные большие, копирование их в промежуточный буфер будет дорогостоящим.


Для реализации буферизованной записи мы будем использовать структуру BufWriter. Эта структура инициализируется с помощью T: AsyncWrite и сама реализует AsyncWrite. При вызове write() в BufWriter, запись идет не непосредственно в файл для записи, а в буфер. Когда буфер заполняется, его содержимое сбрасывается во внутренний файл для записи, и буфер очищается. Также существуют оптимизации, позволяющие обходить (bypass) буфер в определенных случаях.


Мы реализуем только часть функции write_frame. Полную реализацию смотрите здесь.


Сначала обновляем структуру Connection:


use tokio::io::BufWriter;
use tokio::net::TcpStream;
use bytes::BytesMut;

pub struct Connection {
    stream: BufWriter<TcpStream>,
    buffer: BytesMut,
}

impl Connection {
    pub fn new(stream: TcpStream) -> Connection {
        Connection {
            stream: BufWriter::new(stream),
            buffer: BytesMut::with_capacity(4096),
        }
    }
}

Затем реализуем write_frame():


use tokio::io::{self, AsyncWriteExt};
use mini_redis::Frame;

async fn write_frame(&mut self, frame: &Frame)
    -> io::Result<()>
{
    match frame {
        Frame::Simple(val) => {
            self.stream.write_u8(b'+').await?;
            self.stream.write_all(val.as_bytes()).await?;
            self.stream.write_all(b"\r\n").await?;
        }
        Frame::Error(val) => {
            self.stream.write_u8(b'-').await?;
            self.stream.write_all(val.as_bytes()).await?;
            self.stream.write_all(b"\r\n").await?;
        }
        Frame::Integer(val) => {
            self.stream.write_u8(b':').await?;
            self.write_decimal(*val).await?;
        }
        Frame::Null => {
            self.stream.write_all(b"$-1\r\n").await?;
        }
        Frame::Bulk(val) => {
            let len = val.len();

            self.stream.write_u8(b'$').await?;
            self.write_decimal(len as u64).await?;
            self.stream.write_all(val).await?;
            self.stream.write_all(b"\r\n").await?;
        }
        Frame::Array(_val) => unimplemented!(),
    }

    self.stream.flush().await;

    Ok(())
}

Используемые здесь функции предоставляются AsyncWriteExt. Они также доступны в TcpStream, но однобайтовую запись без промежуточного буфера выполнять не рекомендуется.


  • write_u8 записывает в файл один байт
  • write_all записывает весь фрагмент
  • write_decimal реализуется mini-redis

Функция заканчивается вызовом self.stream.flush().await. Поскольку BufWriter сохраняет записи в промежуточном буфере, вызовы write() не гарантируют, что данные будут записаны в сокет. Перед возвратом мы хотим, чтобы кадр был записан в сокет. Вызов flush() записывает в сокет любые данные, ожидающие обработки в буфере.


Другой альтернативой было бы не вызывать flush() в write_frame(). Вместо этого можно реализовать flush() у Connection. Это позволит вызывающей стороне записать в очередь несколько небольших кадров в буфер записи, а затем записать их все в сокет с помощью одного системного вызова write(). Это усложняет API Connection. Простота — одна из целей Mini-Redis, поэтому мы включили вызов flush().await в write_frame().


Подробно об асинхронности


Углубимся в модель асинхронной среды выполнения Rust.


Фьючеры (futures)


В качестве краткого обзора возьмем очень простую асинхронную функцию. В ней нет ничего нового.


use tokio::net::TcpStream;

async fn my_async_fn() {
    println!("hello from async");
    let _socket = TcpStream::connect("127.0.0.1:3000").await.unwrap();
    println!("async TCP operation complete");
}

Мы вызываем функцию, и она возвращает некоторое значение. Затем мы вызываем .await на этом значении.


#[tokio::main]
async fn main() {
    // В терминал пока ничего не выводится
    let what_is_this = my_async_fn();
    // Текст печатается в терминале, соединение
    // устанавливается и закрывается
    what_is_this.await;
}

Значение, возвращаемое my_async_fn(), является фьючером. Фьючер — это значение, которое реализует типаж std::future::Future, предоставляемый стандартной библиотекой.


Определение std::future::Future:


use std::pin::Pin;
use std::task::{Context, Poll};

pub trait Future {
    type Output;

    fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output>;
}

Связанный тип (associated type) Output — это тип, который будет создан во фьючере после его завершения. Тип Pin — это то, с помощью чего Rust поддерживает заимствования в асинхронных функциях.


В отличие от того, как фьючеры реализуются в других языках, фьючер Rust не представляет вычисления, происходящие в фоновом режиме, а является самими вычислениями. Владелец фьючера отвечает за выполнение (advance) вычислений путем его опроса (poll).


Реализация Future


Реализуем очень простой фьючер. Он будет делать следующее:


  1. Ждать какое-то временя.
  2. Выводить некоторый текст в STDOUT.
  3. Возвращать строку.

use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
use std::time::{Duration, Instant};

struct Delay {
    when: Instant,
}

impl Future for Delay {
    type Output = &'static str;

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>)
        -> Poll<&'static str>
    {
        if Instant::now() >= self.when {
            println!("hello world");
            Poll::Ready("done")
        } else {
            // Пока не обращайте внимания на эту строку
            cx.waker().wake_by_ref();
            Poll::Pending
        }
    }
}

#[tokio::main]
async fn main() {
    let when = Instant::now() + Duration::from_millis(10);
    let future = Delay { when };

    let out = future.await;
    assert_eq!(out, "done");
}

Асинхронная функция как фьючер


В функции main мы создаем экземпляр фьючера и вызываем на нем .await. В асинхронных функциях мы можем вызывать .await для любого значения, реализующего Future. В свою очередь, вызов асинхронной функции возвращает анонимный тип, реализующий Future. В случае async fn main() сгенерированный фьючер выглядит примерно так:


use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
use std::time::{Duration, Instant};

enum MainFuture {
    // Инициализирован, не опрашивался
    State0,
    // Ждет `Delay` - строка `future.await`
    State1(Delay),
    // Фьючер завершен
    Terminated,
}

impl Future for MainFuture {
    type Output = ();

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>)
        -> Poll<()>
    {
        use MainFuture::*;

        loop {
            match *self {
                State0 => {
                    let when = Instant::now() + Duration::from_millis(10);
                    let future = Delay { when };
                    *self = State1(future);
                }
                State1(ref mut my_future) => {
                    match Pin::new(my_future).poll(cx) {
                        Poll::Ready(out) => {
                            assert_eq!(out, "done");
                            *self = Terminated;
                            return Poll::Ready(());
                        }
                        Poll::Pending => {
                            return Poll::Pending;
                        }
                    }
                }
                Terminated => {
                    panic!("future polled after completion")
                }
            }
        }
    }
}

Фьючеры Rust — это машины состояний (state machines). Здесь MainFuture представлено как перечисление возможных состояний фьючера. Фьючер начинается в состоянии State0. Когда вызывается call(), фьючер пытается обновить свое внутреннее состояние. Если фьючер может завершиться, возвращается Poll::Ready, содержащий результат асинхронных вычислений.


Если фьючер не может завершиться, обычно из-за нехватки ресурсов, возвращается Poll::Pending. Получение Poll::Pending указывает вызывающей стороне, что фьючер завершится позже, и вызывающая сторона должна снова вызвать call() через какое-то время.


Мы также видим, что фьючеры состоят из других фьючеров. Вызов call() внешнего фьючера приводит к вызову call() внутреннего фьючера.


Исполнители (executors)


Асинхронные функции Rust возвращают фьючеры. Для обновления состояния фьючера должен вызываться call(). Фьючеры состоят из других фьючеров. Вопрос в том, что вызывает call() самого внешнего фьючера?


Напомним, что для запуска асинхронных функций их необходимо либо передать в tokio::spawn(), либо сделать их основной функцией, помеченной с помощью #[tokio::main]. В результате сгенерированный внешний фьючер передается исполнителю Tokio. Исполнитель отвечает за вызов Future::poll() на внешнем фьючере, доводя асинхронные вычисления до завершения.


Мини Tokio


Чтобы лучше понять, как все это сочетается друг с другом, реализуем собственную минимальную версию Tokio! Полный код можно найти здесь.


use std::collections::VecDeque;
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
use std::time::{Duration, Instant};
use futures::task;

fn main() {
    let mut mini_tokio = MiniTokio::new();

    mini_tokio.spawn(async {
        let when = Instant::now() + Duration::from_millis(10);
        let future = Delay { when };

        let out = future.await;
        assert_eq!(out, "done");
    });

    mini_tokio.run();
}

struct MiniTokio {
    tasks: VecDeque<Task>,
}

type Task = Pin<Box<dyn Future<Output = ()> + Send>>;

impl MiniTokio {
    fn new() -> MiniTokio {
        MiniTokio {
            tasks: VecDeque::new(),
        }
    }

    /// Создает фьючер на экземпляре mini-tokio
    fn spawn<F>(&mut self, future: F)
    where
        F: Future<Output = ()> + Send + 'static,
    {
        self.tasks.push_back(Box::pin(future));
    }

    fn run(&mut self) {
        let waker = task::noop_waker();
        let mut cx = Context::from_waker(&waker);

        while let Some(mut task) = self.tasks.pop_front() {
            if task.as_mut().poll(&mut cx).is_pending() {
                self.tasks.push_back(task);
            }
        }
    }
}

Это запускает асинхронный блок. Экземпляр Delay создается с указанной задержкой и "ожидается". Однако наша реализация на данный момент имеет серьезный недостаток. Наш исполнитель никогда не спит. Исполнитель непрерывно просматривает все порожденные фьючеры и опрашивает их. Большую часть времени фьючер не будет готов выполнять новую работу и будет возвращать Poll::Pending. Этот процесс будет сжигать циклы ЦП и, как правило, будет не очень эффективным.


В идеале мы хотим, чтобы mini-tokio опрашивал фьючеры только тогда, когда они готовы к выполнению новой задачи. Это происходит, когда ресурс, на котором заблокирована задача, готов выполнить запрошенную операцию. Если задача хочет прочитать данные из сокета TCP, мы должны опрашивать задачу только тогда, когда сокет TCP получил данные. В нашем случае задача блокируется при достижении указанного момента времени (Instant). В идеале, mini-tokio должен опрашивать задачу только по прошествии этого времени.


Для этого опрошенный, но не готовый ресурс должен отправить уведомление исполнителю при переходе в состояние готовности.


Будильники (wakers)


Будильники — недостающая часть. Это система, с помощью которой ресурс может уведомить ожидающую задачу о том, что он готов продолжить выполнение операции.


Еще раз взглянем на определение Future::poll():


fn poll(self: Pin<&mut Self>, cx: &mut Context)
    -> Poll<Self::Output>;

Аргумент Context метода poll имеет метод waker. Этот метод возвращает Waker, привязанный к текущей задаче. У Waker есть метод wake. Вызов этого метода сигнализирует исполнителю, что связанную задачу следует запланировать для выполнения. Ресурсы вызывают wake(), когда переходят в состояние готовности, чтобы уведомить исполнителя о том, что опрос задачи может продолжиться.


Обновление Delay


Мы можем обновить Delay, чтобы использовать будильники:


use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
use std::time::{Duration, Instant};
use std::thread;

struct Delay {
    when: Instant,
}

impl Future for Delay {
    type Output = &'static str;

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>)
        -> Poll<&'static str>
    {
        if Instant::now() >= self.when {
            println!("hello world");
            Poll::Ready("done")
        } else {
            // Получаем дескриптор будильника для текущей задачи
            let waker = cx.waker().clone();
            let when = self.when;

            // Создаем поток таймера
            thread::spawn(move || {
                let now = Instant::now();

                if now < when {
                    thread::sleep(when - now);
                }

                waker.wake();
            });

            Poll::Pending
        }
    }
}

Теперь по истечении указанного времени вызывающая задача получит уведомление, и исполнитель запланирует ее повторный опрос. Следующим шагом будет обновление mini-tokio для регистрации уведомлений о пробуждении.


С нашей реализацией Delay все еще есть несколько проблем. Мы исправим их позже.


Когда фьючер возвращает Poll::Pending, он должен гарантировать, что в какой-то момент будет подан сигнал о пробуждении. Если этого не сделать, задача будет "висеть" бесконечно.

Вспомним код Delay:


impl Future for Delay {
    type Output = &'static str;

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>)
        -> Poll<&'static str>
    {
        if Instant::now() >= self.when {
            println!("hello world");
            Poll::Ready("done")
        } else {
            // !
            cx.waker().wake_by_ref();
            Poll::Pending
        }
    }
}

Прежде чем вернуть Poll::Pending, мы вызываем cx.waker().wake_by_ref(). Это необходимо для выполнения контракта фьючера. Возвращая Poll::Pending, мы отвечаем за вызов будильника. Поскольку мы еще не реализовали поток таймера (timer thread), то подаем встроенный сигнал. Это приводит к тому, что фьючер будет немедленно повторно запланирован, повторно выполнен и, вероятно, снова не будет готов к завершению.


Обратите внимание, что мы можем подавать сигнал чаще, чем это необходимо. В данном конкретном случае мы подаем сигнал о пробуждении, хотя вообще не готовы продолжать операцию. В этом нет ничего плохого, кроме ненужной траты ресурсов процессора. Однако эта конкретная реализация приведет к образованию цикла занятости (busy loop).


Обновление Mini Tokio


Следующим шагом будет обновление Mini Tokio для получения уведомлений о пробуждении. Мы хотим, чтобы исполнитель запускал задачи только после их пробуждения, и для этого Mini Tokio предоставит собственный будильник. Когда будильник вызывается, связанная с ним задача ставится в очередь на выполнение. Mini Tokio передает этот сигнал во фьючер при его опросе.


Обновленный Mini Tokio будет использовать канал для хранения запланированных задач. Каналы позволяют ставить задачи в очередь для выполнения из любого потока. Будильники должны реализовывать Send и Sync.


Типажи Send и Sync — это маркерные типажи (marker traits), связанные с параллелизмом Rust. Типы, которые можно отправить в другой поток, реализуют Send. Большинство типов являются Send, но некоторые (вроде Rc) не являются. Типы, к которым можно одновременно получить доступ через неизменяемые ссылки, реализуют Sync. Тип может быть Send, но не Sync. Хорошим примером является Cell, который можно изменить с помощью неизменяемой ссылки, и поэтому одновременный доступ к нему небезопасен.

Обновляем структуру MiniTokio:


use std::sync::mpsc;
use std::sync::Arc;

struct MiniTokio {
    scheduled: mpsc::Receiver<Arc<Task>>,
    sender: mpsc::Sender<Arc<Task>>,
}

struct Task {
    // TODO
}

Будильники являются Sync и могут клонироваться. При вызове wake() задача должна планироваться для выполнения. Для реализации этого у нас есть канал. При вызове wake() задача передается в отправляющую половину канала. Наша структура Task будет реализовывать логику пробуждения. Для этого ему необходимо содержать как порожденный фьючер, так и отправителя из канала. Мы поместим фьючер в структуру TaskFuture рядом с перечислением Poll, чтобы отслеживать результат последнего вызова Future::poll(), который необходим для обработки ложных пробуждений. Более подробная информация представлена ​​в реализации метода poll в TaskFuture.


use std::sync::{Arc, Mutex};

/// Структура, содержащая фьючер и результат
/// последнего вызова его метода `poll`
struct TaskFuture {
    future: Pin<Box<dyn Future<Output = ()> + Send>>,
    poll: Poll<()>,
}

struct Task {
    // `Mutex` позволяет `Task` реализовать `Sync`.
    // `task_future` доступна одновременно только одному потоку.
    // `Mutex` является опциональным. Настоящий Tokio
    // не использует здесь мьютекс, но настоящий Tokio содержит
    // гораздо больше строк кода, чем может уместиться на одной странице туториала
    task_future: Mutex<TaskFuture>,
    executor: mpsc::Sender<Arc<Task>>,
}

impl Task {
    fn schedule(self: &Arc<Self>) {
        self.executor.send(self.clone());
    }
}

Для планирования задачи, Arc клонируется и отправляется по каналу. Теперь нам нужно подключить функцию schedule к std::task::Waker. Стандартная библиотека предоставляет для этого низкоуровневый API с использованием ручного построения vtable. Эта стратегия обеспечивает максимальную гибкость для разработчиков, но требует некоторого небезопасного шаблонного кода. Вместо прямого использования RawWakerVTable мы будем использовать утилиту ArcWake, предоставляемую крейтом futures. Это позволит нам реализовать простой типаж, чтобы представить структуру Task как будильник.


Добавляем следующую зависимость в файл Cargo.toml:


futures = "0.3"

Затем реализуем futures::task::ArcWake:


use futures::task::{self, ArcWake};
use std::sync::Arc;

impl ArcWake for Task {
    fn wake_by_ref(arc_self: &Arc<Self>) {
        arc_self.schedule();
    }
}

Когда поток таймера вызывает waker.wake(), задача передается в канал. Реализуем получение и выполнение задач в функции MiniTokio::run:


impl MiniTokio {
    fn run(&self) {
        while let Ok(task) = self.scheduled.recv() {
            task.poll();
        }
    }

    /// Инициализирует новый экземпляр mini-tokio
    fn new() -> MiniTokio {
        let (sender, scheduled) = mpsc::channel();

        MiniTokio { scheduled, sender }
    }

    /// Создает фьючер на экземпляре mini-tokio.
    ///
    /// Данный фьючер обернут в `Task` и помещен в очередь `scheduled`.
    /// Он будет выполнен при вызове `run()`
    fn spawn<F>(&self, future: F)
    where
        F: Future<Output = ()> + Send + 'static,
    {
        Task::spawn(future, &self.sender);
    }
}

impl TaskFuture {
    fn new(future: impl Future<Output = ()> + Send + 'static) -> TaskFuture {
        TaskFuture {
            future: Box::pin(future),
            poll: Poll::Pending,
        }
    }

    fn poll(&mut self, cx: &mut Context<'_>) {
        // Разрешены ложные пробуждения, даже после того, как фьючер
        // вернул `Ready`. Однако опрос фьючера, вернувшего
        // `Ready`, не разрешен. Поэтому мы должны проверять,
        // что фьючер находится в режиме ожидания перед его вызовом.
        // В противном случае, может возникнуть паника
        if self.poll.is_pending() {
            self.poll = self.future.as_mut().poll(cx);
        }
    }
}

impl Task {
    fn poll(self: Arc<Self>) {
        // Создаем будильник из экземпляра `Task`.
        // Здесь используется реализация `ArcWake`
        let waker = task::waker(self.clone());
        let mut cx = Context::from_waker(&waker);

        // Никакой другой поток не может заблокировать `task_future`
        let mut task_future = self.task_future.try_lock().unwrap();

        // Опрашиваем внутренний фьючер
        task_future.poll(&mut cx);
    }

    // Создаем новую задачу с данным фьючером.
    //
    // Инициализируем новый `Task`, содержащий данный фьючер и помещаем его
    // в `sender`. Получатель канала получит задачу и выполнит ее
    fn spawn<F>(future: F, sender: &mpsc::Sender<Arc<Task>>)
    where
        F: Future<Output = ()> + Send + 'static,
    {
        let task = Arc::new(Task {
            task_future: Mutex::new(TaskFuture::new(future)),
            executor: sender.clone(),
        });

        let _ = sender.send(task);
    }
}

Здесь происходит много всего. Во-первых, реализован MiniTokio::run(). Функция работает в цикле, получая из канала запланированные задачи.


Функции new и spawn используют канал, а не VecDeque. Когда создаются новые задачи, им предоставляется клон отправителя, который задача может использовать для планирования во время выполнения.


Функция Task::poll создает будильник с помощью утилиты ArcWake из крейта futures. Будильник используется для создания task::Context. Этот task::Context передается в poll().


Резюме


Мы рассмотрели полный пример того, как работает асинхронный Rust. async/await в Rust обеспечивается типажами. Это позволяет сторонним крейтам, таким как Tokio, предоставлять детали реализации.


  • Асинхронные операции Rust ленивы и требуют, чтобы вызывающая сторона опрашивала их
  • будильники передаются фьючерам, чтобы связать фьючер с вызывающей его задачей
  • когда ресурс не готов завершить операцию, возвращается Poll::Pending и записывается будильник задачи
  • когда ресурс становится готовым, об этом уведомляется будильник задачи
  • исполнитель получает уведомление и планирует выполнение задачи
  • задача опрашивается еще раз, на этот раз ресурс готов, и задача выполняется

Ремарки


Помните, при реализации Delay, мы отметили, что нужно исправить еще несколько вещей. Асинхронная модель Rust позволяет одному фьючеру мигрировать между задачами во время их выполнения. Рассмотрим следующий код:


use futures::future::poll_fn;
use std::future::Future;
use std::pin::Pin;

#[tokio::main]
async fn main() {
    let when = Instant::now() + Duration::from_millis(10);
    let mut delay = Some(Delay { when });

    poll_fn(move |cx| {
        let mut delay = delay.take().unwrap();
        let res = Pin::new(&mut delay).poll(cx);

        assert!(res.is_pending());

        tokio::spawn(async move {
            delay.await;
        });

        Poll::Ready(())
    }).await;
}

Функция poll_fn создает экземпляр Future с помощью замыкания. Приведенный код создает экземпляр Delay, опрашивает его один раз, затем отправляет его в новую задачу, где он ожидается. Delay::poll() вызывается несколько раз с разными экземплярами Waker. В этом случае, мы должны убедиться, что вызов wake() на Waker передан самому последнему вызову poll().


При реализации фьючера очень важно предполагать, что каждый вызов poll() может предоставлять другой экземпляр Waker. Функция poll должна обновлять любой ранее записанный Waker новым.


Наша более ранняя реализация Delay порождала новый поток при каждом его опросе. Это нормально, но может быть очень неэффективно, если он опрашивается слишком часто (например, если мы применяем select! для этого и другого фьючера, оба будут опрашиваться всякий раз, когда в любом из них происходит событие). Один из подходов к решению этой задачи — запоминать факт создания потока и создавать новый поток только в том случае, если он еще не создан. Однако при таком подходе, мы должны убедиться, что Waker потока обновляется при последующих вызовах call(), поскольку, в противном случае, мы активируем не самый последний Waker.


Нашу предыдущую реализацию можно исправить так:


use std::future::Future;
use std::pin::Pin;
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll, Waker};
use std::thread;
use std::time::{Duration, Instant};

struct Delay {
    when: Instant,
    // Имеет значение `Some`, если поток создан, и `None`, в противном случае
    waker: Option<Arc<Mutex<Waker>>>,
}

impl Future for Delay {
    type Output = ();

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        // Проверяем текущий экземпляр. Если время истекло, значит
        // этот фьючер завершился, возвращаем `Poll::Ready`
        if Instant::now() >= self.when {
            return Poll::Ready(());
        }

        // Время не истекло. Если фьючер вызывается впервые,
        // создаем поток таймера. Если поток уже создан,
        // проверяем, что сохраненный `Waker` совпадает с `Waker` текущей задачи
        if let Some(waker) = &self.waker {
            let mut waker = waker.lock().unwrap();

            // Проверяем, что сохраненный `Waker` совпадает с `Waker` текущей задачи.
            // Это необходимо, поскольку экземпляр фьючера `Delay` может быть перемещен
            // в другую задачу между вызовами `poll()`. Если это произойдет,
            // будильник, содержащийся в данном `Context` будет отличаться, и мы должны
            // обновить хранящийся будильник для учета этого изменения
            if !waker.will_wake(cx.waker()) {
                *waker = cx.waker().clone();
            }
        } else {
            let when = self.when;
            let waker = Arc::new(Mutex::new(cx.waker().clone()));
            self.waker = Some(waker.clone());

            // `poll()` вызывается впервые, создаем поток таймера
            thread::spawn(move || {
                let now = Instant::now();

                if now < when {
                    thread::sleep(when - now);
                }

                // Время истекло. Уведомляем вызывающего, вызывая будильник
                let waker = waker.lock().unwrap();
                waker.wake_by_ref();
            });
        }

        // К этому моменту будильник сохранен и таймер запущен.
        // Время не истекло, следовательно, фьючер не завершился,
        // поэтому мы должны вернуть `Poll::Pending`.
        //
        // Контракт типажа `Future` требует, чтобы при возврате `Pending`
        // фьючер гарантировал уведомление будильника о
        // необходимости повторного опроса. В нашем случае,
        // возвращая здесь `Pending`, мы обещаем, что вызовем
        // будильник, содержащийся в аргументе `Context`,
        // по истечении запрошенного времени. Мы обеспечиваем это путем
        // создания потока таймера выше.
        //
        // Если мы забудем вызвать будильник, задача повиснет навсегда
        Poll::Pending
    }
}

Это немного сложно, но идея состоит в том, что при каждом вызове poll() фьючер проверяет, что переданный будильник совпадает с ранее записанным. Если будильники совпадают, больше делать нечего. Если они не совпадают, тогда записанный будильник обновляется.


Утилиты Notify


Мы продемонстрировали, как фьючер Delay можно реализовать вручную с помощью будильника. Будильники являются основой того, как работает асинхронный Rust. Обычно нет необходимости опускаться до этого уровня. Например, в случае с Delay мы могли бы полностью реализовать его с помощью async/await, используя утилиту tokio::sync::Notify. Эта утилита предоставляет базовый механизм уведомления о задачах. Она обрабатывает подробную информацию о будильниках, включая проверку соответствия записанного будильника текущей задаче.


Используя Notify, мы можем реализовать функцию delay следующим образом:


use tokio::sync::Notify;
use std::sync::Arc;
use std::time::{Duration, Instant};
use std::thread;

async fn delay(dur: Duration) {
    let when = Instant::now() + dur;
    let notify = Arc::new(Notify::new());
    let notify_clone = notify.clone();

    thread::spawn(move || {
        let now = Instant::now();

        if now < when {
            thread::sleep(when - now);
        }

        notify_clone.notify_one();
    });

    notify.notified().await;
}

Выбор


До сих пор, для добавления в систему параллелизма, мы создавали новую задачу. Рассмотрим другие способы одновременного выполнения асинхронного кода с помощью Tokio.


tokio::select!


Макрос tokio::select! позволяет ожидать выполнения нескольких асинхронных вычислений и возвращает результат после завершения любого из них.


Пример:


use tokio::sync::oneshot;

#[tokio::main]
async fn main() {
    let (tx1, rx1) = oneshot::channel();
    let (tx2, rx2) = oneshot::channel();

    tokio::spawn(async {
        let _ = tx1.send("one");
    });

    tokio::spawn(async {
        let _ = tx2.send("two");
    });

    tokio::select! {
        val = rx1 => {
            println!("rx1 completed first with {:?}", val);
        }
        val = rx2 => {
            println!("rx2 completed first with {:?}", val);
        }
    }
}

У нас есть два канала oneshot. Любой канал может завершиться первым. Оператор select! ожидает на обоих каналах и привязывает val к ​​значению, возвращаемому задачей. Когда tx1 или tx2 завершаются, соответствующий блок выполняется.


Ветка, которая не завершилась, удаляется. В этом примере вычисление ожидает oneshot::Receiver для каждого канала. oneshot::Receiver для канала, который еще не завершен, удаляется.


Отмена


В асинхронном Rust отмена выполняется путем удаления фьючера. Асинхронные операции в Rust реализуются с помощью фьючеров, которые являются ленивыми. Операция продолжается только при опросе фьючера. Если фьючер уничтожен, операция не может быть продолжена, поскольку все связанное состояние было удалено.


Тем не менее, иногда асинхронная операция порождает фоновые задачи или запускает другую операцию, выполняемую в фоновом режиме. В приведенном выше примере создается задача для отправки сообщения. Обычно задача выполняет некоторые вычисления для генерации значения.


Фьючеры или другие типы могут реализовать Drop для очистки. oneshot::Receiver реализует Drop, отправляя уведомление о закрытии половине Sender. Половина-отправитель получает это уведомление и прерывает текущую операцию, уничтожая ее.


use tokio::sync::oneshot;

async fn some_operation() -> String {
    // Вычисляем значение
}

#[tokio::main]
async fn main() {
    let (mut tx1, rx1) = oneshot::channel();
    let (tx2, rx2) = oneshot::channel();

    tokio::spawn(async {
        // Выбираем операцию и уведомление `closed()`
        tokio::select! {
            val = some_operation() => {
                let _ = tx1.send(val);
            }
            _ = tx1.closed() => {
                // `some_operation()` отменена,
                // задача завершена, `tx1` уничтожен
            }
        }
    });

    tokio::spawn(async {
        let _ = tx2.send("two");
    });

    tokio::select! {
        val = rx1 => {
            println!("rx1 completed first with {:?}", val);
        }
        val = rx2 => {
            println!("rx2 completed first with {:?}", val);
        }
    }
}

Реализация Future


Чтобы лучше понять, как работает select!, рассмотрим, как будет выглядеть гипотетическая реализация Future. Это упрощенная версия. На практике select! включает дополнительный функционал, такой как случайный выбор первой ветки для опроса.


use tokio::sync::oneshot;
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};

struct MySelect {
    rx1: oneshot::Receiver<&'static str>,
    rx2: oneshot::Receiver<&'static str>,
}

impl Future for MySelect {
    type Output = ();

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        if let Poll::Ready(val) = Pin::new(&mut self.rx1).poll(cx) {
            println!("rx1 completed first with {:?}", val);
            return Poll::Ready(());
        }

        if let Poll::Ready(val) = Pin::new(&mut self.rx2).poll(cx) {
            println!("rx2 completed first with {:?}", val);
            return Poll::Ready(());
        }

        Poll::Pending
    }
}

#[tokio::main]
async fn main() {
    let (tx1, rx1) = oneshot::channel();
    let (tx2, rx2) = oneshot::channel();

    // Используем `tx1` и `tx2`

    MySelect {
        rx1,
        rx2,
    }.await;
}

Фьючер MySelect содержит фьючеры из каждой ветки. При опросе MySelect опрашивается первая ветвь. Если она готова, ее значение возвращается и MySelect завершается. После того, как .await получает результат фьючера, он удаляется. Это приводит к тому, что фьючеры обоих ветвей уничтожаются. Поскольку одна из ветвей не завершается, соответствующая операция фактически отменяется.


Из предыдущего раздела:


Когда фьючер возвращает Poll::Pending, он должен гарантировать подачу сигнала о пробуждении в будущем. В противном случае, задача будет висеть вечно.

В реализации MySelect нет явного использования аргумента Context. Вместо этого, требование пробуждения удовлетворяется путем передачи cx во внутренние фьючеры. Поскольку внутренний фьючер также должен соответствовать требованию пробуждения, MySelect также удовлетворяет требованию пробуждения, возвращая Poll::Pending только при получении Poll::Pending из внутреннего фьючера.


Синтаксис


Макрос select! может обрабатывать более двух ветвей. Текущий лимит — 64 ветви. Каждая ветвь структурирована следующим образом:


<шаблон> = <асинхронное выражение> => <обработчик>,

При оценке select! все <асинхронные выражения> объединяются и выполняются одновременно. Когда выражение завершается, его результат сопоставляется с <шаблоном>. Если результат соответствует шаблону, все оставшиеся асинхронные выражения удаляются и выполняется <обработчик>. <обработчик> имеет доступ к любым привязкам, установленным <шаблоном>.


Основной случай использования <шаблона> — это название переменной: результат асинхронного выражения привязан к названию переменной, и <обработчик> имеет доступ к этой переменной. Вот почему в приведенном выше примере val используется для <шаблона>, а <обработчик> имеет доступ к val.


Если <шаблон> не соответствует результату асинхронного вычисления, оставшиеся асинхронные выражения продолжают выполняться одновременно до тех пор, пока не завершится следующее. К этому результату применяется та же логика.


Поскольку select! принимает любое асинхронное выражение, можно определить более сложные вычисления для выбора.


Здесь мы выбираем результат канала oneshot и TCP-соединение:


use tokio::net::TcpStream;
use tokio::sync::oneshot;

#[tokio::main]
async fn main() {
    let (tx, rx) = oneshot::channel();

    // Создаем задачу, которая отправляет сообщение по каналу
    tokio::spawn(async move {
        tx.send("done").unwrap();
    });

    tokio::select! {
        socket = TcpStream::connect("localhost:3465") => {
            println!("socket connected {:?}", socket);
        }
        msg = rx => {
            println!("received message {:?}", msg);
        }
    }
}

Здесь мы выбираем канал oneshot и принимаем сокеты из TcpListener:


use std::io;
use tokio::net::TcpListener;
use tokio::sync::oneshot;

#[tokio::main]
async fn main() -> io::Result<()> {
    let (tx, rx) = oneshot::channel();

    tokio::spawn(async move {
        tx.send(()).unwrap();
    });

    let listener = TcpListener::bind("localhost:3465").await?;

    tokio::select! {
        _ = async {
            loop {
                let (socket, _) = listener.accept().await?;
                tokio::spawn(async move { process(socket) });
            }

            // Недостижимый, но необходимый код
            Ok::<_, io::Error>(())
        } => {}
        _ = rx => {
            println!("terminating accept loop");
        }
    }

    Ok(())
}

fn process(_socket: tokio::net::TcpStream) {}

Цикл выполняется до тех пор, пока не произойдет ошибка или пока rx не получит значение. Шаблон _ указывает, что нас не интересует результат асинхронной операции.


Возвращаемое значение


Макрос select! возвращает результат выражения, вычисленного <обработчиком>:


async fn computation1() -> String {
    // ...
}

async fn computation2() -> String {
    // ...
}

#[tokio::main]
async fn main() {
    let out = tokio::select! {
        res1 = computation1() => res1,
        res2 = computation2() => res2,
    };

    println!("{out}");
}

По этой причине требуется, чтобы <обработчики> каждой ветви возвращали одинаковый тип. Если результат select! не требуется, рекомендуется, чтобы выражение оценивалось как ().


Ошибки


Оператор ? распространяет (propagate) ошибку из выражения (ошибка поднимается на уровень выше). Как это работает, зависит от того, где ? используется, в асинхронном выражении или в обработчике. В первом случае ? распространяет ошибку из асинхронного выражения. Это делает результат асинхронного выражения Result. Во втором случае ? немедленно распространяет ошибку за пределы выражения select!. Давайте еще раз посмотрим на пример цикла принятия TCP-соединений:


use tokio::net::TcpListener;
use tokio::sync::oneshot;
use std::io;

#[tokio::main]
async fn main() -> io::Result<()> {
    // ...

    let listener = TcpListener::bind("localhost:3465").await?;

    tokio::select! {
        res = async {
            loop {
                let (socket, _) = listener.accept().await?;
                tokio::spawn(async move { process(socket) });
            }

            Ok::<_, io::Error>(())
        } => {
            res?;
        }
        _ = rx => {
            println!("terminating accept loop");
        }
    }

    Ok(())
}

Обратите внимание на listener.accept().await?. Оператор ? перемещает ошибку из этого выражения в переменную res. В случае ошибки для параметра res будет установлено значение Err(_). Затем в обработчике снова используется ?. Инструкция res? распространяет ошибку из функции main.


Сопоставление с шаблоном


Напомним синтаксис ветви select!:


<шаблон> = <асинхронное выражение> => <обработчик>,

До сих пор мы использовали привязки переменных только для <шаблона>. Однако можно использовать любой шаблон Rust. Предположим, что мы получаем данные из нескольких каналов MPSC, мы можем сделать следующее:


use tokio::sync::mpsc;

#[tokio::main]
async fn main() {
    let (mut tx1, mut rx1) = mpsc::channel(128);
    let (mut tx2, mut rx2) = mpsc::channel(128);

    tokio::spawn(async move {
        // Делаем что-нибудь с `tx1` и `tx2`
    });

    tokio::select! {
        Some(v) = rx1.recv() => {
            println!("Got {:?} from rx1", v);
        }
        Some(v) = rx2.recv() => {
            println!("Got {:?} from rx2", v);
        }
        else => {
            println!("Both channels closed");
        }
    }
}

В этом примере выражение select! ожидает получения значения от rx1 и rx2. Если канал закрывается, recv() возвращает None. Это не соответствует шаблону, и ветка отключается. select! продолжает ожидать оставшиеся ветки.


Обратите внимание, что этот select! включает ветку else. select! должен возвращать значение. При использовании сопоставления с шаблоном возможно, что ни одна из ветвей не будет соответствовать шаблону. В этом случае оценивается ветвь else.


Заимствование


При создании задачи, порожденное асинхронное выражение должно владеть всеми своими данными. Макрос select! не имеет этого ограничения. Асинхронное выражение каждой ветки может заимствовать данные и работать с ними одновременно. Следуя правилам заимствования Rust, несколько асинхронных выражений могут заимствовать один неизменный фрагмент данных, или только одно асинхронное выражение может заимствовать изменяемый фрагмент данных.


Рассмотрим несколько примеров. Здесь мы одновременно отправляем одни и те же данные в два разных пункта назначения TCP:


use tokio::io::AsyncWriteExt;
use tokio::net::TcpStream;
use std::io;
use std::net::SocketAddr;

async fn race(
    data: &[u8],
    addr1: SocketAddr,
    addr2: SocketAddr
) -> io::Result<()> {
    tokio::select! {
        Ok(_) = async {
            let mut socket = TcpStream::connect(addr1).await?;
            socket.write_all(data).await?;
            Ok::<_, io::Error>(())
        } => {}
        Ok(_) = async {
            let mut socket = TcpStream::connect(addr2).await?;
            socket.write_all(data).await?;
            Ok::<_, io::Error>(())
        } => {}
        else => {}
    };

    Ok(())
}

Переменная data неизменяемо заимствована в обоих асинхронных выражениях. Когда любая операция завершается успешно, другая уничтожается. Поскольку мы сопоставляем шаблон с Ok(_), в случае провала одного выражения, другое продолжит выполняться.


Когда дело доходит до <обработчика> каждой ветки, select! гарантирует, что запускается только один <обработчик>. Из-за этого каждый <обработчик> может мутабельно заимствовать одни и те же данные.


Например, в следующем примере out модифицируется в обоих обработчиках:


use tokio::sync::oneshot;

#[tokio::main]
async fn main() {
    let (tx1, rx1) = oneshot::channel();
    let (tx2, rx2) = oneshot::channel();

    let mut out = String::new();

    tokio::spawn(async move {
        // Отправляем значения в `tx1` и `tx2`
    });

    tokio::select! {
        _ = rx1 => {
            out.push_str("rx1 completed");
        }
        _ = rx2 => {
            out.push_str("rx2 completed");
        }
    }

    println!("{}", out);
}

Циклы


Макрос select! часто используется в циклах. Рассмотрим несколько примеров. Начнем с выбора нескольких каналов:


use tokio::sync::mpsc;

#[tokio::main]
async fn main() {
    let (tx1, mut rx1) = mpsc::channel(128);
    let (tx2, mut rx2) = mpsc::channel(128);
    let (tx3, mut rx3) = mpsc::channel(128);

    loop {
        let msg = tokio::select! {
            Some(msg) = rx1.recv() => msg,
            Some(msg) = rx2.recv() => msg,
            Some(msg) = rx3.recv() => msg,
            else => { break }
        };

        println!("Got {:?}", msg);
    }

    println!("All channels have been closed.");
}

В этом примере выполняется выбор приемников трех каналов. При получении сообщения по любому каналу, оно записывается в STDOUT. Когда канал закрывается, recv() возвращает None. Благодаря сопоставлению с шаблоном, макрос select! продолжает ждать сообщения на остальных каналах. Когда все каналы закрываются, оценивается ветвь else и цикл завершается.


Макрос select! произвольно выбирает ветку для проверки ее готовности. Если несколько каналов имеют ожидающие значения, для приема будет выбран произвольный канал. Это необходимо для обработки случая, когда цикл приема обрабатывает сообщения медленнее, чем они передаются в каналы, а это означает, что каналы начинают заполняться (fill up). Если select! не будет произвольно выбирать ветвь для проверки, на каждой итерации цикла сначала будет проверяться rx1. Если rx1 всегда будет содержать новое сообщение, остальные каналы никогда не будут проверяться.


Если при оценке select! несколько каналов имеют ожидающие сообщения, извлекается значение только из одного канала. Все остальные каналы остаются нетронутыми, и их сообщения остаются в этих каналах до следующей итерации цикла. Ни одно сообщение не теряется.

Возобновление асинхронной операции


Теперь посмотрим, как запустить асинхронную операцию в нескольких select!. В этом примере у нас есть канал MPSC с типом элемента i32 и асинхронная функция. Мы хотим запускать асинхронную функцию до тех пор, пока она не завершится, или пока в канале не будет получено четное целое число.


async fn action() {
    // ...
}

#[tokio::main]
async fn main() {
    let (mut tx, mut rx) = tokio::sync::mpsc::channel(128);

    let operation = action();
    tokio::pin!(operation);

    loop {
        tokio::select! {
            _ = &mut operation => break,
            Some(v) = rx.recv() => {
                if v % 2 == 0 {
                    break;
                }
            }
        }
    }
}

Обратите внимание, что вместо вызова функции action в макросе select!, она вызывается вне цикла. Результат action() присваивается переменной operation без вызова .await. Затем мы вызываем макрос tokio::pin! на operation.


В цикле select!, вместо передачи operation, мы передаем &mut operation. operation отслеживает текущую асинхронную операцию. Каждая итерация цикла использует одну и ту же operation вместо нового вызова action().


Другой select! получает сообщение из канала. Если сообщение является четным целым числом, цикл завершается. В противном случае, снова запускается select!.


Здесь мы впервые используем tokio::pin!. Мы пока не будем вдаваться в подробности закрепления (pinning). Следует отметить, что для ожидания ссылки, значение, на которое ссылаются, должно быть закреплено или реализовывать Unpin.


Если мы удалим строку tokio::pin! и попытаемся скомпилировать код, то получим следующую ошибку:


error[E0599]: no method named `poll` found for struct
     `std::pin::Pin<&mut &mut impl std::future::Future>`
     in the current scope
  --> src/main.rs:16:9
   |
16 | /         tokio::select! {
17 | |             _ = &mut operation => break,
18 | |             Some(v) = rx.recv() => {
19 | |                 if v % 2 == 0 {
...  |
22 | |             }
23 | |         }
   | |_________^ method not found in
   |             `std::pin::Pin<&mut &mut impl std::future::Future>`
   |
   = note: the method `poll` exists but the following trait bounds
            were not satisfied:
           `impl std::future::Future: std::marker::Unpin`
           which is required by
           `&mut impl std::future::Future: std::future::Future`

Если вы столкнулись с такой ошибкой, вероятно, Future необходимо закрепить. Узнать больше о Pin можно здесь.


Модификация ветки


Рассмотрим немного более сложный цикл. У нас есть:


  1. Канал значений i32.
  2. Асинхронная операция над этими значениями.

Логика, которую мы хотим реализовать:


  1. Ждем получения четного числа из канала.
  2. Запускаем асинхронную операцию с четным числом в качестве аргумента.
  3. Ждем завершения операции и в тоже время регистрируем новые четные числа, поступающие из канала.
  4. Если до завершения операции получено новое четное число, прерываем текущую операцию и запускаем новую.

async fn action(input: Option<i32>) -> Option<String> {
    // Если аргументом является `None`, возвращаем `None`.
    // Это можно переписать как `let i = input?;`
    let i = match input {
        Some(input) => input,
        None => return None,
    };
    // Асинхронная логика
}

#[tokio::main]
async fn main() {
    let (mut tx, mut rx) = tokio::sync::mpsc::channel(128);

    let mut done = false;
    let operation = action(None);
    tokio::pin!(operation);

    tokio::spawn(async move {
        let _ = tx.send(1).await;
        let _ = tx.send(3).await;
        let _ = tx.send(2).await;
    });

    loop {
        tokio::select! {
            res = &mut operation, if !done => {
                done = true;

                if let Some(v) = res {
                    println!("GOT = {}", v);
                    return;
                }
            }
            Some(v) = rx.recv() => {
                if v % 2 == 0 {
                    // `.set` это метод, предоставляемый `Pin`
                    operation.set(action(Some(v)));
                    done = false;
                }
            }
        }
    }
}

Мы используем ту же стратегию, что и в предыдущем примере. Асинхронная функция action вызывается вне цикла и присваивается переменной operation. operation закрепляется. Цикл выбирает как операцию, так и приемник канала.


Обратите внимание, что action принимает Option<i32> в качестве аргумента. Перед получением первого четного числа, нам нужно инициализировать чем-то operation. Мы делаем так, чтобы action() принимала Option и возвращала Option. Если передается None, возвращается None. На первой итерации цикла, операция немедленно завершается со значением None.


В этом примере используется новый синтаксис. Первая ветка включает , if !done. Это предварительное условие ветвления. Прежде чем объяснять, как это работает, давайте посмотрим, что произойдет, если предусловие опустить. Удаление , if !done и выполнение примера приведет к следующему результату:


thread 'main' panicked at '`async fn` resumed after completion', src/main.rs:1:55
note: run with `RUST_BACKTRACE=1` environment variable to display a backtrace

Эта ошибка возникает при попытке использовать operation после ее завершения. Обычно при использовании .await, ожидаемое значение потребляется. В этом примере мы ожидаем ссылку. Это означает, что operation существует после завершения.


Чтобы избежать этой паники, мы должны позаботиться о том, чтобы отключить первую ветвь, если операция завершена. Переменная done используется для отслеживания того, завершена ли операция. Ветвь select! может включать предварительное условие. Это предварительное условие проверяется перед тем, как select! начинает ждать на ветке. Если условие оценивается как false, ветвь отключается. done инициализируется значением false. После завершения operation, done устанавливается в значение true. На следующей итерации ветка отключается. При получении четного числа из канала, operation сбрасывается, и done устанавливается в false.


Параллелизм по задачам


И tokio::spawn(), и select! позволяют запускать параллельные асинхронные операции. Однако стратегия, используемая для выполнения параллельных вычислений, отличается. Функция tokio::spawn принимает асинхронную операцию и создает новую задачу для ее выполнения. Задача — это объект, который планируется средой выполнения Tokio. Две разные задачи планируются независимо. Они могут выполняться одновременно в разных потоках операционной системы. По этой причине порожденная задача имеет то же ограничение, что и порожденный поток: отсутствие заимствований.


Макрос select! попеременно запускает все ветки одной и той же задачи. Поскольку все ветки выполняются для одной и той же задачи, они никогда не будут запускаться одновременно. Макрос select! мультиплексирует асинхронные операции в одной задаче.


Потоки


Поток — это асинхронная серия значений. Это асинхронный эквивалент std::iter::Iterator, представленный типажом Stream. Потоки можно перебирать в асинхронных функциях. Их также можно трансформировать с помощью адаптеров. Tokio предоставляет несколько распространенных адаптеров в типаже StreamExt.


Tokio предоставляет поддержку потоков в отдельном крейте tokio-stream:


tokio-stream = "0.1"

В настоящее время утилиты для работы с потоками Tokio находятся в крейте tokio-stream. После стабилизации Stream в стандартной библиотеке Rust, эти утилиты будут перемещены в крейт tokio.

Перебор


В настоящее время язык программирования Rust не поддерживает асинхронные циклы for. Вместо этого перебор потоков выполняется с помощью цикла while let в сочетании с методом StreamExt::next:


use tokio_stream::StreamExt;

#[tokio::main]
async fn main() {
    let mut stream = tokio_stream::iter(&[1, 2, 3]);

    while let Some(v) = stream.next().await {
        println!("GOT = {:?}", v);
    }
}

Как и итераторы, метод next возвращает Option<T>, где T — тип значения потока. Получение None указывает на то, что итерация потока завершена.


Трансляция Mini-Redis


Рассмотрим немного более сложный пример с использованием клиента Mini-Redis. Полный код можно найти здесь.


use tokio_stream::StreamExt;
use mini_redis::client;

async fn publish() -> mini_redis::Result<()> {
    let mut client = client::connect("127.0.0.1:6379").await?;

    // Публикуем некоторые данные
    client.publish("numbers", "1".into()).await?;
    client.publish("numbers", "two".into()).await?;
    client.publish("numbers", "3".into()).await?;
    client.publish("numbers", "four".into()).await?;
    client.publish("numbers", "five".into()).await?;
    client.publish("numbers", "6".into()).await?;
    Ok(())
}

async fn subscribe() -> mini_redis::Result<()> {
    let client = client::connect("127.0.0.1:6379").await?;
    let subscriber = client.subscribe(vec!["numbers".to_string()]).await?;
    let messages = subscriber.into_stream();

    tokio::pin!(messages);

    while let Some(msg) = messages.next().await {
        println!("got = {:?}", msg);
    }

    Ok(())
}

#[tokio::main]
async fn main() -> mini_redis::Result<()> {
    tokio::spawn(async {
        publish().await
    });

    subscribe().await?;

    println!("done");

    Ok(())
}

Сначала создается задача для публикации сообщений на сервере Mini-Redis в канале numbers. Затем мы подписываемся на этот канал в основной задаче и отображаем полученные сообщения.


После подписки на вернувшемся подписчике вызывается метод into_stream. Это потребляет Subscriber, возвращая поток, который выдает сообщения по мере их поступления. Обратите внимание, что перед перебором сообщений поток закрепляется в стеке с помощью макроса tokio::pin!. Вызов next() для потока требует его закрепления. Функция into_stream возвращает незакрепленный поток, мы должны явно закрепить его, чтобы выполнить итерацию.


Значение Rust "закрепляется", когда его больше нельзя перемещать в памяти. Ключевой особенностью закрепленного значения является то, что указатели на него всегда остаются действительными. Эта особенность используется async/await для поддержки заимствования данных через точки .await.

Если мы забудем закрепить поток, то получим ​​ошибку.


Запускаем сервер Mini-Redis:


mini-redis-server

Запускаем пример:


got = Ok(Message { channel: "numbers", content: b"1" })
got = Ok(Message { channel: "numbers", content: b"two" })
got = Ok(Message { channel: "numbers", content: b"3" })
got = Ok(Message { channel: "numbers", content: b"four" })
got = Ok(Message { channel: "numbers", content: b"five" })
got = Ok(Message { channel: "numbers", content: b"6" })

Некоторые ранние сообщения могут быть пропущены, поскольку между подпиской и публикацией идет гонка. Программа никогда не завершается. Подписка на канал Mini-Redis остается активной, пока активен сервер.


Посмотрим, как мы можем работать с потоками, чтобы расширить эту программу.


Адаптеры


Функции, которые принимают Stream и возвращают другой Stream, часто называют "адаптерами потока", поскольку они представляют собой форму "шаблона адаптера" (adapter pattern). Популярными адаптерами потока являются map, take и filter.


Обновим Mini-Redis, чтобы он завершал работу. После получения трех сообщений прекращаем получать сообщения. Это делается с помощью take(). Этот адаптер ограничивает поток так, чтобы он выдавал не более n сообщений:


let messages = subscriber
    .into_stream()
    .take(3);

Запускаем программу:


got = Ok(Message { channel: "numbers", content: b"1" })
got = Ok(Message { channel: "numbers", content: b"two" })
got = Ok(Message { channel: "numbers", content: b"3" })

На этот раз программа завершается.


Теперь давайте ограничим поток однозначными числами. Проверяем длину сообщения. Для отброса любого сообщения, не соответствующего предикату, используется filter():


let messages = subscriber
    .into_stream()
    .filter(|msg| match msg {
        Ok(msg) if msg.content.len() == 1 => true,
        _ => false,
    })
    .take(3);

Запускаем программу:


got = Ok(Message { channel: "numbers", content: b"1" })
got = Ok(Message { channel: "numbers", content: b"3" })
got = Ok(Message { channel: "numbers", content: b"6" })

Обратите внимание, что порядок применения адаптеров имеет значение. Вызов filter(), а затем take() отличается от вызова take(), а затем filter().


Наконец, приведем в порядок вывод, удалив часть Ok(Message { ... }). Это делается с помощью map(). Поскольку map() вызывается после filter(), мы знаем, что сообщение Ok, поэтому можем использовать unwrap():


let messages = subscriber
    .into_stream()
    .filter(|msg| match msg {
        Ok(msg) if msg.content.len() == 1 => true,
        _ => false,
    })
    .map(|msg| msg.unwrap().content)
    .take(3);

Запускаем программу:


got = b"1"
got = b"3"
got = b"6"

filter() и map() можно объединить в один вызов с помощью filter_map().


Существуют и другие адаптеры.


Реализация Stream


Типаж Stream очень похож на типаж Future:


use std::pin::Pin;
use std::task::{Context, Poll};

pub trait Stream {
    type Item;

    fn poll_next(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>
    ) -> Poll<Option<Self::Item>>;

    fn size_hint(&self) -> (usize, Option<usize>) {
        (0, None)
    }
}

Функция Stream::poll_next во многом похожа на функцию Future::poll, за исключением того, что ее можно вызывать повторно для получения нескольких значений из потока. Как мы видели в одном из предыдущих разделов, когда поток не готов вернуть значение, он возвращает Poll::Pending. При этом регистрируется будильник задачи. Как только поток должен быть снова опрошен, будильник получает уведомление.


Метод size_hint используется так же, как и с итераторами.


Обычно при реализации потока вручную, это делается путем композиции фьючеров и других потоков. В качестве примера перепишем фьючер Delay. Мы преобразуем его в поток, который выдает () три раза с интервалом 10 мс:


use tokio_stream::Stream;
use std::pin::Pin;
use std::task::{Context, Poll};
use std::time::Duration;

struct Interval {
    rem: usize,
    delay: Delay,
}

impl Interval {
    fn new() -> Self {
        Self {
            rem: 3,
            delay: Delay { when: Instant::now() }
        }
    }
}

impl Stream for Interval {
    type Item = ();

    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>)
        -> Poll<Option<()>>
    {
        if self.rem == 0 {
            // Задержек больше нет
            return Poll::Ready(None);
        }

        match Pin::new(&mut self.delay).poll(cx) {
            Poll::Ready(_) => {
                let when = self.delay.when + Duration::from_millis(10);
                self.delay = Delay { when };
                self.rem -= 1;
                Poll::Ready(Some(()))
            }
            Poll::Pending => Poll::Pending,
        }
    }
}

async-stream


Реализация потоков вручную с помощью типажа Stream может быть утомительной. К сожалению, язык программирования Rust пока не поддерживает синтаксис async/await для определения потоков.


В качестве временного решения можно использовать крейт async-stream. Он предоставляет макрос stream!, который преобразует входные данные в поток. С помощью этого крейта вышеуказанный интервал можно реализовать следующим образом:


use async_stream::stream;
use std::time::{Duration, Instant};

stream! {
    let mut when = Instant::now();

    for _ in 0..3 {
        let delay = Delay { when };
        delay.await;
        yield ();
        when += Duration::from_millis(10);
    }
}

Это конец второй части и туториала, в целом.


Happy coding!




Новости, обзоры продуктов и конкурсы от команды Timeweb.Cloud — в нашем Telegram-канале

Комментарии (0)