Hello world!
Представляю вашему вниманию бонусную часть практического руководства по Rust.
Другой формат, который может показаться вам более удобным.
Руководство основано на Comprehensive Rust — руководстве по Rust
от команды Android
в Google
и рассчитано на людей, которые уверенно владеют любым современным языком программирования. Еще раз: это руководство не рассчитано на тех, кто только начинает кодить ?
В этой части мы поговорим о параллельном (concurrent) и асинхронном (async) Rust
.
Материалы для более глубокого изучения названных тем:
- Книга/учебник по Rust (на русском языке) — главы 16 и 20
- rustlings — упражнение 20
- Rust на примерах (на русском языке) — примеры 20.1 и 20.2
Также см. Большую шпаргалку по Rust.
Параллельный Rust
Rust
полностью поддерживает параллелизм (concurrency) с использованием потоков ОС с мьютексами (mutexes) и каналами (channels).
Система типов Rust
играет важную роль в том, что многие ошибки параллелизма становятся ошибками времени компиляции. Это часто называют бесстрашным параллелизмом (fearless concurrency), поскольку мы можем положиться на компилятор, который обеспечивает правильную обработку параллелизма во время выполнения.
Потоки
Потоки (threads) Rust
работают аналогично потокам в других языках:
use std::thread;
use std::time::Duration;
fn main() {
thread::spawn(|| {
for i in 1..10 {
println!("значение счетчика в выделенном потоке: {i}!");
thread::sleep(Duration::from_millis(5));
}
});
for i in 1..5 {
println!("значение счетчика в основном потоке: {i}");
thread::sleep(Duration::from_millis(5));
}
}
- потоки являются потоками демона (daemon threads), основной поток не ждет их выполнения
- потоки паникуют независимо друг от друга
- паника может содержать полезную нагрузку (payload), которую можно извлечь с помощью
downcast_ref
- паника может содержать полезную нагрузку (payload), которую можно извлечь с помощью
Ремарки:
- обратите внимание, что основной поток не ждет выполнения выделенных (spawned) потоков
- для ожидания выполнения потока следует использовать
let handle = thread::spawn()
и затемhandle.join()
- вызовите панику в потоке. Обратите внимание, что это не влияет на
main
- используйте
Result
, возвращаемый изhandle.join()
, для доступа к полезной нагрузке паники. В этом может помочь Any
Потоки с ограниченной областью видимости
Обычные потоки не могут заимствовать значения из окружения:
use std::thread;
fn foo() {
let s = String::from("привет");
thread::spawn(|| {
println!("длина: {}", s.len());
});
}
fn main() {
foo();
}
Однако для этого можно использовать scoped threads (потоки с ограниченной областью видимости):
use std::thread;
fn main() {
let s = String::from("привет");
thread::scope(|scope| {
scope.spawn(|| {
println!("длина: {}", s.len());
});
});
}
Ремарки:
- когда функция
thread::scope
завершается, все потоки гарантированно объединяются, поэтому они могут вернуть заимствованные данные - применяются обычные правила заимствования
Rust
: мы можем заимствовать значение мутабельно в одном потоке, или иммутабельно в любом количестве потоков
Каналы
Каналы (channels) Rust
состоят из двух частей: Sender<T>
(отправитель/передатчик) и Receiver<T>
(получатель/приемник). Они соединяются с помощью канала, но мы видим только конечные точки:
use std::sync::mpsc;
fn main() {
let (tx, rx) = mpsc::channel();
tx.send(10).unwrap();
tx.send(20).unwrap();
println!("Получено: {:?}", rx.recv());
println!("Получено: {:?}", rx.recv());
let tx2 = tx.clone();
tx2.send(30).unwrap();
println!("Получено: {:?}", rx.recv());
}
-
mpsc
означает Multi-Producer, Single-Consumer (несколько производителей, один потребитель).Sender
иSyncSender
реализуютClone
(поэтому мы можем создать несколько производителей), аReceiver
не реализует (поэтому у нас может быть только один потребитель) -
send()
иrecv()
возвращаютResult
. Если они возвращаютErr
, значит соответствующийSender
илиReceiver
уничтожен (dropped) и канал закрыт
Несвязанные каналы
mpsc::channel()
возвращает несвязанный (unbounded) и асинхронный канал:
use std::sync::mpsc;
use std::thread;
use std::time::Duration;
fn main() {
let (tx, rx) = mpsc::channel();
thread::spawn(move || {
let thread_id = thread::current().id();
for i in 1..10 {
tx.send(format!("Сообщение {i}")).unwrap();
println!("{thread_id:?}: отправил сообщение {i}");
}
println!("{thread_id:?}: готово");
});
thread::sleep(Duration::from_millis(100));
for msg in rx.iter() {
println!("Основной поток: получено {msg}");
}
}
Связанные каналы
send()
связанного (bounded) синхронного канала блокирует текущий поток:
use std::sync::mpsc;
use std::thread;
use std::time::Duration;
fn main() {
let (tx, rx) = mpsc::sync_channel(3);
thread::spawn(move || {
let thread_id = thread::current().id();
for i in 1..10 {
tx.send(format!("Сообщение {i}")).unwrap();
println!("{thread_id:?}: отправил сообщение {i}");
}
println!("{thread_id:?}: готово");
});
thread::sleep(Duration::from_millis(100));
for msg in rx.iter() {
println!("Основной поток: получено {msg}");
}
}
Ремарки:
- вызов
send()
блокирует текущий поток до тех пор, пока в канале имеется место для новых сообщений. Поток может блокироваться бесконечно, если отсутствует получатель - вызов
send()
заканчивается ошибкой (поэтому возвращаетсяResult
), если канал закрыт. Канал закрывается после уничтожения получателя - связанный канал с нулевым размером называется "rendezvous channel". Каждый вызов
send()
блокирует текущий поток, пока другой поток не вызоветread()
Send и Sync
Откуда Rust
знает о необходимости запрета доступа к общему (shared) состоянию из нескольких потоков? Ответ кроется в двух трейтах:
-
Send: тип
T
являетсяSend
, если передачаT
в другой поток является безопасной -
Sync: тип
T
являетсяSync
, если передача&T
в другой поток является безопасной
Send
и Sync
являются небезопасными трейтами. Компилятор автоматически реализует их для наших типов при условии, что они содержат только типы Send
и Sync
. Эти типы можно реализовать самостоятельно, если мы уверены в валидности наших типов.
Ремарки:
- о типах
Send
иSync
можно думать как о маркерах того, что тип содержит несколько безопасных с точки зрения потоков (thread-safety) свойств - эти типы могут использоваться в качестве общих ограничений по аналогии с обычными трейтами
Send
Тип T
является Send
, если передача значения T
в другой поток является безопасной.
Эффект перемещения владения в другой поток заключается в том, что деструкторы T
запускаются в этом новом потоке. Поэтому вопрос состоит в том, когда мы можем выделить значение в одном потоке и освободить его в другом потоке.
Например, подключение к SQLite
доступно только в одном потоке.
Sync
Тип T
является Sync
, если одновременный доступ к значению T
из нескольких потоков является безопасным.
Если быть более точным, определение гласит следующее: T
является Sync
, если и только если &T
является Send
.
Это означает, что если тип является "потокобезопасным" (thread-safe) для совместного использования, он также потокобезопасен для передачи ссылок между потоками.
Если тип является Sync
, его можно использовать в нескольких потоках без риска возникновения гонок за данными (data race) и других проблем с синхронизацией, поэтому его можно безопасно перемещать в другой поток. Ссылка на тип также может безопасно перемещаться в другой поток, поскольку доступ к данным, на которые она ссылается, из любого потока является безопасным.
Примеры
Send + Sync
Большинство типов является Send + Sync
:
-
i8
,f32
,bool
,char
,&str
etc. -
(T1, T2)
,[T; N]
,&[T]
,struct { x: T }
etc. -
String
,Option<T>
,Vec<T>
,Box<T>
etc. -
Arc<T>
— явно потокобезопасный благодаря атомарному подсчету ссылок -
Mutex<T>
— явно потокобезопасный благодаря внутренней блокировке -
AtomicBool
,AtomicU8
etc., где используются специальные атомарные инструкции
Дженерики обычно являются Send + Sync
, когда таковыми являются параметры типов.
Send + !Sync
Эти типы могут перемещаться в другие потоки, но не являются потокобезопасными. Обычно это связано с их внутренней изменчивостью:
mpsc::Sender<T>
mpsc::Receiver<T>
Cell<T>
RefCell<T>
!Send + Sync
Эти типы являются потокобезопасными, но не могут перемещаться в другие потоки:
-
MutexGuard<T: Sync>
— использует примитивы уровня ОС, которые должны быть освобождены в создавшем их потоке
!Send + !Sync
Эти типы не являются потокобезопасными и не могут перемещаться в другой поток:
-
Rc<T>
— каждыйRc<T>
содержит ссылку наRcBox<T>
, который содержит неатомарный счетчик ссылок -
*const T
,*mut T
—Rust
предполагает, что сырые указатели могут иметь нюансы, связанные с их параллельным выполнением
Общее состояние
Rust
использует систему типов для обеспечения синхронизации общих (shared) данных. Это делается в основном с помощью двух типов:
-
Arc — атомарный счетчик ссылок на T
: обрабатывает передачу между потоками и освобождаетT
при уничтожении последней ссылки на нее
Mutex — обеспечивает взаимоисключающий доступ к значению T
Arc
Arc<T>
предоставляет общий доступ только для чтения кT
черезArc::clone()
:
use std::sync::Arc; use std::thread; fn main() { let v = Arc::new(vec![10, 20, 30]); let mut handles = Vec::new(); for _ in 1..5 { let v = Arc::clone(&v); handles.push(thread::spawn(move || { let thread_id = thread::current().id(); println!("{thread_id:?}: {v:?}"); })); } handles.into_iter().for_each(|h| h.join().unwrap()); println!("v: {v:?}"); }
Ремарки:
-
Arc
означает Atomic Reference Counter (атомарный счетчик ссылок) и является потокобезопасной версиейRc
, в которой используются атомарные операции -
Arc<T>
реализуетClone
независимо от того, делает ли этоT
. Он реализуетSend
иSync
, только еслиT
их реализует -
Arc::clone()
имеет некоторую цену за счет выполнения атомарных операций, но после этого использованиеT
является бесплатным - остерегайтесь ссылочных циклов,
Arc
не использует сборщик мусора для их обнаружения
- в этом может помочь
std::sync::Weak
- в этом может помочь
Mutex
Mutex<T>
обеспечивает взаимное исключение и предоставляет мутабельный доступ кT
через доступный только для чтения интерфейс (форма внутренней изменчивости):
use std::sync::Mutex; fn main() { let v = Mutex::new(vec![10, 20, 30]); println!("v: {:?}", v.lock().unwrap()); { let mut guard = v.lock().unwrap(); guard.push(40); } println!("v: {:?}", v.lock().unwrap()); }
Обратите внимание на неявную реализацию (
impl<T: Send> Sync for Mutex<T>
)https://doc.rust-lang.org/std/sync/struct.Mutex.html#impl-Sync-for-Mutex%3CT%3E.
Ремарки:
-
Mutex
вRust
похож на коллекцию, состоящую из одного элемента — защищенных данных
- невозможно забыть получить (acquire) мьютекс перед доступом к защищенным данным
- из
&Mutex<T>
через блокировку (lock) можно получить&mut T
.MutexGuard
гарантирует, что&mut T
не живет дольше удерживаемой (held) блокировки -
Mutex<T>
реализуетSend
иSync
, только еслиT
реализуетSend
-
RwLock
является блокировкой, доступной как для чтения, так и для записи - почему
lock()
возвращаетResult
?
- Если поток, в котором находится мьютекс, паникует, мьютекс становится "отравленным" (poisoned), сигнализируя о том, что защищенные данные могут находиться в несогласованном состоянии. Вызов
lock()
на отравленном мьютексе проваливается с PoisonError. Для восстановления данных можно вызватьinto_iter()
на ошибке
- Если поток, в котором находится мьютекс, паникует, мьютекс становится "отравленным" (poisoned), сигнализируя о том, что защищенные данные могут находиться в несогласованном состоянии. Вызов
Пример
Рассмотрим
Arc
иMutex
в действии:
use std::thread; // use std::sync::{Arc, Mutex}; fn main() { let v = vec![10, 20, 30]; let handle = thread::spawn(|| { v.push(10); }); v.push(1000); handle.join().unwrap(); println!("v: {v:?}"); }
Возможное решение:
use std::sync::{Arc, Mutex}; use std::thread; fn main() { let v = Arc::new(Mutex::new(vec![10, 20, 30])); let v2 = Arc::clone(&v); let handle = thread::spawn(move || { let mut v2 = v2.lock().unwrap(); v2.push(10); }); { let mut v = v.lock().unwrap(); v.push(1000); } handle.join().unwrap(); println!("v: {v:?}"); }
Ремарки:
-
v
обернут вArc
иMutex
, поскольку их зоны ответственности ортогональны
- оборачивание
Mutex
вArc
является распространенным паттерном для передачи мутабельного состояния между потоками
- оборачивание
-
v: Arc<_>
должен быть клонирован какv2
для передачи в другой поток. Обратите внимание наmove
в сигнатуре лямбды - блоки предназначены для максимального сужения области видимости
LockGuard
Упражнения
Попрактикуемся применять новые знания на двух упражнениях:
- обедающие философы — классическая задача параллелизма
- многопоточная проверка ссылок
Обедающие философы
Условия задачи:
Пять безмолвных философов сидят вокруг круглого стола, перед каждым философом стоит тарелка спагетти. На столе между каждой парой ближайших философов лежит по одной вилке.
Каждый философ может либо есть, либо размышлять. Прием пищи не ограничен количеством оставшихся спагетти — подразумевается бесконечный запас. Тем не менее, философ может есть только тогда, когда держит две вилки — взятую справа и слева.
Каждый философ может взять ближайшую вилку (если она доступна) или положить — если он уже держит ее. Взятие каждой вилки и возвращение ее на стол являются раздельными действиями, которые должны выполняться одно за другим.
Задача заключается в том, чтобы разработать модель (параллельный алгоритм), при которой ни один из философов не будет голодать, то есть будет чередовать прием пищи и размышления 100 раз.
use std::sync::{mpsc, Arc, Mutex}; use std::thread; use std::time::Duration; struct Fork; struct Philosopher { name: String, // left_fork: ... // right_fork: ... // thoughts: ... } impl Philosopher { fn think(&self) { self.thoughts .send(format!("Эврика! {} сгенерировал(а) новую идею!", &self.name)) .unwrap(); } fn eat(&self) { // Берем вилки println!("{} ест...", &self.name); thread::sleep(Duration::from_millis(10)); } } static PHILOSOPHERS: &[&str] = &["Сократ", "Гипатия", "Платон", "Аристотель", "Пифагор"]; fn main() { // Создаем вилки // Создаем философов // Каждый философ размышляет и ест 100 раз // Выводим размышления философов }
Подсказка: рассмотрите возможность использования std::mem::swap для решения проблемы взаимной блокировки (deadlock).
Решениеuse std::sync::{mpsc, Arc, Mutex}; use std::thread; use std::time::Duration; struct Fork; struct Philosopher { name: String, left_fork: Arc<Mutex<Fork>>, right_fork: Arc<Mutex<Fork>>, thoughts: mpsc::SyncSender<String>, } impl Philosopher { fn think(&self) { self.thoughts .send(format!("Эврика! {} сгенерировал(а) новую идею!", &self.name)) .unwrap(); } fn eat(&self) { println!("{} пытается есть", &self.name); let _left = self.left_fork.lock().unwrap(); let _right = self.right_fork.lock().unwrap(); println!("{} ест...", &self.name); thread::sleep(Duration::from_millis(10)); } } static PHILOSOPHERS: &[&str] = &["Сократ", "Гипатия", "Платон", "Аристотель", "Пифагор"]; fn main() { let (tx, rx) = mpsc::sync_channel(10); let forks = (0..PHILOSOPHERS.len()) .map(|_| Arc::new(Mutex::new(Fork))) .collect::<Vec<_>>(); for i in 0..forks.len() { let tx = tx.clone(); let mut left_fork = Arc::clone(&forks[i]); let mut right_fork = Arc::clone(&forks[(i + 1) % forks.len()]); // Во избежание взаимной блокировки нам необходимо где-то нарушить симметрию. // Меняем вилки местами без их повторной инициализации if i == forks.len() - 1 { std::mem::swap(&mut left_fork, &mut right_fork); } let philosopher = Philosopher { name: PHILOSOPHERS[i].to_string(), thoughts: tx, left_fork, right_fork, }; thread::spawn(move || { for _ in 0..100 { philosopher.eat(); philosopher.think(); } }); } drop(tx); for thought in rx { println!("{thought}"); } }
Многопоточная проверка ссылок
Создадим инструмент для многопоточной проверки ссылок. Он должен начинать с основной веб-страницы и проверять корректность ссылок на ней. Затем он должен рекурсивно проверять другие страницы в том же домене и продолжать делать это до тех пор, пока все страницы не будут проверены.
Для создания такого инструмента вам потребуется какой-нибудь клиент
HTTP
, например, reqwest:
cargo add reqwest --features blocking,rustls-tls
Для обнаружения ссылок можно воспользоваться scraper:
cargo add scraper
Наконец, для обработки ошибок пригодится thiserror:
cargo add thiserror
Cargo.toml
:
[package] name = "link-checker" version = "0.1.0" edition = "2021" publish = false [dependencies] reqwest = { version = "0.11.12", features = ["blocking", "rustls-tls"] } scraper = "0.13.0" thiserror = "1.0.37"
Начните с небольшого сайта, такого как
https://www.google.org
.
src/main.rs
:
use reqwest::blocking::Client; use reqwest::Url; use scraper::{Html, Selector}; use thiserror::Error; #[derive(Error, Debug)] enum Error { #[error("ошибка запроса: {0}")] ReqwestError(#[from] reqwest::Error), #[error("плохой ответ HTTP: {0}")] BadResponse(String), } #[derive(Debug)] struct CrawlCommand { url: Url, extract_links: bool, } fn visit_page(client: &Client, command: &CrawlCommand) -> Result<Vec<Url>, Error> { println!("проверка {:#}", command.url); let response = client.get(command.url.clone()).send()?; if !response.status().is_success() { return Err(Error::BadResponse(response.status().to_string())); } let mut link_urls = Vec::new(); if !command.extract_links { return Ok(link_urls); } let base_url = response.url().to_owned(); let body_text = response.text()?; let document = Html::parse_document(&body_text); let selector = Selector::parse("a").unwrap(); let href_values = document .select(&selector) .filter_map(|element| element.value().attr("href")); for href in href_values { match base_url.join(href) { Ok(link_url) => { link_urls.push(link_url); } Err(err) => { println!("в {base_url:#} не поддается разбору {href:?}: {err}"); } } } Ok(link_urls) } fn main() { let client = Client::new(); let start_url = Url::parse("https://www.google.org").unwrap(); let crawl_command = CrawlCommand{ url: start_url, extract_links: true }; match visit_page(&client, &crawl_command) { Ok(links) => println!("ссылки: {links:#?}"), Err(err) => println!("невозможно извлечь ссылки: {err:#}"), } }
Задачи:
- используйте потоки для параллельной проверки ссылок: отправьте проверяемые URL-адреса в канал и позвольте нескольким потокам проверять URL-адреса параллельно
- реализуйте рекурсивное извлечение ссылок со всех страниц домена
www.google.org
. Установите верхний предел в 100 страниц или около того, чтобы сайт вас не заблокировал
Решениеuse std::sync::{mpsc, Arc, Mutex}; use std::thread; use std::collections::HashSet; use reqwest::blocking::Client; use reqwest::Url; use scraper::{Html, Selector}; use thiserror::Error; #[derive(Error, Debug)] enum Error { #[error("ошибка запроса: {0}")] ReqwestError(#[from] reqwest::Error), #[error("плохой ответ HTTP: {0}")] BadResponse(String), } #[derive(Debug)] struct CrawlCommand { url: Url, extract_links: bool, } fn visit_page(client: &Client, command: &CrawlCommand) -> Result<Vec<Url>, Error> { println!("проверка {:#}", command.url); let response = client.get(command.url.clone()).send()?; if !response.status().is_success() { return Err(Error::BadResponse(response.status().to_string())); } let mut link_urls = Vec::new(); if !command.extract_links { return Ok(link_urls); } let base_url = response.url().to_owned(); let body_text = response.text()?; let document = Html::parse_document(&body_text); let selector = Selector::parse("a").unwrap(); let href_values = document .select(&selector) .filter_map(|element| element.value().attr("href")); for href in href_values { match base_url.join(href) { Ok(link_url) => { link_urls.push(link_url); } Err(err) => { println!("в {base_url:#} не поддается разбору {href:?}: {err}"); } } } Ok(link_urls) } struct CrawlState { domain: String, visited_pages: HashSet<String>, } impl CrawlState { fn new(start_url: &Url) -> CrawlState { let mut visited_pages = HashSet::new(); visited_pages.insert(start_url.as_str().to_string()); CrawlState { domain: start_url.domain().unwrap().to_string(), visited_pages } } /// Определяет, должны ли извлекаться ссылки на указанной странице fn should_extract_links(&self, url: &Url) -> bool { let Some(url_domain) = url.domain() else { return false; }; url_domain == self.domain } /// Помечает указанную страницу как посещенную, /// возвращает `false`, если страница уже посещалась fn mark_visited(&mut self, url: &Url) -> bool { self.visited_pages.insert(url.as_str().to_string()) } } type CrawlResult = Result<Vec<Url>, (Url, Error)>; fn spawn_crawler_threads( command_receiver: mpsc::Receiver<CrawlCommand>, result_sender: mpsc::Sender<CrawlResult>, thread_count: u32, ) { let command_receiver = Arc::new(Mutex::new(command_receiver)); for _ in 0..thread_count { let result_sender = result_sender.clone(); let command_receiver = command_receiver.clone(); thread::spawn(move || { let client = Client::new(); loop { let command_result = { let receiver_guard = command_receiver.lock().unwrap(); receiver_guard.recv() }; let Ok(crawl_command) = command_result else { // Отправитель уничтожен, команд больше не будет break; }; let crawl_result = match visit_page(&client, &crawl_command) { Ok(link_urls) => Ok(link_urls), Err(error) => Err((crawl_command.url, error)), }; result_sender.send(crawl_result).unwrap(); } }); } } fn control_crawl( start_url: Url, command_sender: mpsc::Sender<CrawlCommand>, result_receiver: mpsc::Receiver<CrawlResult>, ) -> Vec<Url> { let mut crawl_state = CrawlState::new(&start_url); let start_command = CrawlCommand { url: start_url, extract_links: true }; command_sender.send(start_command).unwrap(); let mut pending_urls = 1; let mut bad_urls = Vec::new(); while pending_urls > 0 { let crawl_result = result_receiver.recv().unwrap(); pending_urls -= 1; match crawl_result { Ok(link_urls) => { for url in link_urls { if crawl_state.mark_visited(&url) { let extract_links = crawl_state.should_extract_links(&url); let crawl_command = CrawlCommand { url, extract_links }; command_sender.send(crawl_command).unwrap(); pending_urls += 1; } } } Err((url, error)) => { bad_urls.push(url); println!("при извлечении ссылок возникла ошибка: {:#}", error); continue; } } } bad_urls } fn check_links(start_url: Url) -> Vec<Url> { let (result_sender, result_receiver) = mpsc::channel::<CrawlResult>(); let (command_sender, command_receiver) = mpsc::channel::<CrawlCommand>(); spawn_crawler_threads(command_receiver, result_sender, 16); control_crawl(start_url, command_sender, result_receiver) } fn main() { let start_url = reqwest::Url::parse("https://www.google.org").unwrap(); let bad_urls = check_links(start_url); println!("плохие URL: {:#?}", bad_urls); }
Асинхронный Rust
"Асинхронность" (async) — это модель параллелизма, в которой несколько задач выполняются одновременно. Каждая задача выполняется до тех пор, пока не завершится или не заблокируется, затем выполняется следующая (готовая к выполнению) задача и т.д. Такая модель позволяет выполнять большое количество задач с помощью небольшого числа потоков. Это связано с тем, что накладные расходы на выполнение каждой задачи обычно очень низкие, а операционные системы предоставляют примитивы для эффективного переключения между задачами.
Асинхронные операции
Rust
основаны на фьючерах (futures, от "future" — будущее), представляющих собой работу, которая может быть завершена в будущем. Фьючеры "опрашиваются" (polled) до тех пор, пока не сообщат о завершении.
Фьючеры опрашиваются асинхронной средой выполнения (async runtime). Доступно несколько таких сред. Одной из самых популярных является Tokio.
Сравнения:
- в
Python
используется похожая модель вasyncio
. Однако, его типFuture
основан на функциях обратного вызова (callbacks), а не на опросах. Асинхронные программыPython
должны выполняться в цикле, как и асинхронные программыRust
-
Promise
вJavaScript
похож на фьючер, но также основан на колбэках. Среды выполнения реализует цикл событий (event loop), поэтому многие детали разрешения промиса являются скрытыми
Основы асинхронности
async/await
На высоком уровне асинхронный код
Rust
выглядит очень похоже на обычный синхронный код:
use futures::executor::block_on; async fn count_to(count: i32) { for i in 1..=count { println!("Значение счетчика: {i}!"); } } async fn async_main(count: i32) { count_to(count).await; } fn main() { block_on(async_main(10)); }
Ремарки:
- это упрощенный пример для демонстрации синтаксиса. В нем отсутствуют долгие операции или параллелизм
- какой тип возвращает асинхронная функция?
- используйте
let future: () = async_main(10);
вmain()
и посмотрите
- используйте
- ключевое слово
async
— это синтаксический сахар. Компилятор заменяет возвращаемый тип фьючером - мы не можем сделать функцию
main
асинхронной без предоставления компилятору дополнительных инструкций о том, как использовать возвращаемый фьючер - для запуска асинхронного кода требуется исполнитель (executor).
block_on()
блокирует текущий поток до завершения фьючера -
.await
асинхронно ждет завершения другой операции. В отличие отblock_on()
,await
не блокирует текущий поток -
await
может использоваться только внутри асинхронной функции (или блока, о чем мы поговорим позже)
Фьючеры
Future — это трейт, реализуемый объектами, представляющими операцию, которая пока не может быть завершена. Фьючеры могут опрашиваться,
poll()
возвращает Poll.
use std::pin::Pin; use std::task::Context; pub trait Future { type Output; fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>; } pub enum Poll<T> { Ready(T), Pending, }
Асинхронная функция возвращает
impl Future
. Также возможно (но редко применяется) реализоватьFuture
для собственных типов. Например,JoinHandle
, возвращаемыйtokio::spawn
реализуетFuture
, что позволяет присоединяться (join) к нему.
Ключевое слово
await
приостанавливает выполнение асинхронной функции, пока фьючер не будет готов.
Ремарки:
- типы
Future
иPoll
реализованы в точности, как показано - мы не будем рассматривать
Pin
иContext
, поскольку не будет создавать новые асинхронные примитивы. Коротко:
-
Context
позволяет фьючеру планировать повторный опрос при возникновении события -
Pin
гарантирует, что фьючер не перемещается в памяти, поэтому ссылки в этом фьючере остаются валидными. Это необходимо, чтобы ссылки оставались валидными послеawait
-
Среда выполнения
Среда выполнения (runtime) предоставляет поддержку для асинхронного выполнения операций (reactor) и отвечает за выполнение фьючеров (executor).
Rust
не имеет "встроенной" среды выполнения, но доступно несколько вариантов:
-
Tokio — производительный, с хорошим набором инструментов, таких как Hyper для
HTTP
или Tonic дляgRPC
-
async-std — стремится быть "стандартной библиотекой для асинхронного кода" и включает стандартную среду выполнения в
async::task
- smol — простой и легковесный
Несколько больших приложений имеют собственные среды выполнения. Одним из таких приложений является Fuchsia.
Фьючеры являются "инертными" в том смысле, что они ничего не делают, пока не будут опрошены исполнителем. Это отличается от промисов
JS
, например, которые запускаются, даже если никогда не используются.
Tokio
Tokio
предоставляет:
- многопоточную среду выполнения для выполнения асинхронного кода
- асинхронную версию стандартной библиотеки
- большую экосистему библиотек
use tokio::time; async fn count_to(count: i32) { for i in 1..=count { println!("Значение счетчика в задаче: {i}!"); time::sleep(time::Duration::from_millis(5)).await; } } #[tokio::main] async fn main() { tokio::spawn(count_to(10)); for i in 1..5 { println!("Значение счетчика в основной задаче: {i}"); time::sleep(time::Duration::from_millis(5)).await; } }
Ремарки:
- макрос
tokio::main
позволяет делать функциюmain
асинхронной - функция
spawn
создает новую параллельную "задачу" - обратите внимание:
spawn
принимаетFuture
, мы не вызывает.await
наcount_to()
- почему
count_to()
обычно не доходит до 10? Это пример отмены асинхронной операции.tokio::spawn()
возвращает обработчик (handle), который может заставить поток ждать его завершения - попробуйте заменить
tokio::spawn
наcount_to(10).await
- попробуйте добавить ожидание завершения задачи, возвращаемой
tokio::spawn()
Задачи
Rust
имеет систему задач (task system), которая является формой легковесного трейдинга (threading).
Задача имеет один верхнеуровневый фьючер, который опрашивается исполнителем. Этот фьючер может иметь несколько вложенных фьючеров, которые он опрашивает методом
poll
, что приблизительно соответствует стеку вызовов (call stack). Параллелизм внутри задачи возможен путем опроса нескольких дочерних фьючеров, например, запуск таймера и выполнение операции ввода-вывода.
use tokio::io::{self, AsyncReadExt, AsyncWriteExt}; use tokio::net::TcpListener; #[tokio::main] async fn main() -> io::Result<()> { let listener = TcpListener::bind("127.0.0.1:0").await?; println!("запросы принимаются на порту {}", listener.local_addr()?.port()); loop { let (mut socket, addr) = listener.accept().await?; println!("запрос из {addr:?}"); tokio::spawn(async move { socket.write_all(b"Кто ты?\n").await.expect("ошибка сокета"); let mut buf = vec![0; 1024]; let name_size = socket.read(&mut buf).await.expect("ошибка сокета"); let name = std::str::from_utf8(&buf[..name_size]).unwrap().trim(); let reply = format!("Привет, {name}!\n"); socket.write_all(reply.as_bytes()).await.expect("ошибка сокета"); }); } }
- Перед нами
async
блок. Такие блоки похожи на замыкания, но не принимают параметров. Их возвращаемым значением является фьючер, как уasync fn
- переделайте асинхронный блок в функцию и улучшите обработку ошибок с помощью оператора
?
Асинхронные каналы
Некоторые крейты поддерживают асинхронные каналы, например,
tokio
:
use tokio::sync::mpsc::{self, Receiver}; async fn ping_handler(mut input: Receiver<()>) { let mut count: usize = 0; while let Some(_) = input.recv().await { count += 1; println!("получено {count} пингов"); } println!("ping_handler завершен"); } #[tokio::main] async fn main() { let (sender, receiver) = mpsc::channel(32); let ping_handler_task = tokio::spawn(ping_handler(receiver)); for i in 0..10 { sender.send(()).await.expect("провал отправки пинга"); println!("отправлено {} пингов", i + 1); } drop(sender); ping_handler_task.await.expect("что-то пошло не так"); }
- измените размер канала на
3
и посмотрите, как это повлияет на выполнение - интерфейс асинхронных каналов аналогичен интерфейсу
sync
каналов, о которых мы говорили ранее - попробуйте удалить
std::mem::drop
. Что произойдет? Почему? - крейт Flume предоставляет каналы, которые реализуют как
sync
, так иasync send
, иrecv
. Это может быть полезным для сложных приложений с задачами обработки ввода-вывода и тяжелыми для ЦП задачами -
async
каналы могут быть использованы вместе с другимиfuture
для создания сложного потока управления (control flow)
Поток управления фьючеров
Фьючеры могут объединяться вместе для создания графов потоков параллельных вычислений. Мы уже видели задачи, которые функционируют как автономные потоки выполнения.
Join
Метод
join_all
ждет, когда все фьючеры будут готовы, и возвращает их результаты. Это похоже наPromise.all
вJavaScript
илиasyncio.gather
вPython
.
use anyhow::Result; use futures::future; use reqwest; use std::collections::HashMap; async fn size_of_page(url: &str) -> Result<usize> { let resp = reqwest::get(url).await?; Ok(resp.text().await?.len()) } #[tokio::main] async fn main() { let urls: [&str; 4] = [ "https://google.com", "https://httpbin.org/ip", "https://play.rust-lang.org/", "BAD_URL", ]; let futures_iter = urls.into_iter().map(size_of_page); let results = future::join_all(futures_iter).await; let page_sizes_dict: HashMap<&str, Result<usize>> = urls.into_iter().zip(results.into_iter()).collect(); println!("{:?}", page_sizes_dict); }
Ремарки:
- для нескольких фьючеров непересекающихся типов можно использовать
std::future::join!
но мы должны знать, сколько фьючеров у нас будет во время компиляции. В настоящее времяjoin_all()
находится в крейтеfutures
, но скоро будет стабилизирован вstd::future
- риск соединения заключается в том, что какой-нибудь фьючер может никогда не разрешиться, что приведет к остановке программы
- мы можем комбинировать
join_all()
сjoin!
, например, чтобы объединить все запросы к службеHTTP
, а также запрос к базе данных. Попробуйте добавитьtokio::time::sleep()
во фьючер, используяfuture::join!
. Это не таймаут (для которого требуетсяselect!
, как описано в следующем разделе), а демонстрация работыjoin!
Select
Операция выбора (select) ждет готовности любого фьючера из набора и реагирует на его результат. В
JavaScript
это похоже наPromise.race
. ВPython
это похоже наasyncio.wait(task_set, return_when=asyncio.FIRST_COMPLETED)
.
Подобно оператору
match
, телоselect!
имеет несколько ветвей (arms), каждая из которых имеет формуpattern = future => statement
. Когдаfuture
готов, его возвращаемое значение деструктурируетсяpattern
. Затемstatement
запускается с итоговыми переменными. Результатstatement
становится результатом макросаselect!
.
use tokio::sync::mpsc::{self, Receiver}; use tokio::time::{sleep, Duration}; #[derive(Debug, PartialEq)] enum Animal { Cat { name: String }, Dog { name: String }, } async fn first_animal_to_finish_race( mut cat_rcv: Receiver<String>, mut dog_rcv: Receiver<String>, ) -> Option<Animal> { tokio::select! { cat_name = cat_rcv.recv() => Some(Animal::Cat { name: cat_name? }), dog_name = dog_rcv.recv() => Some(Animal::Dog { name: dog_name? }) } } #[tokio::main] async fn main() { let (cat_sender, cat_receiver) = mpsc::channel(32); let (dog_sender, dog_receiver) = mpsc::channel(32); tokio::spawn(async move { sleep(Duration::from_millis(500)).await; cat_sender.send(String::from("Феликс")).await.expect("ошибка отправки имени кота"); }); tokio::spawn(async move { sleep(Duration::from_millis(50)).await; dog_sender.send(String::from("Рекс")).await.expect("ошибка отправки имени собаки"); }); let winner = first_animal_to_finish_race(cat_receiver, dog_receiver) .await .expect("ошибка получения победителя"); println!("Победителем является {winner:?}"); }
Ремарки:
- в примере у нас имеется гонка между кошкой и собакой.
first_animal_to_finish_race()
"слушает" (listening) оба канала и возвращает первый по времени результат. Поскольку имя собаки прибывает через 50 мс, собака выигрывает у кошки, имя которой прибывает через 500 мс - в примере вместо
channel
можно использоватьoneshot
, поскольку предполагается однократный вызов методаsend
- попробуйте добавить к гонке дедлайн, демонстрируя выбор разных фьючеров
- обратите внимание, что
select!
уничтожает не совпавшие ветви, что отменяет их фьючеры.select!
легче всего использовать, когда каждое выполнение этого макроса создает новые фьючеры
- альтернативой является передача
&mut future
вместо самого фьючера, но это может привести к проблемам, о котором мы поговорим позже
- альтернативой является передача
Ловушки async/await
async/await
предоставляет удобную и эффективную абстракцию для параллельного асинхронного программирования. Однако модельasync/await
вRust
также имеет свои подводные камни и ловушки, о которых мы поговорим в этом разделе.
Блокировка исполнителя
Большинство асинхронных сред выполнения допускают одновременное выполнение только задач ввода-вывода. Это означает, что задачи блокировки ЦП будут блокировать исполнителя (executor) и препятствовать выполнению других задач. Простой обходной путь — использовать эквивалентные асинхронные методы там, где это возможно.
use futures::future::join_all; use std::time::Instant; async fn sleep_ms(start: &Instant, id: u64, duration_ms: u64) { std::thread::sleep(std::time::Duration::from_millis(duration_ms)); println!( "фьючер {id} спал в течение {duration_ms} мс, завершился после {} мс", start.elapsed().as_millis() ); } #[tokio::main(flavor = "current_thread")] async fn main() { let start = Instant::now(); let sleep_futures = (1..=10).map(|t| sleep_ms(&start, t, t * 10)); join_all(sleep_futures).await; }
Ремарки:
- запустите код и убедитесь, что переходы в режим сна происходят последовательно, а не одновременно
-
flavor = "current_thread"
помещает все задачи в один поток. Это делает эффект более очевидным, но рассмотренная ошибка присутствует и в многопоточной версии - замените
std::thread::sleep
наtokio::time::sleep
и дождитесь результата - другим решением может быть
tokio::task::spawn_blocking
, который порождает реальный поток и преобразует его дескриптор вfuture
, не блокируя исполнителя - о задачах не следует думать как о потоках ОС. Они не совпадают 1 к 1, и большинство исполнителей позволяют выполнять множество задач в одном потоке ОС. Это особенно проблематично при взаимодействии с другими библиотеками через
FFI
, где эта библиотека может зависеть от локального хранилища потоков или сопоставляться с конкретными потоками ОС (например,CUDA
). В таких ситуациях отдавайте предпочтениеtokio::task::spawn_blocking
- используйте синхронные мьютексы осторожно. Удержание мьютекса над
.await
может привести к блокировке другой задачи, которая может выполняться в том же потоке
Pin
Асинхронные блоки и функции возвращают типы, реализующие трейт
Future
. Возвращаемый тип является результатом преобразования компилятора, который превращает локальные переменные в данные, хранящиеся внутри фьючера.
Некоторые из этих переменных могут содержать указатели на другие локальные переменные. По этой причине фьючеры никогда не должны перемещаться в другую ячейку памяти, поскольку это сделает такие указатели недействительными.
Чтобы предотвратить перемещение фьючера в памяти, его можно опрашивать только через закрепленный (pinned) указатель.
Pin
— это оболочка ссылки, которая запрещает все операции, которые могли бы переместить экземпляр, на который она указывает, в другую ячейку памяти.
use tokio::sync::{mpsc, oneshot}; use tokio::task::spawn; use tokio::time::{sleep, Duration}; // Рабочая единица. В данном случае она просто спит в течение определенного времени // и отвечает сообщением в канал `respond_on` #[derive(Debug)] struct Work { input: u32, respond_on: oneshot::Sender<u32>, } // Воркер, который ищет работу в очереди (queue) и выполняет ее async fn worker(mut work_queue: mpsc::Receiver<Work>) { let mut iterations = 0; loop { tokio::select! { Some(work) = work_queue.recv() => { sleep(Duration::from_millis(10)).await; // выполняем "работу" work.respond_on .send(work.input * 1000) .expect("провал отправки ответа"); iterations += 1; } // TODO: сообщать о количестве итераций каждый 100 мс } } } // "Запрашиватель" (requester), который запрашивает работу и ждет ее выполнения async fn do_work(work_queue: &mpsc::Sender<Work>, input: u32) -> u32 { let (tx, rx) = oneshot::channel(); work_queue .send(Work { input, respond_on: tx }) .await .expect("провал отправки работы в очередь"); rx.await.expect("провал ожидания ответа") } #[tokio::main] async fn main() { let (tx, rx) = mpsc::channel(10); spawn(worker(rx)); for i in 0..100 { let resp = do_work(&tx, i).await; println!("результат работы для итерации {i}: {resp}"); } }
Ремарки:
- в примере вы могли распознать шаблон актора (actor pattern). Акторы, как правило, вызывают
select!
в цикле - это обобщение нескольких предыдущих уроков, так что не торопитесь
- добавьте
_ = sleep(Duration::from_millis(100)) => { println!(..) }
вselect!
. Это никогда не выполнится. Почему? - теперь добавьте
timeout_fut
, содержащий этот фьючер за пределамиloop
:
- добавьте
let mut timeout_fut = sleep(Duration::from_millis(100)); loop { select! { .., _ = timeout_fut => { println!(..); }, } }
- это также не будет работать. Изучите ошибки компилятора, добавьте
&mut
вtimeout_fut
вselect!
для решения проблемы перемещения, затем используйтеBox::pin
:
let mut timeout_fut = Box::pin(sleep(Duration::from_millis(100))); loop { select! { .., _ = &mut timeout_fut => { println!(..); }, } }
- это компилируется, но по истечении тайм-аута на каждой итерации происходит
Poll::Ready
(для решения этой проблемы может помочь объединенный фьючер). Обновите код, чтобы сбрасыватьtimeout_fut
каждый раз, когда он истекает
-
Box
выделяет память в куче. В некоторых случаяхstd::pin::pin!
— это тоже вариант, но его сложно использовать для фьючера, которой переназначается - другая альтернатива — вообще не использовать
pin
, а создать другую задачу, которая будет отправляться в каналoneshot
каждые 100 мс - данные, содержащие указатели на себя, называются самоссылающимися (self-referential). Обычно средство проверки заимствований (borrow checker) в
Rust
предотвращает перемещение таких данных, поскольку ссылки не могут жить дольше данных, на которые они указывают. Однако преобразование кода для асинхронных блоков и функций не проверяется средством проверки заимствований -
Pin
— это обертка над ссылкой. Объект не может перемещаться с помощью закрепленного указателя. Однако он может перемещаться с помощью незакрепленного указателя - метод
poll
трейтаFuture
используетPin<&mut Self>
вместо&mut Self
для ссылки на экземпляр. Вот почему он может вызываться только на закрепленном указателе
-
Асинхронные трейты
Асинхронные методы в трейтах пока не поддерживаются в стабильной версии
Rust
.
Крейт async_trait предоставляет макрос для решения этой задачи:
use async_trait::async_trait; use std::time::Instant; use tokio::time::{sleep, Duration}; #[async_trait] trait Sleeper { async fn sleep(&self); } struct FixedSleeper { sleep_ms: u64, } #[async_trait] impl Sleeper for FixedSleeper { async fn sleep(&self) { sleep(Duration::from_millis(self.sleep_ms)).await; } } async fn run_all_sleepers_multiple_times( sleepers: Vec<Box<dyn Sleeper>>, n_times: usize, ) { for _ in 0..n_times { println!("running all sleepers.."); for sleeper in &sleepers { let start = Instant::now(); sleeper.sleep().await; println!("slept for {}ms", start.elapsed().as_millis()); } } } #[tokio::main] async fn main() { let sleepers: Vec<Box<dyn Sleeper>> = vec![ Box::new(FixedSleeper { sleep_ms: 50 }), Box::new(FixedSleeper { sleep_ms: 100 }), ]; run_all_sleepers_multiple_times(sleepers, 5).await; }
Ремарки:
-
async_trait
прост в использовании, но учтите, что для работы он использует выделение памяти в куче. Это влечет издержки производительности - попробуйте создать новую "спящую" структуру, которая будет спать случайное время, и добавить ее в вектор
Отмена
Удаление фьючера означает, что его больше никогда нельзя будет опросить. Это называется отменой (cancellation) и может произойти в любой момент ожидания. Необходимо позаботиться о том, чтобы система работала правильно даже в случае отмены фьючера. Например, он не должен блокироваться или терять данные.
use std::io::{self, ErrorKind}; use std::time::Duration; use tokio::io::{AsyncReadExt, AsyncWriteExt, DuplexStream}; struct LinesReader { stream: DuplexStream, } impl LinesReader { fn new(stream: DuplexStream) -> Self { Self { stream } } async fn next(&mut self) -> io::Result<Option<String>> { let mut bytes = Vec::new(); let mut buf = [0]; while self.stream.read(&mut buf[..]).await? != 0 { bytes.push(buf[0]); if buf[0] == b'\n' { break; } } if bytes.is_empty() { return Ok(None); } let s = String::from_utf8(bytes) .map_err(|_| io::Error::new(ErrorKind::InvalidData, "не UTF-8"))?; Ok(Some(s)) } } async fn slow_copy(source: String, mut dest: DuplexStream) -> std::io::Result<()> { for b in source.bytes() { dest.write_u8(b).await?; tokio::time::sleep(Duration::from_millis(10)).await } Ok(()) } #[tokio::main] async fn main() -> std::io::Result<()> { let (client, server) = tokio::io::duplex(5); let handle = tokio::spawn(slow_copy("привет\nпривет\n".to_owned(), client)); let mut lines = LinesReader::new(server); let mut interval = tokio::time::interval(Duration::from_millis(60)); loop { tokio::select! { _ = interval.tick() => println!("тик!"), line = lines.next() => if let Some(l) = line? { print!("{}", l) } else { break }, } } handle.await.unwrap()?; Ok(()) }
Ремарки:
- компилятор не помогает с обеспечением безопасности отмены. Необходимо читать документацию API и понимать, каким состоянием владеет ваша
async fn
- в отличие от
panic!
и?
, отмена — это часть нормального управления потоком выполнения (а не обработка ошибок) - в примере теряется часть строки
- если ветвь
tick()
выполняется первой,next()
и егоbuf
уничтожаются -
LinesReader
можно сделать безопасным для отмены путем включенияbuf
в структуру:
- если ветвь
struct LinesReader { stream: DuplexStream, bytes: Vec<u8>, buf: [u8; 1], } impl LinesReader { fn new(stream: DuplexStream) -> Self { Self { stream, bytes: Vec::new(), buf: [0] } } async fn next(&mut self) -> io::Result<Option<String>> { // ... let raw = std::mem::take(&mut self.bytes); let s = String::from_utf8(raw) // ... } }
- Interval::tick безопасен для отмены, поскольку он отслеживает, был ли "доставлен" (delivered) тик
- AsyncReadExt::read безопасен для отмены, поскольку он либо возвращается, либо не читает данные
- AsyncBufReadExt::read_line, как и пример, не является безопасным для отмены. Подробности и альтернативы см. в документации
Упражнения
Для тренировки навыков работы с асинхронным
Rust
, есть еще два упражнения:
- обедающие философы — на этот раз вам нужно решить эту задачу с помощью асинхронного
Rust
- приложение для чата
Обедающие философы
use std::sync::Arc; use tokio::sync::mpsc::{self, Sender}; use tokio::sync::Mutex; use tokio::time; struct Fork; struct Philosopher { name: String, // left_fork: ... // right_fork: ... // thoughts: ... } impl Philosopher { async fn think(&self) { self.thoughts .send(format!("Эврика! {} сгенерировал(а) новую идею!", &self.name)) .await .unwrap(); } async fn eat(&self) { // Пытаемся до тех пор, пока не получим обе вилки println!("{} ест...", &self.name); time::sleep(time::Duration::from_millis(5)).await; } } static PHILOSOPHERS: &[&str] = &["Сократ", "Гипатия", "Платон", "Аристотель", "Пифагор"]; #[tokio::main] async fn main() { // Создаем вилки // Создаем философов // Каждый философ размышляет и ест 100 раз // Выводим размышления философов }
Для работы с асинхронным
Rust
рекомендуется использоватьtokio
:
[package] name = "dining-philosophers-async" version = "0.1.0" edition = "2021" [dependencies] tokio = { version = "1.26.0", features = ["sync", "time", "macros", "rt-multi-thread"] }
Подсказка: на этот раз вам придется использовать
Mutex
и модульmpsc
изtokio
.
Решениеuse std::sync::Arc; use tokio::sync::mpsc::{self, Sender}; use tokio::sync::Mutex; use tokio::time; struct Fork; struct Philosopher { name: String, left_fork: Arc<Mutex<Fork>>, right_fork: Arc<Mutex<Fork>>, thoughts: Sender<String>, } impl Philosopher { async fn think(&self) { self.thoughts .send(format!("Эврика! {} сгенерировал(а) новую идею!", &self.name)) .await .unwrap(); } async fn eat(&self) { // Пытаемся до тех пор, пока не получим обе вилки let (_left_fork, _right_fork) = loop { // Берем вилки... let left_fork = self.left_fork.try_lock(); let right_fork = self.right_fork.try_lock(); let Ok(left_fork) = left_fork else { // Если мы не получили левую вилку, удаляем правую вилку, // если она у нас была, позволяя выполняться другим задачам drop(right_fork); time::sleep(time::Duration::from_millis(1)).await; continue; }; let Ok(right_fork) = right_fork else { // Если мы не получили правую вилку, удаляем левую вилку, // если она у нас была, позволяя выполняться другим задачам drop(left_fork); time::sleep(time::Duration::from_millis(1)).await; continue; }; break (left_fork, right_fork); }; println!("{} ест...", &self.name); time::sleep(time::Duration::from_millis(5)).await; // Блокировки уничтожаются здесь } } static PHILOSOPHERS: &[&str] = &["Сократ", "Гипатия", "Платон", "Аристотель", "Пифагор"]; #[tokio::main] async fn main() { // Создаем вилки let mut forks = vec![]; (0..PHILOSOPHERS.len()).for_each(|_| forks.push(Arc::new(Mutex::new(Fork)))); // Создаем философов let (philosophers, mut rx) = { let mut philosophers = vec![]; let (tx, rx) = mpsc::channel(10); for (i, name) in PHILOSOPHERS.iter().enumerate() { let left_fork = Arc::clone(&forks[i]); let right_fork = Arc::clone(&forks[(i + 1) % PHILOSOPHERS.len()]); philosophers.push(Philosopher { name: name.to_string(), left_fork, right_fork, thoughts: tx.clone(), }); } (philosophers, rx) // `tx` уничтожается здесь, поэтому нам не нужно явно удалять его позже }; // Каждый философ думает и ест 100 раз for phil in philosophers { tokio::spawn(async move { for _ in 0..100 { phil.think().await; phil.eat().await; } }); } // Выводим размышления философов while let Some(thought) = rx.recv().await { println!("{thought}"); } }
Чат
В этом упражнении мы используем новые знания для разработки приложения чата. У нас есть сервер, к которому подключаются клиенты и в котором они публикуют свои сообщения. Клиент читает пользовательские сообщения через стандартный ввод и отправляет их на сервер. Сервер передает (broadcast) сообщение всем клиентам.
Для реализации этого функционала мы будем использовать широковещательный канал на сервере и tokio_websockets для взаимодействия между клиентом и сервером.
Создайте новый проект и добавьте следующие зависимости в
Cargo.toml
:
[package] name = "chat-async" version = "0.1.0" edition = "2021" [dependencies] futures-util = { version = "0.3.30", features = ["sink"] } http = "1.0.0" tokio = { version = "1.28.1", features = ["full"] } tokio-websockets = { version = "0.5.1", features = ["client", "fastrand", "server", "sha1_smol"] }
Необходимые API
Вам потребуются следующие функции из
tokio
иtokio_websockets
. Потратьте несколько минут для ознакомления со следующими API:
-
StreamExt::next(), реализуемый
WebSocketStream
— для асинхронного чтения сообщений из потока веб-сокетов -
SinkExt::send(), реализуемый
WebSocketStream
— для асинхронной отправки сообщений в поток веб-сокетов - Lines::next_line() — для асинхронного чтения сообщений пользователя через стандартный ввод
- Sender::subscribe() — для подписки на широковещательный канал
Два бинарника
Как правило, в проекте может быть только один исполняемый файл (binary) и один файл
src/main.rs
. Нам требуется два бинарника. Один для клиента и еще один для сервера. Теоретически их можно сделать двумя отдельными проектами, но мы поместим оба бинарника в один проект. Для того, чтобы это работало, клиент и сервер должны находиться в директорииsrc/bin
(см. документацию).
Скопируйте следующий код сервера и клиента в
src/bin/server.rs
иsrc/bin/client.rs
, соответственно.
// src/bin/server.rs use futures_util::sink::SinkExt; use futures_util::stream::StreamExt; use std::error::Error; use std::net::SocketAddr; use tokio::net::{TcpListener, TcpStream}; use tokio::sync::broadcast::{channel, Sender}; use tokio_websockets::{Message, ServerBuilder, WebSocketStream}; async fn handle_connection( addr: SocketAddr, mut ws_stream: WebSocketStream<TcpStream>, bcast_tx: Sender<String>, ) -> Result<(), Box<dyn Error + Send + Sync>> { todo!("реализуй меня") } #[tokio::main] async fn main() -> Result<(), Box<dyn Error + Send + Sync>> { let (bcast_tx, _) = channel(16); let listener = TcpListener::bind("127.0.0.1:2000").await?; println!("Запросы принимаются на порту 2000"); loop { let (socket, addr) = listener.accept().await?; println!("Запрос от {addr:?}"); let bcast_tx = bcast_tx.clone(); tokio::spawn(async move { // Оборачиваем сырой поток TCP в веб-сокет let ws_stream = ServerBuilder::new().accept(socket).await?; handle_connection(addr, ws_stream, bcast_tx).await }); } }
// src/bin/client.rs use futures_util::stream::StreamExt; use futures_util::SinkExt; use http::Uri; use tokio::io::{AsyncBufReadExt, BufReader}; use tokio_websockets::{ClientBuilder, Message}; #[tokio::main] async fn main() -> Result<(), tokio_websockets::Error> { let (mut ws_stream, _) = ClientBuilder::from_uri(Uri::from_static("ws://127.0.0.1:2000")) .connect() .await?; let stdin = tokio::io::stdin(); let mut stdin = BufReader::new(stdin).lines(); todo!("реализуй меня") }
Запуск бинарников
Команда для запуска сервера:
cargo run --bin server
Команда для запуска клиента:
cargo run --bin client
Задачи
- реализовать функцию
handle_connection
вsrc/bin/server.rs
- подсказка: используйте
tokio::select!
для параллельного выполнения двух задач в бесконечном цикле. Одна задача получает сообщения от клиента и передает их другим клиентам. Другая — отправляет клиенту сообщения, полученные от сервера
- подсказка: используйте
- завершите функцию
main
вsrc/bin/client.rs
- подсказка: также используйте
tokio::select!
в бесконечном цикле для параллельного выполнения двух задач: 1) чтение сообщений пользователя из стандартного ввода и их отправка серверу; 2) получение сообщений от сервера и их отображение
- подсказка: также используйте
- опционально: измените код для передачи сообщений всем клиентам, кроме отправителя
Решение// src/bin/server.rs use futures_util::sink::SinkExt; use futures_util::stream::StreamExt; use std::error::Error; use std::net::SocketAddr; use tokio::net::{TcpListener, TcpStream}; use tokio::sync::broadcast::{channel, Sender}; use tokio_websockets::{Message, ServerBuilder, WebSocketStream}; async fn handle_connection( addr: SocketAddr, mut ws_stream: WebSocketStream<TcpStream>, bcast_tx: Sender<String>, ) -> Result<(), Box<dyn Error + Send + Sync>> { ws_stream .send(Message::text("Добро пожаловать в чат! Отправьте сообщение".to_string())) .await?; let mut bcast_rx = bcast_tx.subscribe(); // Бесконечный цикл для параллельного выполнения двух задач: // 1) получение сообщений из `ws_stream` и их передача клиентам // 2) получение сообщений в `bcast_rx` и их отправка клиенту loop { tokio::select! { incoming = ws_stream.next() => { match incoming { Some(Ok(msg)) => { if let Some(text) = msg.as_text() { println!("{addr:?}: {text:?}"); bcast_tx.send(text.into())?; } } Some(Err(err)) => return Err(err.into()), None => return Ok(()), } } msg = bcast_rx.recv() => { ws_stream.send(Message::text(msg?)).await?; } } } } #[tokio::main] async fn main() -> Result<(), Box<dyn Error + Send + Sync>> { let (bcast_tx, _) = channel(16); let listener = TcpListener::bind("127.0.0.1:2000").await?; println!("Запросы принимаются на порту 2000"); loop { let (socket, addr) = listener.accept().await?; println!("Запрос от {addr:?}"); let bcast_tx = bcast_tx.clone(); tokio::spawn(async move { // Оборачиваем сырой поток TCP в веб-сокет let ws_stream = ServerBuilder::new().accept(socket).await?; handle_connection(addr, ws_stream, bcast_tx).await }); } }
// src/bin/client.rs use futures_util::stream::StreamExt; use futures_util::SinkExt; use http::Uri; use tokio::io::{AsyncBufReadExt, BufReader}; use tokio_websockets::{ClientBuilder, Message}; #[tokio::main] async fn main() -> Result<(), tokio_websockets::Error> { let (mut ws_stream, _) = ClientBuilder::from_uri(Uri::from_static("ws://127.0.0.1:2000")) .connect() .await?; let stdin = tokio::io::stdin(); let mut stdin = BufReader::new(stdin).lines(); // Бесконечный цикл для параллельной отправки и получения сообщений loop { tokio::select! { incoming = ws_stream.next() => { match incoming { Some(Ok(msg)) => { if let Some(text) = msg.as_text() { println!("От сервера: {}", text); } }, Some(Err(err)) => return Err(err.into()), None => return Ok(()), } } res = stdin.next_line() => { match res { Ok(None) => return Ok(()), Ok(Some(line)) => ws_stream.send(Message::text(line.to_string())).await?, Err(err) => return Err(err.into()), } } } } }
Это конец бонусной части руководства.
Материалы для более глубокого изучения рассмотренных тем:
- Книга/учебник по Rust (на русском языке) — главы 16 и 20
- rustlings — упражнение 20
- Rust на примерах (на русском языке) — примеры 20.1 и 20.2
Happy coding!
-
Комментарии (10)
domix32
28.03.2024 10:41+4фьючерса
Фьючерсы - это термин из финансов. В расте они фьючи или футуры aka Future<T>.
hrls
28.03.2024 10:41+1вызов
send()
блокирует текущий поток до тех пор, пока в канале имеется место для новых сообщенийРазве не наоборот?
Unbounded в случае очередей будет логичнее назвать безразмерной.
Sabirman
Как в Rust-е сделать объектную динамически-подключаемую библиотеку.
На Java, C#, Python и т.п. объекты стандартизированы. Т.е. ты в своем классе реализуешь какой-то интерфейс и подключаешь свой плагин к основной программе.
В Rust-е, я так понимаю, можно реализовывать только Си-шные функции. И вся прелесть раста при этом теряется.
domix32
Если хочется сделать доступ через C ABI, то делаем тип либы dylib, выключаем манглинг функций для нашего интерфейса и приводим свой API к сишным сигнатурам. Собственный отдельный ржавый FFI уже много лет в разработке, но и работы там непочатый край ибо никто не осилил пока оформить какой-то пропозал, удовлетворяющий эту хотелку. В некотором экспериментальном виде для WASM можно использовать interface types, так что в теории если ваш wasm рантайм их поддерживает, то можно делать через него подключаться.
В остальном не очень понятно какие ещё вам варианты нужны? Общение между языками традиционно происходит через C ABI происходит, хоть в python, хоть в go, хоть в java и пока что лучше ничего не придумали. Есть отдельные крейты, позволяющие работать с этими языками на уровне объектов этих языков - например, pyo3 для питона или robusta для JNI.
Sabirman
Попробую по другому объяснить. Есть внешняя, динамически-подключаемая, библиотека, которая тоже написана на Rust. Но, несмотря на то, что она написана на Rust-е, я должен вызывать её методы в Unsafe-блоках. В Java, C#, Python и т.д. в аналогичной ситуации внешний вызов не отличается от вызова внутренней функции и соблюдаются все защиты, заложенные в языке.
А большие приложения никогда не идут одним исполняемым файлом, т.е. Rust не готов к использованию в больших приложениях.
Или я ошибаюсь?
domix32
Судя по докам и обсуждениям динамические либы доступны только через С API, поэтому да, даже если она на Rust, она будет считаться за либу на другом языке и необходимы врапперы поверх unsafe кода. WASM+WASI модули в теории могут позволить избегать этого, но принципиально ситуация не меняется.
Не вижу принципиальной разницы, если б всё писалось на том же C/С++. Оно буквально работает "как у всех". С тем же успехом можно сказать, что и С/С++ не готовы к использованию в больших приложениях.
эти языки по-умолчанию имеют некоторый рантайм с некоторым окружением и пространством имён, в который они могут импортировать внешние модули. Они не имеют гарантий безопасности аналогичных Rust и фактически превращаются в каст void* к сигнатуре, которую вы представили. В Rust вам необходимо представить какие-то гарантии относительно загружаемого модуля компилятору, т.к. нет никакого рантайма, поэтому да - unsafe и врапперы - ваш вариант. Благо их кажется можно генерировать автоматически через макросы.
sdramare
Они не имеют гарантий безопасности аналогичных Rust и фактически превращаются в каст void* к сигнатуре, которую вы представили.
Я думаю человек выше больше подразумевал что конечные бинарники Java/C# содержат в себе метаинформацию(рефлексию) о всех структурах/классах и методах внутри, так что их можно динамически подключать с гарантией что нужный класс реализует заданный интерфейс. В расте же при компиляции никакой метаинформации вроде того какие трейты для какой структуры реализованы не сохраняется, по-этому динамическая загрузка библиотеки всегда будет unsafe.
domix32
не совсем так. оно в том или ином виде будет ссылаться на некоторые виртуальные таблицы, которые можно отрефлексировать при желании, но из-за того язык компилируемый гарантии интерфейса намертво зашиваются в бинарь. У managed языков есть рантайм, который позволяет его расширять и работать с ним как будто рабочее окружение было таким всегда.
redfox0
https://stackoverflow.com/a/75903553
Другое дело, что у раста ABI пока ещё (?) не стабилизировано, так что динамические библиотеки и код придётся собирать одинаковой версией компилятора.
Но проблемы с unsafe-блоками не вижу, если у вас есть исходники библиотеки.
DarkEld3r
abi_stable