Hello world!
Представляю вашему вниманию первую часть туториала по Tokio.
Tokio — это асинхронная среда выполнения (runtime) кода Rust. Она предоставляет строительные блоки, необходимые для разработки сетевых приложений любого размера.
Содержание
-
-
-
-
-
- Стратегии
Добавление зависимости bytes
Инициализация HashMap
Обновление функции process
- Задачи, потоки и конкуренция
Удержание MutexGuard
через.await
-
- Передача сообщений
- Примитивы каналов Tokio
- Определение типа сообщения
- Создание канала
- Создание управляющей задачи
- Получение ответов
- Обратное давление и ограниченные каналы
-
-
-
Практическое руководство по Rust
-
Обзор
На высоком уровне Tokio предоставляет несколько основных компонентов:
- многопоточную среду выполнения асинхронного кода
- асинхронную версию стандартной библиотеки (std)
- большую экосистему библиотек
Назначение Tokio
Асинхронный код позволяет выполнять несколько задач одновременно. Однако Rust не предоставляет встроенных инструментов для выполнения такого кода, для этого требуется сторонняя среда выполнения. Tokio является самой популярной из таких сред.
Кроме того, Tokio предоставляет множество полезных утилит. При написании асинхронного кода мы не можем использовать обычные блокирующие API, предоставляемые стандартной библиотекой Rust, и должны использовать их асинхронные версии. Tokio предоставляет такие версии, где это необходимо.
Преимущества Tokio
Скорость
Tokio является быстрым, поскольку таковым является сам Rust. Производительность кода, написанного с помощью Tokio, сопоставима с производительностью кода, написанного самостоятельно.
Tokio является масштабируемым благодаря масштабируемости async/await
. При работе с сетью существует естественный предел скорости обработки запроса из-за задержки (latency), поэтому единственным способом масштабирования является обработка нескольких запросов одновременно. async/await
позволяет легко и дешево увеличивать количество одновременно выполняемых задач.
Надежность
Tokio основан на Rust — языке, позволяющем разрабатывать надежное и эффективное ПО. Разные исследования (например, это и это) показывают, что около 70% серьезных ошибок безопасности являются результатом небезопасной работы с памятью. Использование Rust избавляет от всего этого класса багов в приложении.
Tokio сосредоточен на предоставлении предсказуемого поведения без сюрпризов. Основная цель Tokio — предоставить пользователям возможность развертывать предсказуемое ПО, которое будет работать одинаково изо дня в день с одинаковым временем отклика и без непредсказуемых скачков задержки.
Простота
Благодаря появлению async/await
в Rust, сложность создания асинхронных приложений существенно снизилась. В сочетании с утилитами Tokio и динамичной экосистемой, разработка асинхронных приложений становится проще простого.
Tokio следует соглашению об именовании стандартной библиотеки, когда это имеет смысл. Это позволяет легко конвертировать код, написанный с использованием стандартной библиотеки, в код, написанный с помощью Tokio. Благодаря строгой системе типов Rust, обеспечение правильности написанного кода не знает себе равных.
Случаи, для которых Tokio не подходит
Хотя Tokio полезен для многих проектов, в которых необходимо выполнять много задач одновременно, есть случаи, для которых он не подходит:
- ускорение вычислений, выполняемых ЦП, за счет их параллельного выполнения в нескольких потоках. Tokio предназначен для приложений, связанных с вводом-выводом, где каждая отдельная задача проводит большую часть времени в ожидании ввода-вывода. Если все, что делает наше приложение, — это параллельные вычисления, нам следует использовать rayon. Впрочем, ничто не мешает нам использовать Tokio и
rayon
вместе (пример) - чтение большого количества файлов. Может показаться, что Tokio будет полезен для проектов, которым нужно читать много файлов, однако Tokio здесь не дает никаких преимуществ по сравнению с обычным пулом потоков (thread pool). Это связано с тем, что ОС обычно не предоставляют асинхронные файловые API
- отправка единственного веб-запроса. Tokio полезен, когда нужно выполнять много задач одновременно. Если нам нужно использовать библиотеку, предназначенную для выполнения асинхронного кода, такую как reqwest, но не нужно выполнять много задач одновременно, следует предпочесть блокирующую версию этой библиотеки, поскольку она упростит код проекта. Конечно, в этом случае Tokio будет работать, но не даст никаких преимуществ перед блокирующим API
Настройка
Этот туториал шаг за шагом проведет вас через процесс создания клиента и сервера Redis. Мы начнем с основ асинхронного программирования на Rust и будем двигаться дальше. Мы реализуем несколько команд Redis и получим полный обзор Токио.
Проект, который мы создадим в этом туториале, доступен как Mini-Redis на GitHub. Mini-Redis разработан с основной целью изучения Tokio и поэтому очень хорошо прокомментирован, но это также означает, что в Mini-Redis отсутствуют некоторые функции, которые нужны в настоящей библиотеке Redis. Готовые к использованию библиотеки Redis можно найти на crates.io.
Предварительные условия
Для прохождения этого туториала вы должны быть знакомы с Rust.
Хотя это и не обязательно, но некоторый опыт написания сетевого кода с использованием стандартной библиотеки Rust или другого языка программирования может оказаться полезным.
Рекомендуется использовать самую последнюю стабильную версию Rust.
Далее нужно установить сервер Mini-Redis. Он будет использоваться для тестирования разрабатываемого нами клиента.
cargo install mini-redis
Команда для запуска сервера:
mini-redis-server
Команда для получения значения по ключу foo
(выполняем в отдельном терминале):
mini-redis-cli get foo
Выполнение этой команды должно привести к отображению (nil)
в терминале.
Привет, Tokio
Начнем с создания очень простого приложения Tokio. Оно будет подключаться к серверу Mini-Redis и устанавливать ключ hello
в значение world
. Затем оно будет читать значение этого ключа. Это будет делаться с помощью клиента Mini-Redis.
Код
Создание крейта
Начнем с генерации нового приложения Rust:
cargo new my-redis
cd my-redis
Добавление зависимостей
Открываем файл Cargo.toml
и добавляем следующие записи в раздел [dependencies]
:
tokio = { version = "1", features = ["full"] }
mini-redis = "0.4"
Написание кода
Редактируем файл main.rs
следующим образом:
use mini_redis::{client, Result};
#[tokio::main]
async fn main() -> Result<()> {
// Подключаемся к серверу mini-redis
let mut client = client::connect("127.0.0.1:6379").await?;
// Устанавливаем ключ "hello" в значение "world"
client.set("hello", "world".into()).await?;
// Получаем значение ключа "hello"
let result = client.get("hello").await?;
println!("От сервера получено: {:?}", result);
Ok(())
}
Убедитесь, что сервер Mini-Redis запущен.
Запускаем приложение my-redis
:
cargo run
Получаем:
От сервера получено: Some(b"world")
Полный код примера можно найти здесь.
Разбор
Разберем код, который мы написали, построчно. Его мало, но делает он много всего.
let mut client = client::connect("127.0.0.1:6379").await?;
Функция client::connect предоставляется крейтом mini-redis. Она асинхронно устанавливает TCP-соединение с указанным сервером (адресом). После установки соединения возвращается дескриптор клиента (client
). Несмотря на то, что операция выполняется асинхронно, код выглядит синхронным. Единственный признак того, что операция является асинхронной, — оператор .await
.
Что такое асинхронное программирование?
Большинство компьютерных программ выполняются в том порядке, в котором они написаны. Выполняется первая строка, затем следующая и т.д. В синхронном программировании, когда программа встречает операцию, которая не может быть завершена немедленно, она блокируется до тех пор, пока операция не завершится. Например, для установки TCP-соединения требуется обмен данными с одноранговым узлом (peer) по сети, что занимает некоторое время. В это время поток блокируется.
В асинхронном программировании операции, которые не могут завершиться немедленно, приостанавливаются в фоновом режиме. Поток не блокируется и может выполнять другие задачи. После завершения операции, задача возобновляется и продолжает выполняться с того места, где она была остановлена. В нашем примере имеется только одна задача, поэтому пока она приостановлена, ничего не происходит, но асинхронные программы обычно имеют много таких задач.
Хотя асинхронное программирование может привести к созданию более быстрых приложений, оно часто приводит к созданию гораздо более сложных программ. Программисту необходимо отслеживать все состояния, необходимые для возобновления работы после завершения асинхронной операции. Это утомительная и подверженная ошибкам задача.
Rust реализует асинхронное программирование с помощью паттерна async/await. Функции, выполняющие асинхронные операции, помечаются ключевым словом async
. В нашем примере функция connect
определяется следующим образом:
use mini_redis::Result;
use mini_redis::client::Client;
use tokio::net::ToSocketAddrs;
pub async fn connect<T: ToSocketAddrs>(addr: T) -> Result<Client> {
// ...
}
async fn
выглядит как обычная синхронная функция, но работает асинхронно. Rust преобразует асинхронную функцию во время компиляции в подпрограмму (routine), работающую асинхронно. Вызов .await
внутри async fn
возвращает управление потоку. Поток может выполнять другую работу, пока асинхронная операция выполняется в фоновом режиме.
Хотя другие языки тоже реализуют async/await
, Rust использует уникальный подход. Прежде всего, асинхронные операции в Rust являются ленивыми (lazy). Это приводит к отличиям в семантике времени выполнения.
Использование async/await
Асинхронная функция вызываются также, как обычная. Однако вызов такой функции не приводит к выполнению ее тела. Вместо этого вызов async fn
возвращает значение, представляющее операцию. Это концептуально аналогично замыканию с нулевым аргументом (zero-argument closure). Для фактического запуска операции используется оператор .await
на возвращаемом значении.
Например, результатом выполнения программы
async fn say_world() {
println!("world");
}
#[tokio::main]
async fn main() {
// Вызов функции `say_world` не выполняет ее тело
let op = say_world();
// Сначала выполняется этот код
println!("hello");
// Вызов `.await` на `op` выполняет тело `say_world()`
op.await;
}
является
hello
world
Возвращаемое значение async fn
— это анонимный тип, реализующий типаж (трейт) Future (фьючер).
Асинхронная функция main
Функция main
, используемая для запуска приложения, отличается от обычной, встречающейся в большинстве крейтов Rust:
- Это
async fn
. - Она аннотирована с помощью
#[tokio::main]
.
async fn
используется для входа в асинхронный контекст. Однако асинхронные функции должны выполняться runtime. runtime
содержит асинхронный планировщик задач, обеспечивает событийный ввод-вывод, таймеры и др. Среда выполнения не запускается автоматически, поэтому ее должна запустить функция main
.
Функция #[tokio::main]
— это макрос. Она преобразует async fn main()
в синхронную fn main()
, которая инициализирует экземпляр среды выполнения и выполняет тело асинхронной функции.
Например, код
#[tokio::main]
async fn main() {
println!("hello");
}
трансформируется в
fn main() {
let mut rt = tokio::runtime::Runtime::new().unwrap();
rt.block_on(async {
println!("hello");
})
}
Возможности Cargo
В этом туториале используются все возможности Tokio (флаг full
):
tokio = { version = "1", features = ["full"] }
Tokio предоставляет богатый функционал (TCP, UDP, сокеты Unix, таймеры, утилиты синхронизации, несколько типов планировщиков и др.). Не всем приложениям нужен весь этот функционал. При оптимизации времени компиляции или размера конечного файла приложения можно указать только те функции, которые используются приложением.
Создание потоков
Приступим к разработке сервера Redis.
Сначала переместим код клиента из предыдущего раздела в отдельный файл:
mkdir -p examples
mv src/main.rs examples/hello-redis.rs
Затем создадим новый пустой файл src/main.rs
.
Прием сокетов
Первое, что должен делать наш сервер, — принимать входящие TCP-сокеты. Это делается путем привязки tokio::net::TcpListener к порту 6379.
Многие типы Tokio называются также, как их синхронные эквиваленты в стандартной библиотеке Rust. Когда это имеет смысл, Tokio предоставляет те же API, что иstd
, но с использованиемasync fn
.
Сокеты принимаются в цикле. Каждый сокет обрабатывается и закрывается. Прочитаем команду, выведем ее на стандартный вывод и ответим ошибкой:
// src/main.rs
use tokio::net::{TcpListener, TcpStream};
use mini_redis::{Connection, Frame};
#[tokio::main]
async fn main() {
// Привязываем обработчик к адресу
let listener = TcpListener::bind("127.0.0.1:6379").await.unwrap();
loop {
// Второй элемент содержит IP и порт нового подключения
let (socket, _) = listener.accept().await.unwrap();
process(socket).await;
}
}
async fn process(socket: TcpStream) {
// `Connection` позволяет читать/писать кадры (frames) redis вместо
// потоков байтов. Тип `Connection` определяется mini-redis
let mut connection = Connection::new(socket);
if let Some(frame) = connection.read_frame().await.unwrap() {
println!("GOT: {:?}", frame);
// Отвечаем ошибкой
let response = Frame::Error("Unimplemented".to_string());
connection.write_frame(&response).await.unwrap();
}
}
Запускаем программу:
cargo run
Запускаем пример hello-redis
в отдельном терминале:
cargo run --example hello-redis
Вывод в терминале примера:
Error: "Unimplemented"
Вывод в терминале сервера:
GOT: Array([Bulk(b"set"), Bulk(b"hello"), Bulk(b"world")])
Одновременность
У нашего сервера есть небольшая проблема (помимо того, что он отвечает только ошибками). Он обрабатывает входящие запросы по одному. После установки соединения, сервер остается внутри цикла приема подключений до тех пор, пока ответ не будет полностью записан в сокет.
Мы хотим, чтобы наш сервер обрабатывал много запросов одновременно. Для этого нам нужно добавить немного конкурентности.
Одновременность (concurrency) и параллелизм (parallelism) — это не одно и тоже. Если мы переключаемся между двумя задачами, то мы работаем над ними одновременно, а не параллельно. Чтобы эту работу можно было считать параллельной, нам потребуются два человека, по одному на каждую задачу.
Одним из преимуществ использования Tokio является то, что асинхронный код позволяет работать над многими задачами одновременно, без необходимости работать над ними параллельно с использованием обычных потоков. Фактически, Tokio может выполнять множество задач одновременно в одном потоке!
Для одновременной обработки соединений для каждого входящего соединения создается новая задача. Соединение обрабатывается этой задачей.
Цикл принятия соединений становится таким:
use tokio::net::TcpListener;
#[tokio::main]
async fn main() {
let listener = TcpListener::bind("127.0.0.1:6379").await.unwrap();
loop {
let (socket, _) = listener.accept().await.unwrap();
// Для каждого входящего сокета создается новая задача. Сокет
// перемещается в новую задачу и обрабатывается там
tokio::spawn(async move {
process(socket).await;
});
}
}
Задачи
Задача Tokio — это асинхронный зеленый поток (green thread). Они создаются путем передачи async
блока в tokio::spawn()
. Функция tokio::spawn
возвращает JoinHandle
, который вызывающая сторона может использовать для взаимодействия с созданной задачей. async
блок может иметь возвращаемое значение. Вызывающая сторона может получить его с помощью .await
на JoinHandle
.
Например:
#[tokio::main]
async fn main() {
let handle = tokio::spawn(async {
// Выполняем асинхронную работу
"return value"
});
// Выполняем другую работу
let out = handle.await.unwrap();
println!("GOT: {}", out);
}
Ожидание JoinHandle
возвращает Result
. Если во время выполнения задачи возникает ошибка, JoinHandle
возвращает Err
. Это происходит, когда задача либо вызывает панику, либо принудительно отменяется из-за закрытия среды выполнения.
Задача — это единица выполнения (unit of execution), управляемая планировщиком. При создании задачи она передается планировщику Tokio, который гарантирует ее выполнение при появлении у нее работы. Порожденная задача может выполняться в том же потоке, в котором она была создана, или в другом потоке времени выполнения. Задачу также можно перемещать между потоками после создания.
Задачи в Токио очень легкие. По сути, им требуется только одно выделение и 64 байта памяти. Приложения должны иметь возможность свободно создавать тысячи, если не миллионы задач.
Привязка 'static
При создании задачи в среде выполнения Tokio, время жизни ее типа должно быть 'static
. Это означает, что порожденная задача не должна содержать никаких ссылок на данные, не принадлежащие ей.
Распространено заблуждение, что'static
означает "жить вечно", но это не так. Тот факт, что значение является'static
, не означает, что у нас есть утечка памяти. Больше об этом можно прочитать здесь.
Например, следующий код не скомпилируется:
use tokio::task;
#[tokio::main]
async fn main() {
let v = vec![1, 2, 3];
task::spawn(async {
println!("Это вектор: {:?}", v);
});
}
error[E0373]: async block may outlive the current function, but
it borrows `v`, which is owned by the current function
--> src/main.rs:7:23
|
7 | task::spawn(async {
| _______________________^
8 | | println!("Это вектор: {:?}", v);
| | - `v` is borrowed here
9 | | });
| |_____^ may outlive borrowed value `v`
|
note: function requires argument type to outlive `'static`
--> src/main.rs:7:17
|
7 | task::spawn(async {
| _________________^
8 | | println!("Это вектор: {:?}", v);
9 | | });
| |_____^
help: to force the async block to take ownership of `v` (and any other
referenced variables), use the `move` keyword
|
7 | task::spawn(async move {
8 | println!("Это вектор: {:?}", v);
9 | });
|
Это происходит потому, что по умолчанию переменные не перемещаются в асинхронные блоки. Вектор v
остается во владении функции main
. println!
заимствует v
. Компилятор Rust любезно объясняет нам это и даже предлагает исправление! Изменение строки 7 на task::spawn(async move {
даст указание компилятору переместить v
в порожденную задачу. Теперь задача владеет всеми своими данными, что делает их 'static
.
Если часть данных должна быть доступна одновременно в нескольких задачах, ее необходимо распределять (сделать общей) с помощью примитивов синхронизации, таких как Arc
.
Обратите внимание, что в сообщении об ошибке говорится о том, что тип аргумента переживает время жизни 'static
. Эта терминология может сбивать с толку, поскольку время жизни 'static
длится до конца программы, поэтому, если тип переживает его, не возникает ли у нас утечки памяти? Объяснение состоит в том, что именно тип, а не значение, должен переживать время жизни 'static
, и значение может быть уничтожено до того, как его тип перестанет быть действительным.
Когда мы говорим, что значение является "статическим", это означает лишь то, что было бы правильно хранить его вечно. Это важно, поскольку компилятор не может определить, как долго будет выполняться вновь созданная задача. Мы должны убедиться, что задаче разрешено жить вечно, чтобы Tokio мог выполнять ее столько, сколько необходимо.
"Привязка 'static
", "тип, переживающий 'static
" и "'static
значение" обозначают одно и тоже — T: 'static
, в отличие от "аннотации с помощью 'static
", как в &'static T
.
Привязка bound
Задачи, порожденные tokio::spawn()
, должны реализовывать типаж Send
. Это позволяет среде выполнения Tokio перемещать задачи между потоками, пока они приостановлены в .await
.
Задачи являются Send
, когда все данные, хранящиеся в вызовах .await
, являются таковыми. При вызове .await
задача возвращается (yields back) планировщику. При следующем выполнении задачи, она возобновляется с той точки, на которой была приостановлена (yielded) в последний раз. Чтобы это работало, все состояние, используемое после .await
, должно сохраняться задачей. Если это состояние являются Send
, т.е. его можно перемещать между потоками, то и саму задачу можно перемещать между потоками. И наоборот, если состояние не являются Send
, то и задача тоже.
Например, это работает:
use tokio::task::yield_now;
use std::rc::Rc;
#[tokio::main]
async fn main() {
tokio::spawn(async {
// Область видимости уничтожает `rc` перед `.await`
{
let rc = Rc::new("hello");
println!("{}", rc);
}
// `rc` больше не используется. Он не сохраняется, когда
// задача возвращается планировщику
yield_now().await;
});
}
А это не работает:
use tokio::task::yield_now;
use std::rc::Rc;
#[tokio::main]
async fn main() {
tokio::spawn(async {
let rc = Rc::new("hello");
// `rc` используется после `.await`. Он должен быть сохранен в
// состоянии задачи
yield_now().await;
println!("{}", rc);
});
}
Попытка компиляции этого фрагмента завершается такой ошибкой:
error: future cannot be sent between threads safely
--> src/main.rs:6:5
|
6 | tokio::spawn(async {
| ^^^^^^^^^^^^ future created by async block is not `Send`
|
::: [..]spawn.rs:127:21
|
127 | T: Future + Send + 'static,
| ---- required by this bound in
| `tokio::task::spawn::spawn`
|
= help: within `impl std::future::Future`, the trait
| `std::marker::Send` is not implemented for
| `std::rc::Rc<&str>`
note: future is not `Send` as this value is used across an await
--> src/main.rs:10:9
|
7 | let rc = Rc::new("hello");
| -- has type `std::rc::Rc<&str>` which is not `Send`
...
10 | yield_now().await;
| ^^^^^^^^^^^^^^^^^ await occurs here, with `rc` maybe
| used later
11 | println!("{}", rc);
12 | });
| - `rc` is later dropped here
Хранение значений
Теперь мы реализуем функцию process
для обработки входящих команд. Мы будем использовать HashMap
для хранения значений. Команды SET
будут добавлять значения в HashMap
, а команды GET
будут извлекать значения из HashMap
. Кроме того, мы будем использовать цикл для приема нескольких команд в одном соединении.
use tokio::net::TcpStream;
use mini_redis::{Connection, Frame};
async fn process(socket: TcpStream) {
use mini_redis::Command::{self, Get, Set};
use std::collections::HashMap;
// Хранилище данных
let mut db = HashMap::new();
// `Connection`, предоставляемое `mini-redis`, обрабатывает разбор кадров из сокета
let mut connection = Connection::new(socket);
// Используем `read_frame` для получения команды из соединения
while let Some(frame) = connection.read_frame().await.unwrap() {
let response = match Command::from_frame(frame).unwrap() {
Set(cmd) => {
// Значение хранится в виде `Vec<u8>`
db.insert(cmd.key().to_string(), cmd.value().to_vec());
Frame::Simple("OK".to_string())
}
Get(cmd) => {
if let Some(value) = db.get(cmd.key()) {
// `Frame::Bulk` ожидает, что данные будут иметь тип `Bytes`.
// Мы рассмотрим этот тип позже.
// `&Vec<u8>` преобразуется в `Bytes` с помощью метода `into`
Frame::Bulk(value.clone().into())
} else {
Frame::Null
}
}
cmd => panic!("Не реализовано {:?}", cmd),
};
// Отправляем (пишем) ответ клиенту
connection.write_frame(&response).await.unwrap();
}
}
Запускаем сервер:
cargo run
В отдельном терминале запускаем пример hello-redis
:
cargo run --example hello-redis
Полный код примера можно найти здесь.
Теперь мы можем устанавливать и получать значения, но есть одна проблема: значения не распределяются между соединениями. Если другой подключенный сокет попытается получить значение по ключу hello
, он получит (nil)
.
Общее состояние
На данный момент у нас имеется работающий сервер "ключ-значение". Однако у него есть серьезный недостаток: состояние не распределяется между соединениями. Давайте это исправим.
Стратегии
В Tokio существует несколько разных способов распределять состояние:
- Защита общего состояния с помощью
Mutex
. - Создание задачи для управления состоянием и использование передачи сообщений для работы с ней.
Обычно первый подход используется для простых данных, а второй — для вещей, требующих асинхронной работы, таких как примитивы ввода-вывода. В нашем случае общим состоянием является HashMap
, а операциями — insert()
и get()
. Ни одна из этих операций не является асинхронной, поэтому мы будем использовать Mutex
.
Второй подход будет рассмотрен в следующей главе.
Добавление зависимости bytes
Вместо Vec<u8>
Mini-Redis использует Bytes
из крейта bytes. Цель Bytes
— предоставить надежную структуру массива байтов для сетевого программирования. Основная особенность, которую он добавляет к Vec<u8>
, — это поверхностное клонирование (shallow cloning). Другими словами, вызов метода clone
для экземпляра Bytes
не копирует данные. Тип Bytes
примерно соответствует Arc<Vec<u8>>
, но с некоторыми дополнительными возможностями.
Добавляем bytes
в раздел [dependencies]
файла Cargo.toml
:
bytes = "1"
Инициализация HashMap
HashMap
будет использоваться многими задачами и, возможно, многими потоками. Для поддержки этого его нужно обернуть в Arc<Mutex<_>>
.
Во-первых, для удобства добавим следующий псевдоним типа после операторов use
:
use bytes::Bytes;
use std::collections::HashMap;
use std::sync::{Arc, Mutex};
type Db = Arc<Mutex<HashMap<String, Bytes>>>;
Затем обновляем функцию main
, чтобы инициализировать HashMap
и передать дескриптор (handle) Arc
в функцию process
. Использование Arc
позволяет одновременно ссылаться на HashMap
из многих задач, потенциально работающих во многих потоках. В Tokio термин "дескриптор" используется для обозначения значения, которое обеспечивает доступ к некоторому общему состоянию.
use tokio::net::TcpListener;
use std::collections::HashMap;
use std::sync::{Arc, Mutex};
#[tokio::main]
async fn main() {
let listener = TcpListener::bind("127.0.0.1:6379").await.unwrap();
println!("Listening");
let db = Arc::new(Mutex::new(HashMap::new()));
loop {
let (socket, _) = listener.accept().await.unwrap();
// Клонируем дескриптор в `HashMap`
let db = db.clone();
println!("Accepted");
tokio::spawn(async move {
process(socket, db).await;
});
}
}
Об использовании std::sync::Mutex
Обратите внимание, что для защиты HashMap
используется std::sync::Mutex
, а не tokio::sync::Mutex
. Распространенной ошибкой является безусловное использование tokio::sync::Mutex
в асинхронном коде. Асинхронный мьютекс — это мьютекс, который блокируется при вызовах .await
.
Синхронный мьютекс блокирует текущий поток в ожидании получения блокировки (acquire the lock). Это, в свою очередь, блокирует обработку других задач. Однако переключение на tokio::sync::Mutex
обычно не помогает, поскольку асинхронный мьютекс использует синхронный мьютекс под капотом. Как правило, использование синхронного мьютекса в асинхронном коде вполне допустимо, пока конкуренция остается низкой и блокировка не удерживается при вызовах .await
.
Обновление функции process
Функция process
больше не инициализирует HashMap
. Вместо этого, она принимает общий дескриптор HashMap
в качестве параметра. Перед использованием HashMap
его необходимо заблокировать. Помните, что типом значения HashMap
теперь является Bytes
(который можно легко клонировать), поэтому его также необходимо изменить.
use tokio::net::TcpStream;
use mini_redis::{Connection, Frame};
async fn process(socket: TcpStream, db: Db) {
use mini_redis::Command::{self, Get, Set};
let mut connection = Connection::new(socket);
while let Some(frame) = connection.read_frame().await.unwrap() {
let response = match Command::from_frame(frame).unwrap() {
Set(cmd) => {
// Блокируем `HashMap`
let mut db = db.lock().unwrap();
db.insert(cmd.key().to_string(), cmd.value().clone());
Frame::Simple("OK".to_string())
}
Get(cmd) => {
// Блокируем `HashMap`
let db = db.lock().unwrap();
if let Some(value) = db.get(cmd.key()) {
Frame::Bulk(value.clone())
} else {
Frame::Null
}
}
cmd => panic!("Не реализовано {:?}", cmd),
};
connection.write_frame(&response).await.unwrap();
}
}
Задачи, потоки и конкуренция
Использование блокирующего мьютекса для защиты небольших критических блоков кода является приемлемой стратегией, когда конкуренция минимальна. Когда возникает борьба за блокировку, поток, выполняющий задачу, должен заблокироваться и дождаться мьютекса. Это заблокирует не только текущую задачу, но также другие задачи, запланированные в текущем потоке.
По умолчанию Tokio использует многопоточный планировщик. Задачи планируются для любого количества потоков, управляемых средой выполнения. Если запланировано выполнение большого количества задач и все они требуют доступа к мьютексу, возникает конфликт. С другой стороны, если используется вариант среды выполнения (runtime flavor) current_thread, то мьютекс никогда не будет конкурировать.
Вариант среды выполнения current_thread
представляет собой облегченную однопоточную среду выполнения. Это хороший выбор, когда создается всего несколько задач и открывается несколько сокетов. Например, этот вариант хорошо работает при предоставлении моста синхронного API поверх асинхронной клиентской библиотеки.
Если конкуренция за синхронный мьютекс становится проблемой, переключение на мьютекс Tokio редко будет лучшим решением. Вместо этого можно рассмотреть следующие варианты:
- переключение на специальную задачу для управления состоянием и использование передачи сообщений для работы с ней
- сегментирование (shard) мьютекса
- реструктуризация кода для удаления мьютекса
В нашем случае, поскольку каждый ключ независим, отлично подойдет сегментирование мьютекса. Для этого вместо одного экземпляра Mutex<HashMap<_, _>>
мы создадим N
отдельных экземпляров.
type ShardedDb = Arc<Vec<Mutex<HashMap<String, Vec<u8>>>>>;
fn new_sharded_db(num_shards: usize) -> ShardedDb {
let mut db = Vec::with_capacity(num_shards);
for _ in 0..num_shards {
db.push(Mutex::new(HashMap::new()));
}
Arc::new(db)
}
После этого поиск ячейки для любого заданного ключа становится двухэтапным процессом. Сначала ключ используется для определения того, частью какого сегмента он является. Затем ключ просматривается в HashMap
.
let shard = db[hash(key) % db.len()].lock().unwrap();
shard.insert(key, value);
Простая реализация, описанная выше, требует использования фиксированного количества сегментов, и количество сегментов не может быть изменено после создания сегментированной карты. Крейт dashmap предоставляет реализацию более сложной сегментированной хэш-карты.
Удержание MutexGuard
через .await
Мы можем написать код, который выглядит так:
use std::sync::{Mutex, MutexGuard};
async fn increment_and_do_stuff(mutex: &Mutex<i32>) {
let mut lock: MutexGuard<i32> = mutex.lock().unwrap();
*lock += 1;
do_something_async().await;
} // здесь `lock` выходит из области видимости
При попытке создать что-то, вызывающее эту функцию, мы получим следующее сообщение об ошибке:
error: future cannot be sent between threads safely
--> src/lib.rs:13:5
|
13 | tokio::spawn(async move {
| ^^^^^^^^^^^^ future created by async block is not `Send`
|
::: /playground/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-0.2.21/src/task/spawn.rs:127:21
|
127 | T: Future + Send + 'static,
| ---- required by this bound in `tokio::task::spawn::spawn`
|
= help: within `impl std::future::Future`, the trait `std::marker::Send` is not implemented for `std::sync::MutexGuard<'_, i32>`
note: future is not `Send` as this value is used across an await
--> src/lib.rs:7:5
|
4 | let mut lock: MutexGuard<i32> = mutex.lock().unwrap();
| -------- has type `std::sync::MutexGuard<'_, i32>` which is not `Send`
...
7 | do_something_async().await;
| ^^^^^^^^^^^^^^^^^^^^^^^^^^ await occurs here, with `mut lock` maybe used later
8 | }
| - `mut lock` is later dropped here
Это происходит потому, что тип std::sync::MutexGuard
не является Send
. Это означает, что мы не можем отправить блокировку мьютекса в другой поток, и ошибка возникает, потому что Tokio может перемещать задачу между потоками при каждом .await
. Следовательно, нам нужно реструктурировать код таким образом, чтобы деструктор блокировки мьютекса запускался до .await
.
// Это работает
async fn increment_and_do_stuff(mutex: &Mutex<i32>) {
{
let mut lock: MutexGuard<i32> = mutex.lock().unwrap();
*lock += 1;
} // здесь `lock` выходит из области видимости
do_something_async().await;
}
Обратите внимание, что это не работает:
use std::sync::{Mutex, MutexGuard};
async fn increment_and_do_stuff(mutex: &Mutex<i32>) {
let mut lock: MutexGuard<i32> = mutex.lock().unwrap();
*lock += 1;
drop(lock);
do_something_async().await;
}
Это связано с тем, что компилятор вычисляет, является ли фьючер (future) Send
, только на основе информации об области видимости. Возможно, в будущем компилятор будет поддерживать явное удаление блокировки, но сейчас для этого необходимо использовать область видимости.
Не пытайтесь решить эту проблему путем создания задачи, которая не должна быть Send
, потому что если Tokio приостановит задачу в .await
, пока она удерживает блокировку, в том же потоке может быть запланировано выполнение другой задачи, которая также может попытаться заблокировать этот мьютекс, что приведет к взаимоблокировке, поскольку задача, ожидающая блокировки мьютекса, не позволит задаче, удерживающей мьютекс, освободить его.
Ниже мы обсудим некоторые подходы к решению этой проблемы.
Реструктуризация кода, чтобы не удерживать блокировку через .await
Мы уже видели один пример этого в приведенном выше фрагменте кода, но есть несколько более надежных способов. Например, можно обернуть мьютекс в структуру и блокировать его только внутри синхронных методов этой структуры:
use std::sync::Mutex;
struct CanIncrement {
mutex: Mutex<i32>,
}
impl CanIncrement {
// Эта функция является синхронной
fn increment(&self) {
let mut lock = self.mutex.lock().unwrap();
*lock += 1;
}
}
async fn increment_and_do_stuff(can_incr: &CanIncrement) {
can_incr.increment();
do_something_async().await;
}
Этот шаблон гарантирует, что мы не столкнемся с ошибкой Send
, поскольку защита мьютекса отсутствует в асинхронной функции.
Создание задачи для управления состоянием и использование передачи сообщений для работы с ней
Это второй подход, упомянутый в начале этого раздела, и он часто используется, когда общий ресурс является ресурсом ввода-вывода. Мы подробно поговорим о нем в следующем разделе.
Использование асинхронного мьютекса Токио
Также можно использовать тип tokio::sync::Mutex, предоставляемый Tokio. Основная особенность мьютекса Tokio заключается в том, что его можно без проблем удерживать в .await
. Тем не менее, асинхронный мьютекс обходится дороже, чем обычный, и лучше использовать один из двух других подходов.
// Мьютекс Tokio
use tokio::sync::Mutex;
// Это компилируется
// (но в данном случае реструктуризация кода будет лучшим решением)
async fn increment_and_do_stuff(mutex: &Mutex<i32>) {
let mut lock = mutex.lock().await;
*lock += 1;
do_something_async().await;
} // здесь `lock` выходит из области видимости
Каналы
Возвращаемся к клиенту. Поместим код сервера, который мы написали, в отдельный двоичный файл:
mkdir src/bin
mv src/main.rs src/bin/server.rs
Создаем новый двоичный файл, который будет содержать код клиента:
touch src/bin/client.rs
Команда для запуска сервера:
cargo run --bin server
Команда для запуска клиента:
cargo run --bin client
Допустим, мы хотим запустить две команды Redis одновременно. Мы можем создать одну задачу для каждой команды. Тогда обе команды будут выполняться одновременно.
Мы могли бы написать что-то вроде этого:
use mini_redis::client;
#[tokio::main]
async fn main() {
// Подключаемся к серверу
let mut client = client::connect("127.0.0.1:6379").await.unwrap();
// Создаем две задачи: одна извлекает значение, другая устанавливает значение по ключу
let t1 = tokio::spawn(async {
let res = client.get("foo").await;
});
let t2 = tokio::spawn(async {
client.set("foo", "bar".into()).await;
});
t1.await.unwrap();
t2.await.unwrap();
}
Но этот код не скомпилируется, поскольку обеим задачам требуется доступ к клиенту. Поскольку Client
не реализует Copy
, он не будет компилироваться без кода, обеспечивающего возможность его совместного использования. Кроме того, Client::set
принимает &mut self
, что означает, что для его вызова требуется монопольный доступ к клиенту. Мы могли бы открывать соединение для каждой задачи, но это не оптимально. Мы не можем использовать std::sync::Mutex
, так как .await
нужно вызывать с удерживаемой блокировкой. Мы могли бы использовать tokio::sync::Mutex
, но это позволит выполнить только один текущий запрос. Если клиент реализует конвейерную обработку, асинхронный мьютекс приведет к недостаточному использованию (underutilizing) соединения.
Передача сообщений
Ответ заключается в использовании передачи сообщений. Шаблон предполагает создание специальной задачи для управления ресурсом client
. Любая задача, желающая обработать (issue) запрос, отправляет сообщение задаче client
. Задача client
выдает (issue) запрос от имени отправителя, и ответ отправляется отправителю (простите за тавтологию).
В этой стратегии устанавливается одно соединение. Задача, управляющая client
, может получить к нему монопольный доступ для вызова get
и set
. Кроме того, канал работает как буфер. Операции могут отправляться задаче client
, пока она занята. Как только задача client
освобождается (становится доступной для обработки новых запросов), она извлекает следующий запрос из канала. Это может привести к повышению пропускной способности и может быть расширено для поддержки пула соединений.
Примитивы каналов Tokio
Tokio предоставляет несколько каналов, которые служат разным целям:
- mpsc — канал с несколькими производителями (producer) и одним потребителем (consumer). Можно отправлять много значений
- oneshot — канал с одним производителем и одним потребителем. Можно отправлять одно значение
- broadcast — канал с несколькими производителями и потребителями. Можно отправлять несколько значений. Каждый получатель видит каждое значение
- watch — канал с одним производителем и несколькими потребителями. Можно отправлять много значений, но история не сохраняется. Каждый получатель видит только последнее значение
Если нам нужен многопользовательский канал с несколькими производителями, где каждое сообщение видит только один потребитель, можно использовать крейт async-channel. Существуют также синхронные каналы, например, std::sync::mpsc и crossbeam::channel. Эти каналы ждут сообщений, блокируя поток, что не разрешено в асинхронном коде.
В этом разделе мы будем использовать mpsc
и oneshot
. Другие типы каналов передачи сообщений рассматриваются в следующих разделах. Полный код из этого раздела можно найти здесь.
Определение типа сообщения
В большинстве случаев при использовании передачи сообщений задача, получающая сообщения, отвечает более чем на одну команду. В нашем случае задача будет реагировать на команды GET
и SET
. Чтобы смоделировать это, мы определим перечисление Command
и включим в него вариант для каждого типа команды:
use bytes::Bytes;
#[derive(Debug)]
enum Command {
Get {
key: String,
},
Set {
key: String,
val: Bytes,
}
}
Создание канала
Создаем канал mpsc
в функции main
:
use tokio::sync::mpsc;
#[tokio::main]
async fn main() {
// Создаем новый канал емкостью 32
let (tx, mut rx) = mpsc::channel(32);
// ...
}
Канал mpsc
используется для отправки команд задаче, управляющей соединением Redis. Возможность работы с несколькими производителями позволяет отправлять сообщения из многих задач. Создание канала возвращает два значения: отправителя и получателя. Они используются отдельно. Их можно перемещать в другие задачи.
Канал создан с емкостью (пропускной способностью) 32. Если сообщения отправляются быстрее, чем принимаются, канал сохраняет их. Как только 32 сообщения будут сохранены в канале, вызов send(...).await
перейдет в режим сна до тех пор, пока сообщение не будет удалено получателем.
Отправка из нескольких задач осуществляется путем клонирования Sender
. Например:
use tokio::sync::mpsc;
#[tokio::main]
async fn main() {
let (tx, mut rx) = mpsc::channel(32);
let tx2 = tx.clone();
tokio::spawn(async move {
tx.send("Sending from first handle").await.unwrap();
});
tokio::spawn(async move {
tx2.send("Sending from second handle").await.unwrap();
});
while let Some(message) = rx.recv().await {
println!("GOT = {}", message);
}
}
Оба сообщения отправляются одному дескриптору Receiver
. Невозможно клонировать получателя канала mpsc
.
Когда все Sender
вышли за пределы области видимости или были удалены другим способом, отправлять в канал сообщения больше нельзя. На этом этапе вызов recv
на Receiver
возвращает None
, что означает, что все отправители уничтожены, и канал закрыт.
В нашем случае задача, которая управляет соединением Redis, знает, что может закрыть соединение после закрытия канала, поскольку оно больше не будет использоваться.
Создание управляющей задачи
Создаем задачу, которая обрабатывает сообщения из канала. Сначала клиент устанавливает соединение с Redis. Затем через него передаются полученные команды.
use mini_redis::client;
// Ключевое слово `move` используется для перемещения владения `rx` в задачу
let manager = tokio::spawn(async move {
// Подключаемся к серверу
let mut client = client::connect("127.0.0.1:6379").await.unwrap();
// Начинаем получать сообщения
while let Some(cmd) = rx.recv().await {
use Command::*;
match cmd {
Get { key } => {
client.get(&key).await;
}
Set { key, val } => {
client.set(&key, val).await;
}
}
}
});
Обновляем обе задачи, чтобы они отправляли команды через канал, а не обрабатывали их непосредственно:
// Дескрипторы `Sender` перемещаются в задачи. Поскольку у нас две задачи,
// нам нужен второй `Sender`
let tx2 = tx.clone();
// Создаем две задачи
let t1 = tokio::spawn(async move {
let cmd = Command::Get {
key: "foo".to_string(),
};
tx.send(cmd).await.unwrap();
});
let t2 = tokio::spawn(async move {
let cmd = Command::Set {
key: "foo".to_string(),
val: "bar".into(),
};
tx2.send(cmd).await.unwrap();
});
В конце функции main
мы ожидаем дескрипторы соединения, чтобы гарантировать полное выполнение команд перед завершением процесса:
t1.await.unwrap();
t2.await.unwrap();
manager.await.unwrap();
Получение ответов
Последний шаг — получить ответ от управляющей задачи. Команде GET
необходимо получить значение, а команде SET
необходимо знать, успешно ли завершилась операция установки значения.
Для передачи ответа используется канал oneshot
. oneshot
— это канал с одним производителем и одним потребителем, оптимизированный для отправки одного значения. В нашем случае единственным значением является ответ.
Подобно mpsc
, метод oneshot::channel
возвращает дескрипторы отправителя и получателя:
use tokio::sync::oneshot;
let (tx, rx) = oneshot::channel();
В отличие от mpsc
, емкость (capacity) канала не указывается, поскольку она всегда равна единице. Кроме того, никакой дескриптор не может быть клонирован.
Для получения ответов от управляющей задачи перед отправкой команды создается канал oneshot
. Половина канала Sender
включается в команду управляющей задачи. Другая часть канала используется для получения ответа.
Обновляем Command
, включая в нее Sender
. Для ссылки на Sender
используется псевдоним типа.
use tokio::sync::oneshot;
use bytes::Bytes;
/// Несколько разных команд мультиплексируются в одном канале
#[derive(Debug)]
enum Command {
Get {
key: String,
resp: Responder<Option<Bytes>>,
},
Set {
key: String,
val: Bytes,
resp: Responder<()>,
},
}
/// Предоставляется вызывающей стороной и используется управляющей задачей для отправки
/// ответа на команду вызывающей стороне
type Responder<T> = oneshot::Sender<mini_redis::Result<T>>;
Обновляем задачи, отправляющие команды, включая в них oneshot::Sender
:
let t1 = tokio::spawn(async move {
let (resp_tx, resp_rx) = oneshot::channel();
let cmd = Command::Get {
key: "foo".to_string(),
resp: resp_tx,
};
// Отправляем команду `GET`
tx.send(cmd).await.unwrap();
// Ждем ответ
let res = resp_rx.await;
println!("GOT = {:?}", res);
});
let t2 = tokio::spawn(async move {
let (resp_tx, resp_rx) = oneshot::channel();
let cmd = Command::Set {
key: "foo".to_string(),
val: "bar".into(),
resp: resp_tx,
};
// Отправляем команду `SET`
tx2.send(cmd).await.unwrap();
// Ждем ответ
let res = resp_rx.await;
println!("GOT = {:?}", res);
});
Наконец, обновляем управляющую задачу для отправки ответа по каналу oneshot
:
while let Some(cmd) = rx.recv().await {
match cmd {
Command::Get { key, resp } => {
let res = client.get(&key).await;
// Игнорируем ошибки
let _ = resp.send(res);
}
Command::Set { key, val, resp } => {
let res = client.set(&key, val).await;
// Игнорируем ошибки
let _ = resp.send(res);
}
}
}
Вызов send
на oneshot::Sender
завершается немедленно и не требует .await
. Это связано с тем, что отправка по каналу oneshot
всегда будет неудачной или успешной немедленно, без какого-либо ожидания.
Отправка значения по каналу oneshot
возвращает Err
, когда получатель уничтожен (dropped). Это указывает на то, что получатель больше не заинтересован в ответе. В нашем сценарии это является приемлемым событием. Err
, возвращаемый методом resp.send
, не требует обработки.
Полный код примера можно найти здесь.
Обратное давление и ограниченные каналы
Всякий раз, когда вводится одновременное выполнение или очереди задач, важно убедиться, что очередь ограничена и система справляется с нагрузкой. Неограниченные очереди могут потребить всю доступную память и привести к сбою системы.
Tokio старается избегать неявных очередей. Во многом это связано с тем, что асинхронные операции являются ленивыми (lazy). Например:
loop {
async_op();
}
Если асинхронная операция будет выполняться незамедлительно (eagerly), цикл будет помещать в очередь новую операцию async_op
, не дожидаясь завершения предыдущей операции. Это может привести к неявной неограниченной очереди. Системы, основанные на колбеках, и системы, основанные на незамедлительных фьючерах, особенно восприимчивы к этому.
Однако в Tokio и асинхронном Rust async_op
вообще не запустится. Это связано с тем, что .await
никогда не вызывается. Если добавить в сниппет .await
, цикл будет ждать завершения текущей операции перед запуском новой.
loop {
// Следующая итерация наступит только после завершения текущей `async_op`
async_op().await;
}
Одновременное выполнение и очереди должны вводиться явно, например, с помощью:
tokio::spawn
select!
join!
mpsc::channel
При этом, необходимо позаботиться о том, чтобы общее количество одновременно выполняемых задач было ограничено. Например, при создании цикла принятия TCP-соединений нужно убедиться, что общее количество открытых сокетов ограничено. При использовании mpsc::channel
, важно выбрать управляемую пропускную способность канала. Конкретные ограничения зависят от потребностей приложения.
Ввод-вывод
Ввод-вывод в Tokio работает почти так же, как и в std
, но асинхронно. Существует типаж для чтения (AsyncRead) и типаж для записи (AsyncWrite). Определенные типы реализуют эти типажи соответствующим образом (TcpStream, File, Stdout). AsyncRead
и AsyncWrite
также реализуются рядом структур данных, таких как Vec<u8>
и &[u8]
. Это позволяет использовать массивы байтов там, где ожидается читатель или писатель.
В этом разделе будут рассмотрены базовые операции чтения и записи с помощью Tokio, а также приведено несколько примеров. В следующем разделе будет рассмотрен более продвинутый пример обработки ввода-вывода.
AsyncRead
и AsyncWrite
Эти типажи предоставляют возможности асинхронного чтения и записи в потоки байтов. Методы этих типажей обычно не вызываются напрямую, подобно тому, как мы не вызываем вручную метод call
типажа Future
. Вместо этого, они используются через вспомогательные методы, предоставляемые AsyncReadExt и AsyncWriteExt.
Кратко рассмотрим некоторые из этих методов. Все эти функции являются асинхронными и должны использоваться с .await
.
async fn read()
AsyncReadExt::read предоставляет асинхронный метод для чтения данных в буфер, возвращающий количество прочитанных байтов.
Если read()
возвращает Ok(0)
, это означает одно из двух:
- Читатель достиг EOF и, вероятно, больше не сможет производить байты. Обратите внимание: это не означает, что читатель никогда больше не сможет производить байты.
- Длина указанного буфера составляет 0 байт.
Дальнейшие вызовы read()
будут немедленно возвращать Ok(0)
. Для экземпляров TcpStream это означает, что половина сокета для чтения закрыта.
use tokio::fs::File;
use tokio::io::{self, AsyncReadExt};
#[tokio::main]
async fn main() -> io::Result<()> {
let mut f = File::open("test.txt").await?;
let mut buffer = [0; 10];
// Читаем от 0 до 10 байтов
f.read(&mut buffer[..]).await?;
// Выводим в терминал первые 10 байтов
println!("The bytes: {:?}", buffer);
Ok(())
}
async fn read_to_end()
AsyncReadExt::read_to_end считывает все байты из потока до EOF:
use tokio::io::{self, AsyncReadExt};
use tokio::fs::File;
#[tokio::main]
async fn main() -> io::Result<()> {
let mut f = File::open("test.txt").await?;
let mut buffer = Vec::new();
// Читаем весь файл
f.read_to_end(&mut buffer).await?;
// Выводим в терминал содержимое файла
println!("{}", String::from_utf8_lossy(&buffer));
Ok(())
}
async fn write()
AsyncWriteExt::write записывает буфер в файл, возвращая количество записанных байтов:
use tokio::fs::File;
use tokio::io::{self, AsyncWriteExt};
#[tokio::main]
async fn main() -> io::Result<()> {
let mut file = File::create("test.txt").await?;
// Записываем часть байтовой строки в файл
let n = file.write(b"some bytes").await?;
// Выводим в терминал количество записанных байтов
println!("Wrote the first {} bytes of 'some bytes'.", n);
Ok(())
}
__async fn write_all()
__
AsyncWriteExt::write_all записывает весь буфер в файл:
use tokio::fs::File;
use tokio::io::{self, AsyncReadExt, AsyncWriteExt};
#[tokio::main]
async fn main() -> io::Result<()> {
// Создаем или открываем файл для записи
let mut file = File::create("test.txt").await?;
// Записываем байтовую строку в файл
file.write_all(b"some bytes").await?;
// Открываем файл для чтения
file = File::open("test.txt").await?;
let mut buffer = Vec::new();
// Читаем байты из файла
file.read_to_end(&mut buffer).await?;
// Выводим в терминал содержимое файла
println!("{}", String::from_utf8_lossy(&buffer));
Ok(())
}
Вспомогательные функции
Как и std
, модуль tokio::io содержит ряд полезных утилит, а также API для работы со стандартным вводом, стандартным выводом и стандартными ошибками. Например, tokio::io::copy асинхронно копирует все содержимое устройства чтения в устройство записи:
use tokio::fs::File;
use tokio::io;
#[tokio::main]
async fn main() -> io::Result<()> {
let mut reader: &[u8] = b"hello";
let mut file = File::create("test.txt").await?;
io::copy(&mut reader, &mut file).await?;
Ok(())
}
Обратите внимание, здесь используется то, что байтовые массивы также реализуют AsyncRead
.
Эхо-сервер
Поупражняемся в работе с асинхронным вводом-выводом. Напишем эхо-сервер.
Эхо-сервер "привязывает" TcpListener
и принимает входящие соединения в цикле. Для каждого входящего соединения данные считываются из сокета и немедленно записываются обратно в него. Клиент отправляет данные на сервер и получает их обратно.
Мы реализуем эхо-сервер дважды с помощью разных стратегий.
io::copy()
Начнем с реализации сервера с помощью утилиты io::copy.
Создаем новый двоичный файл:
touch src/bin/echo-server-copy.rs
Команда для запуска примера:
cargo run --bin echo-server-copy
Тестировать сервер можно, используя стандартный инструмент командной строки, такой как telnet
, или написав простой клиент, подобный тому, который можно найти в документации tokio::net::TcpStream.
Это TCP-сервер, и ему нужен цикл принятия. Для обработки каждого входящего сокета создается новая задача.
use tokio::io;
use tokio::net::TcpListener;
#[tokio::main]
async fn main() -> io::Result<()> {
let listener = TcpListener::bind("127.0.0.1:6142").await?;
loop {
let (mut socket, _) = listener.accept().await?;
tokio::spawn(async move {
// Копируем данные
});
}
}
Эта утилита берет устройства чтения и записи и копирует данные из одного в другой. Однако у нас есть только один TcpStream
. Он реализует как AsyncRead
, так и AsyncWrite
. Поскольку io::copy
требует &mut
как для чтения, так и для записи, сокет нельзя использовать для обоих аргументов.
// Это не будет компилироваться
io::copy(&mut socket, &mut socket).await
Разделение читателя и писателя
Для решения этой проблемы, мы должны разделить сокет на дескриптор чтения и дескриптор записи. Лучший способ это сделать зависит от конкретного типа.
Любой тип читатель + писатель
можно разделить с помощью утилиты io::split. Эта функция принимает одно значение и возвращает отдельные дескрипторы чтения и записи. Эти два дескриптора можно использовать независимо, в том числе, в отдельных задачах.
Например, эхо-клиент может обрабатывать одновременные операции чтения и записи следующим образом:
use tokio::io::{self, AsyncReadExt, AsyncWriteExt};
use tokio::net::TcpStream;
#[tokio::main]
async fn main() -> io::Result<()> {
let socket = TcpStream::connect("127.0.0.1:6142").await?;
let (mut rd, mut wr) = io::split(socket);
// Записываем данные в фоновом режиме
tokio::spawn(async move {
wr.write_all(b"hello\r\n").await?;
wr.write_all(b"world\r\n").await?;
// Иногда компилятору Rust нужна небольшая помощь
// для вывода правильного типа
Ok::<_, io::Error>(())
});
let mut buf = vec![0; 128];
loop {
let n = rd.read(&mut buf).await?;
if n == 0 {
break;
}
println!("GOT {:?}", &buf[..n]);
}
Ok(())
}
Поскольку io::split()
поддерживает любое значение, реализующее AsyncRead
+ AsyncWrite
, и возвращает независимые дескрипторы, внутри io::split()
используются Arc
и Mutex
. Этих накладных расходов можно избежать с помощью TcpStream
, который предоставляет две функции разделения.
TcpStream::split принимает ссылку на поток и возвращает дескрипторы чтения и записи. Поскольку используется ссылка, оба дескриптора должны оставаться в той же задаче, из которой вызывается split()
. Эта функция является бесплатной. Arc
или Mutex
ей не нужны. TcpStream
также предоставляет функцию into_split, возвращающую дескрипторы, которые могут перемещаться между задачами за счет только Arc
.
Поскольку io::copy()
вызывается для задачи, которая владеет TcpStream
, мы можем использовать TcpStream::split()
. Задача, отвечающая за логику на сервере, будет выглядеть так:
tokio::spawn(async move {
let (mut rd, mut wr) = socket.split();
if io::copy(&mut rd, &mut wr).await.is_err() {
eprintln!("Failed to copy");
}
});
Полный код примера можно найти здесь.
Ручное копирование
Теперь посмотрим на эхо-сервер, копирующий данные вручную. Для этого мы будем использовать AsyncReadExt::read и AsyncWriteExt::write_all.
Полный код сервера:
use tokio::io::{self, AsyncReadExt, AsyncWriteExt};
use tokio::net::TcpListener;
#[tokio::main]
async fn main() -> io::Result<()> {
let listener = TcpListener::bind("127.0.0.1:6142").await?;
loop {
let (mut socket, _) = listener.accept().await?;
tokio::spawn(async move {
let mut buf = vec![0; 1024];
loop {
match socket.read(&mut buf).await {
// `Ok(0)` свидетельствует о закрытии сокета
Ok(0) => return,
Ok(n) => {
// Копируем данные обратно в сокет
if socket.write_all(&buf[..n]).await.is_err() {
// Неожиданная ошибка сокета. Мы ничего не можем с ней сделать,
// так что просто прекращаем обработку
return;
}
}
Err(_) => {
// Неожиданная ошибка сокета
return;
}
}
}
});
}
}
Этот код можно поместить в src/bin/echo-server.rs
и запустить с помощью cargo run --bin echo-server
.
Разберем код построчно. Во-первых, поскольку используются утилиты AsyncRead
и AsyncWrite
, в область видимости должны быть включены расширяющие их типажи:
use tokio::io::{self, AsyncReadExt, AsyncWriteExt};
Выделение буфера
Стратегия состоит в том, чтобы прочитать данные из сокета в буфер, а затем записать содержимое буфера обратно в сокет:
let mut buf = vec![0; 1024];
Мы специально не используем стековый буфер. Ранее мы отмечали, что все данные задачи, которые сохраняются при вызовах .await
, должны храниться в задаче. В этом случае buf
используется при вызовах .await
. Все данные задачи хранятся в одном месте. Об этом можно думать как о enum
, где каждый вариант — это данные, которые необходимо сохранить для конкретного вызова .await
.
Если буфер представлен массивом стека, внутренняя структура задач, создаваемых для каждого принятого сокета, может выглядеть примерно так:
struct Task {
// Внутренние поля задачи
task: enum {
AwaitingRead {
socket: TcpStream,
buf: [BufferType],
},
AwaitingWriteAll {
socket: TcpStream,
buf: [BufferType],
}
}
}
Если в качестве типа буфера используется стековый массив, он будет храниться внутри структуры задачи. Это сделает структуру задачи очень большой. Кроме того, размеры буфера часто соответствуют размеру страницы. Это, в свою очередь, приведет к неуклюжему размеру Task
: $page-size + несколько-байт
.
На самом деле компилятор использует более оптимальное представление состояния асинхронного блока, чем простой enum
. На практике переменные не перемещаются между вариантами, как это требуется при перечислении. Однако размер структуры задачи по крайней мере равен размеру самой большой переменной.
По этой причине обычно более эффективно использовать отдельное пространство для буфера.
Обработка EOF
Когда читатель TCPStream
закрывается, вызов read()
возвращает Ok(0)
. На этом этапе важно выйти из цикла чтения. Забывание об этом является распространенным источником ошибок.
loop {
match socket.read(&mut buf).await {
Ok(0) => return,
// ...
}
}
Полный код примера можно найти здесь.
Это конец первой части туториала.
Happy coding!
Новости, обзоры продуктов и конкурсы от команды Timeweb.Cloud — в нашем Telegram-канале ↩
fedorro
Столько раз упоминается слово "одновременно", но асинхронно - это не "одновременно", а "по очереди", но нескольких задач одним потоком. Т.е. есть 3 задачи, и один поток, который берет первую задачу, пока она не упрется в IO. Если не упрется в IO, выполнит первую - возьмется за вторую, а если первая встанет на IO, то пока она делает свой IO - поток сходит повыполнять вторую.
Так что асинхронный код позволяет выполнять большее количество задач меньшим количеством потоков за счет того что потоки не спят вместе с задачами в ожидании IO - вот верная формулировка, ну и тд по тексту.
blind_oracle
Строго говоря не в I/O упрётся, а в .await на future, который может быть чем угодно. Тем же tokio::time::sleep например.
Тот же дисковый I/O обычно как раз блокирующий в Расте, просто Токио выполняет его в отдельном пуле потоков (spawn_blocking)