
Hello world!
Представляю вашему вниманию бонусную часть практического руководства по Rust.
Другой формат, который может показаться вам более удобным.
Руководство основано на Comprehensive Rust — руководстве по Rust от команды Android в Google и рассчитано на людей, которые уверенно владеют любым современным языком программирования. Еще раз: это руководство не рассчитано на тех, кто только начинает кодить ?
В этой части мы поговорим о параллельном (concurrent) и асинхронном (async) Rust.
Материалы для более глубокого изучения названных тем:
- Книга/учебник по Rust (на русском языке) — главы 16 и 20
- rustlings — упражнение 20
- Rust на примерах (на русском языке) — примеры 20.1 и 20.2
Также см. Большую шпаргалку по Rust.
Параллельный Rust
Rust полностью поддерживает параллелизм (concurrency) с использованием потоков ОС с мьютексами (mutexes) и каналами (channels).
Система типов Rust играет важную роль в том, что многие ошибки параллелизма становятся ошибками времени компиляции. Это часто называют бесстрашным параллелизмом (fearless concurrency), поскольку мы можем положиться на компилятор, который обеспечивает правильную обработку параллелизма во время выполнения.
Потоки
Потоки (threads) Rust работают аналогично потокам в других языках:
use std::thread;
use std::time::Duration;
fn main() {
thread::spawn(|| {
for i in 1..10 {
println!("значение счетчика в выделенном потоке: {i}!");
thread::sleep(Duration::from_millis(5));
}
});
for i in 1..5 {
println!("значение счетчика в основном потоке: {i}");
thread::sleep(Duration::from_millis(5));
}
}
- потоки являются потоками демона (daemon threads), основной поток не ждет их выполнения
- потоки паникуют независимо друг от друга
- паника может содержать полезную нагрузку (payload), которую можно извлечь с помощью
downcast_ref
- паника может содержать полезную нагрузку (payload), которую можно извлечь с помощью
Ремарки:
- обратите внимание, что основной поток не ждет выполнения выделенных (spawned) потоков
- для ожидания выполнения потока следует использовать
let handle = thread::spawn()и затемhandle.join() - вызовите панику в потоке. Обратите внимание, что это не влияет на
main - используйте
Result, возвращаемый изhandle.join(), для доступа к полезной нагрузке паники. В этом может помочь Any
Потоки с ограниченной областью видимости
Обычные потоки не могут заимствовать значения из окружения:
use std::thread;
fn foo() {
let s = String::from("привет");
thread::spawn(|| {
println!("длина: {}", s.len());
});
}
fn main() {
foo();
}
Однако для этого можно использовать scoped threads (потоки с ограниченной областью видимости):
use std::thread;
fn main() {
let s = String::from("привет");
thread::scope(|scope| {
scope.spawn(|| {
println!("длина: {}", s.len());
});
});
}
Ремарки:
- когда функция
thread::scopeзавершается, все потоки гарантированно объединяются, поэтому они могут вернуть заимствованные данные - применяются обычные правила заимствования
Rust: мы можем заимствовать значение мутабельно в одном потоке, или иммутабельно в любом количестве потоков
Каналы
Каналы (channels) Rust состоят из двух частей: Sender<T> (отправитель/передатчик) и Receiver<T> (получатель/приемник). Они соединяются с помощью канала, но мы видим только конечные точки:
use std::sync::mpsc;
fn main() {
let (tx, rx) = mpsc::channel();
tx.send(10).unwrap();
tx.send(20).unwrap();
println!("Получено: {:?}", rx.recv());
println!("Получено: {:?}", rx.recv());
let tx2 = tx.clone();
tx2.send(30).unwrap();
println!("Получено: {:?}", rx.recv());
}
-
mpscозначает Multi-Producer, Single-Consumer (несколько производителей, один потребитель).SenderиSyncSenderреализуютClone(поэтому мы можем создать несколько производителей), аReceiverне реализует (поэтому у нас может быть только один потребитель) -
send()иrecv()возвращаютResult. Если они возвращаютErr, значит соответствующийSenderилиReceiverуничтожен (dropped) и канал закрыт
Несвязанные каналы
mpsc::channel() возвращает несвязанный (unbounded) и асинхронный канал:
use std::sync::mpsc;
use std::thread;
use std::time::Duration;
fn main() {
let (tx, rx) = mpsc::channel();
thread::spawn(move || {
let thread_id = thread::current().id();
for i in 1..10 {
tx.send(format!("Сообщение {i}")).unwrap();
println!("{thread_id:?}: отправил сообщение {i}");
}
println!("{thread_id:?}: готово");
});
thread::sleep(Duration::from_millis(100));
for msg in rx.iter() {
println!("Основной поток: получено {msg}");
}
}
Связанные каналы
send() связанного (bounded) синхронного канала блокирует текущий поток:
use std::sync::mpsc;
use std::thread;
use std::time::Duration;
fn main() {
let (tx, rx) = mpsc::sync_channel(3);
thread::spawn(move || {
let thread_id = thread::current().id();
for i in 1..10 {
tx.send(format!("Сообщение {i}")).unwrap();
println!("{thread_id:?}: отправил сообщение {i}");
}
println!("{thread_id:?}: готово");
});
thread::sleep(Duration::from_millis(100));
for msg in rx.iter() {
println!("Основной поток: получено {msg}");
}
}
Ремарки:
- вызов
send()блокирует текущий поток до тех пор, пока в канале имеется место для новых сообщений. Поток может блокироваться бесконечно, если отсутствует получатель - вызов
send()заканчивается ошибкой (поэтому возвращаетсяResult), если канал закрыт. Канал закрывается после уничтожения получателя - связанный канал с нулевым размером называется "rendezvous channel". Каждый вызов
send()блокирует текущий поток, пока другой поток не вызоветread()
Send и Sync
Откуда Rust знает о необходимости запрета доступа к общему (shared) состоянию из нескольких потоков? Ответ кроется в двух трейтах:
-
Send: тип
TявляетсяSend, если передачаTв другой поток является безопасной -
Sync: тип
TявляетсяSync, если передача&Tв другой поток является безопасной
Send и Sync являются небезопасными трейтами. Компилятор автоматически реализует их для наших типов при условии, что они содержат только типы Send и Sync. Эти типы можно реализовать самостоятельно, если мы уверены в валидности наших типов.
Ремарки:
- о типах
SendиSyncможно думать как о маркерах того, что тип содержит несколько безопасных с точки зрения потоков (thread-safety) свойств - эти типы могут использоваться в качестве общих ограничений по аналогии с обычными трейтами
Send
Тип T является Send, если передача значения T в другой поток является безопасной.
Эффект перемещения владения в другой поток заключается в том, что деструкторы T запускаются в этом новом потоке. Поэтому вопрос состоит в том, когда мы можем выделить значение в одном потоке и освободить его в другом потоке.
Например, подключение к SQLite доступно только в одном потоке.
Sync
Тип T является Sync, если одновременный доступ к значению T из нескольких потоков является безопасным.
Если быть более точным, определение гласит следующее: T является Sync, если и только если &T является Send.
Это означает, что если тип является "потокобезопасным" (thread-safe) для совместного использования, он также потокобезопасен для передачи ссылок между потоками.
Если тип является Sync, его можно использовать в нескольких потоках без риска возникновения гонок за данными (data race) и других проблем с синхронизацией, поэтому его можно безопасно перемещать в другой поток. Ссылка на тип также может безопасно перемещаться в другой поток, поскольку доступ к данным, на которые она ссылается, из любого потока является безопасным.
Примеры
Send + Sync
Большинство типов является Send + Sync:
-
i8,f32,bool,char,&stretc. -
(T1, T2),[T; N],&[T],struct { x: T }etc. -
String,Option<T>,Vec<T>,Box<T>etc. -
Arc<T>— явно потокобезопасный благодаря атомарному подсчету ссылок -
Mutex<T>— явно потокобезопасный благодаря внутренней блокировке -
AtomicBool,AtomicU8etc., где используются специальные атомарные инструкции
Дженерики обычно являются Send + Sync, когда таковыми являются параметры типов.
Send + !Sync
Эти типы могут перемещаться в другие потоки, но не являются потокобезопасными. Обычно это связано с их внутренней изменчивостью:
mpsc::Sender<T>mpsc::Receiver<T>Cell<T>RefCell<T>
!Send + Sync
Эти типы являются потокобезопасными, но не могут перемещаться в другие потоки:
-
MutexGuard<T: Sync>— использует примитивы уровня ОС, которые должны быть освобождены в создавшем их потоке
!Send + !Sync
Эти типы не являются потокобезопасными и не могут перемещаться в другой поток:
-
Rc<T>— каждыйRc<T>содержит ссылку наRcBox<T>, который содержит неатомарный счетчик ссылок -
*const T,*mut T—Rustпредполагает, что сырые указатели могут иметь нюансы, связанные с их параллельным выполнением
Общее состояние
Rust использует систему типов для обеспечения синхронизации общих (shared) данных. Это делается в основном с помощью двух типов:
-
Arc — атомарный счетчик ссылок на T: обрабатывает передачу между потоками и освобождаетTпри уничтожении последней ссылки на нее
Mutex — обеспечивает взаимоисключающий доступ к значению T
Arc
Arc<T>предоставляет общий доступ только для чтения кTчерезArc::clone():
use std::sync::Arc; use std::thread; fn main() { let v = Arc::new(vec![10, 20, 30]); let mut handles = Vec::new(); for _ in 1..5 { let v = Arc::clone(&v); handles.push(thread::spawn(move || { let thread_id = thread::current().id(); println!("{thread_id:?}: {v:?}"); })); } handles.into_iter().for_each(|h| h.join().unwrap()); println!("v: {v:?}"); }
Ремарки:
-
Arcозначает Atomic Reference Counter (атомарный счетчик ссылок) и является потокобезопасной версиейRc, в которой используются атомарные операции -
Arc<T>реализуетCloneнезависимо от того, делает ли этоT. Он реализуетSendиSync, только еслиTих реализует -
Arc::clone()имеет некоторую цену за счет выполнения атомарных операций, но после этого использованиеTявляется бесплатным - остерегайтесь ссылочных циклов,
Arcне использует сборщик мусора для их обнаружения
- в этом может помочь
std::sync::Weak
- в этом может помочь
Mutex
Mutex<T>обеспечивает взаимное исключение и предоставляет мутабельный доступ кTчерез доступный только для чтения интерфейс (форма внутренней изменчивости):
use std::sync::Mutex; fn main() { let v = Mutex::new(vec![10, 20, 30]); println!("v: {:?}", v.lock().unwrap()); { let mut guard = v.lock().unwrap(); guard.push(40); } println!("v: {:?}", v.lock().unwrap()); }
Обратите внимание на неявную реализацию (
impl<T: Send> Sync for Mutex<T>)https://doc.rust-lang.org/std/sync/struct.Mutex.html#impl-Sync-for-Mutex%3CT%3E.
Ремарки:
-
MutexвRustпохож на коллекцию, состоящую из одного элемента — защищенных данных
- невозможно забыть получить (acquire) мьютекс перед доступом к защищенным данным
- из
&Mutex<T>через блокировку (lock) можно получить&mut T.MutexGuardгарантирует, что&mut Tне живет дольше удерживаемой (held) блокировки -
Mutex<T>реализуетSendиSync, только еслиTреализуетSend -
RwLockявляется блокировкой, доступной как для чтения, так и для записи - почему
lock()возвращаетResult?
- Если поток, в котором находится мьютекс, паникует, мьютекс становится "отравленным" (poisoned), сигнализируя о том, что защищенные данные могут находиться в несогласованном состоянии. Вызов
lock()на отравленном мьютексе проваливается с PoisonError. Для восстановления данных можно вызватьinto_iter()на ошибке
- Если поток, в котором находится мьютекс, паникует, мьютекс становится "отравленным" (poisoned), сигнализируя о том, что защищенные данные могут находиться в несогласованном состоянии. Вызов
Пример
Рассмотрим
ArcиMutexв действии:
use std::thread; // use std::sync::{Arc, Mutex}; fn main() { let v = vec![10, 20, 30]; let handle = thread::spawn(|| { v.push(10); }); v.push(1000); handle.join().unwrap(); println!("v: {v:?}"); }
Возможное решение:
use std::sync::{Arc, Mutex}; use std::thread; fn main() { let v = Arc::new(Mutex::new(vec![10, 20, 30])); let v2 = Arc::clone(&v); let handle = thread::spawn(move || { let mut v2 = v2.lock().unwrap(); v2.push(10); }); { let mut v = v.lock().unwrap(); v.push(1000); } handle.join().unwrap(); println!("v: {v:?}"); }
Ремарки:
-
vобернут вArcиMutex, поскольку их зоны ответственности ортогональны
- оборачивание
MutexвArcявляется распространенным паттерном для передачи мутабельного состояния между потоками
- оборачивание
-
v: Arc<_>должен быть клонирован какv2для передачи в другой поток. Обратите внимание наmoveв сигнатуре лямбды - блоки предназначены для максимального сужения области видимости
LockGuard
Упражнения
Попрактикуемся применять новые знания на двух упражнениях:
- обедающие философы — классическая задача параллелизма
- многопоточная проверка ссылок
Обедающие философы
Условия задачи:
Пять безмолвных философов сидят вокруг круглого стола, перед каждым философом стоит тарелка спагетти. На столе между каждой парой ближайших философов лежит по одной вилке.
Каждый философ может либо есть, либо размышлять. Прием пищи не ограничен количеством оставшихся спагетти — подразумевается бесконечный запас. Тем не менее, философ может есть только тогда, когда держит две вилки — взятую справа и слева.
Каждый философ может взять ближайшую вилку (если она доступна) или положить — если он уже держит ее. Взятие каждой вилки и возвращение ее на стол являются раздельными действиями, которые должны выполняться одно за другим.
Задача заключается в том, чтобы разработать модель (параллельный алгоритм), при которой ни один из философов не будет голодать, то есть будет чередовать прием пищи и размышления 100 раз.
use std::sync::{mpsc, Arc, Mutex}; use std::thread; use std::time::Duration; struct Fork; struct Philosopher { name: String, // left_fork: ... // right_fork: ... // thoughts: ... } impl Philosopher { fn think(&self) { self.thoughts .send(format!("Эврика! {} сгенерировал(а) новую идею!", &self.name)) .unwrap(); } fn eat(&self) { // Берем вилки println!("{} ест...", &self.name); thread::sleep(Duration::from_millis(10)); } } static PHILOSOPHERS: &[&str] = &["Сократ", "Гипатия", "Платон", "Аристотель", "Пифагор"]; fn main() { // Создаем вилки // Создаем философов // Каждый философ размышляет и ест 100 раз // Выводим размышления философов }
Подсказка: рассмотрите возможность использования std::mem::swap для решения проблемы взаимной блокировки (deadlock).
Решениеuse std::sync::{mpsc, Arc, Mutex}; use std::thread; use std::time::Duration; struct Fork; struct Philosopher { name: String, left_fork: Arc<Mutex<Fork>>, right_fork: Arc<Mutex<Fork>>, thoughts: mpsc::SyncSender<String>, } impl Philosopher { fn think(&self) { self.thoughts .send(format!("Эврика! {} сгенерировал(а) новую идею!", &self.name)) .unwrap(); } fn eat(&self) { println!("{} пытается есть", &self.name); let _left = self.left_fork.lock().unwrap(); let _right = self.right_fork.lock().unwrap(); println!("{} ест...", &self.name); thread::sleep(Duration::from_millis(10)); } } static PHILOSOPHERS: &[&str] = &["Сократ", "Гипатия", "Платон", "Аристотель", "Пифагор"]; fn main() { let (tx, rx) = mpsc::sync_channel(10); let forks = (0..PHILOSOPHERS.len()) .map(|_| Arc::new(Mutex::new(Fork))) .collect::<Vec<_>>(); for i in 0..forks.len() { let tx = tx.clone(); let mut left_fork = Arc::clone(&forks[i]); let mut right_fork = Arc::clone(&forks[(i + 1) % forks.len()]); // Во избежание взаимной блокировки нам необходимо где-то нарушить симметрию. // Меняем вилки местами без их повторной инициализации if i == forks.len() - 1 { std::mem::swap(&mut left_fork, &mut right_fork); } let philosopher = Philosopher { name: PHILOSOPHERS[i].to_string(), thoughts: tx, left_fork, right_fork, }; thread::spawn(move || { for _ in 0..100 { philosopher.eat(); philosopher.think(); } }); } drop(tx); for thought in rx { println!("{thought}"); } }
Многопоточная проверка ссылок
Создадим инструмент для многопоточной проверки ссылок. Он должен начинать с основной веб-страницы и проверять корректность ссылок на ней. Затем он должен рекурсивно проверять другие страницы в том же домене и продолжать делать это до тех пор, пока все страницы не будут проверены.
Для создания такого инструмента вам потребуется какой-нибудь клиент
HTTP, например, reqwest:
cargo add reqwest --features blocking,rustls-tls
Для обнаружения ссылок можно воспользоваться scraper:
cargo add scraper
Наконец, для обработки ошибок пригодится thiserror:
cargo add thiserror
Cargo.toml:
[package] name = "link-checker" version = "0.1.0" edition = "2021" publish = false [dependencies] reqwest = { version = "0.11.12", features = ["blocking", "rustls-tls"] } scraper = "0.13.0" thiserror = "1.0.37"
Начните с небольшого сайта, такого как
https://www.google.org.
src/main.rs:
use reqwest::blocking::Client; use reqwest::Url; use scraper::{Html, Selector}; use thiserror::Error; #[derive(Error, Debug)] enum Error { #[error("ошибка запроса: {0}")] ReqwestError(#[from] reqwest::Error), #[error("плохой ответ HTTP: {0}")] BadResponse(String), } #[derive(Debug)] struct CrawlCommand { url: Url, extract_links: bool, } fn visit_page(client: &Client, command: &CrawlCommand) -> Result<Vec<Url>, Error> { println!("проверка {:#}", command.url); let response = client.get(command.url.clone()).send()?; if !response.status().is_success() { return Err(Error::BadResponse(response.status().to_string())); } let mut link_urls = Vec::new(); if !command.extract_links { return Ok(link_urls); } let base_url = response.url().to_owned(); let body_text = response.text()?; let document = Html::parse_document(&body_text); let selector = Selector::parse("a").unwrap(); let href_values = document .select(&selector) .filter_map(|element| element.value().attr("href")); for href in href_values { match base_url.join(href) { Ok(link_url) => { link_urls.push(link_url); } Err(err) => { println!("в {base_url:#} не поддается разбору {href:?}: {err}"); } } } Ok(link_urls) } fn main() { let client = Client::new(); let start_url = Url::parse("https://www.google.org").unwrap(); let crawl_command = CrawlCommand{ url: start_url, extract_links: true }; match visit_page(&client, &crawl_command) { Ok(links) => println!("ссылки: {links:#?}"), Err(err) => println!("невозможно извлечь ссылки: {err:#}"), } }
Задачи:
- используйте потоки для параллельной проверки ссылок: отправьте проверяемые URL-адреса в канал и позвольте нескольким потокам проверять URL-адреса параллельно
- реализуйте рекурсивное извлечение ссылок со всех страниц домена
www.google.org. Установите верхний предел в 100 страниц или около того, чтобы сайт вас не заблокировал
Решениеuse std::sync::{mpsc, Arc, Mutex}; use std::thread; use std::collections::HashSet; use reqwest::blocking::Client; use reqwest::Url; use scraper::{Html, Selector}; use thiserror::Error; #[derive(Error, Debug)] enum Error { #[error("ошибка запроса: {0}")] ReqwestError(#[from] reqwest::Error), #[error("плохой ответ HTTP: {0}")] BadResponse(String), } #[derive(Debug)] struct CrawlCommand { url: Url, extract_links: bool, } fn visit_page(client: &Client, command: &CrawlCommand) -> Result<Vec<Url>, Error> { println!("проверка {:#}", command.url); let response = client.get(command.url.clone()).send()?; if !response.status().is_success() { return Err(Error::BadResponse(response.status().to_string())); } let mut link_urls = Vec::new(); if !command.extract_links { return Ok(link_urls); } let base_url = response.url().to_owned(); let body_text = response.text()?; let document = Html::parse_document(&body_text); let selector = Selector::parse("a").unwrap(); let href_values = document .select(&selector) .filter_map(|element| element.value().attr("href")); for href in href_values { match base_url.join(href) { Ok(link_url) => { link_urls.push(link_url); } Err(err) => { println!("в {base_url:#} не поддается разбору {href:?}: {err}"); } } } Ok(link_urls) } struct CrawlState { domain: String, visited_pages: HashSet<String>, } impl CrawlState { fn new(start_url: &Url) -> CrawlState { let mut visited_pages = HashSet::new(); visited_pages.insert(start_url.as_str().to_string()); CrawlState { domain: start_url.domain().unwrap().to_string(), visited_pages } } /// Определяет, должны ли извлекаться ссылки на указанной странице fn should_extract_links(&self, url: &Url) -> bool { let Some(url_domain) = url.domain() else { return false; }; url_domain == self.domain } /// Помечает указанную страницу как посещенную, /// возвращает `false`, если страница уже посещалась fn mark_visited(&mut self, url: &Url) -> bool { self.visited_pages.insert(url.as_str().to_string()) } } type CrawlResult = Result<Vec<Url>, (Url, Error)>; fn spawn_crawler_threads( command_receiver: mpsc::Receiver<CrawlCommand>, result_sender: mpsc::Sender<CrawlResult>, thread_count: u32, ) { let command_receiver = Arc::new(Mutex::new(command_receiver)); for _ in 0..thread_count { let result_sender = result_sender.clone(); let command_receiver = command_receiver.clone(); thread::spawn(move || { let client = Client::new(); loop { let command_result = { let receiver_guard = command_receiver.lock().unwrap(); receiver_guard.recv() }; let Ok(crawl_command) = command_result else { // Отправитель уничтожен, команд больше не будет break; }; let crawl_result = match visit_page(&client, &crawl_command) { Ok(link_urls) => Ok(link_urls), Err(error) => Err((crawl_command.url, error)), }; result_sender.send(crawl_result).unwrap(); } }); } } fn control_crawl( start_url: Url, command_sender: mpsc::Sender<CrawlCommand>, result_receiver: mpsc::Receiver<CrawlResult>, ) -> Vec<Url> { let mut crawl_state = CrawlState::new(&start_url); let start_command = CrawlCommand { url: start_url, extract_links: true }; command_sender.send(start_command).unwrap(); let mut pending_urls = 1; let mut bad_urls = Vec::new(); while pending_urls > 0 { let crawl_result = result_receiver.recv().unwrap(); pending_urls -= 1; match crawl_result { Ok(link_urls) => { for url in link_urls { if crawl_state.mark_visited(&url) { let extract_links = crawl_state.should_extract_links(&url); let crawl_command = CrawlCommand { url, extract_links }; command_sender.send(crawl_command).unwrap(); pending_urls += 1; } } } Err((url, error)) => { bad_urls.push(url); println!("при извлечении ссылок возникла ошибка: {:#}", error); continue; } } } bad_urls } fn check_links(start_url: Url) -> Vec<Url> { let (result_sender, result_receiver) = mpsc::channel::<CrawlResult>(); let (command_sender, command_receiver) = mpsc::channel::<CrawlCommand>(); spawn_crawler_threads(command_receiver, result_sender, 16); control_crawl(start_url, command_sender, result_receiver) } fn main() { let start_url = reqwest::Url::parse("https://www.google.org").unwrap(); let bad_urls = check_links(start_url); println!("плохие URL: {:#?}", bad_urls); }
Асинхронный Rust
"Асинхронность" (async) — это модель параллелизма, в которой несколько задач выполняются одновременно. Каждая задача выполняется до тех пор, пока не завершится или не заблокируется, затем выполняется следующая (готовая к выполнению) задача и т.д. Такая модель позволяет выполнять большое количество задач с помощью небольшого числа потоков. Это связано с тем, что накладные расходы на выполнение каждой задачи обычно очень низкие, а операционные системы предоставляют примитивы для эффективного переключения между задачами.
Асинхронные операции
Rustоснованы на фьючерах (futures, от "future" — будущее), представляющих собой работу, которая может быть завершена в будущем. Фьючеры "опрашиваются" (polled) до тех пор, пока не сообщат о завершении.
Фьючеры опрашиваются асинхронной средой выполнения (async runtime). Доступно несколько таких сред. Одной из самых популярных является Tokio.
Сравнения:
- в
Pythonиспользуется похожая модель вasyncio. Однако, его типFutureоснован на функциях обратного вызова (callbacks), а не на опросах. Асинхронные программыPythonдолжны выполняться в цикле, как и асинхронные программыRust -
PromiseвJavaScriptпохож на фьючер, но также основан на колбэках. Среды выполнения реализует цикл событий (event loop), поэтому многие детали разрешения промиса являются скрытыми
Основы асинхронности
async/await
На высоком уровне асинхронный код
Rustвыглядит очень похоже на обычный синхронный код:
use futures::executor::block_on; async fn count_to(count: i32) { for i in 1..=count { println!("Значение счетчика: {i}!"); } } async fn async_main(count: i32) { count_to(count).await; } fn main() { block_on(async_main(10)); }
Ремарки:
- это упрощенный пример для демонстрации синтаксиса. В нем отсутствуют долгие операции или параллелизм
- какой тип возвращает асинхронная функция?
- используйте
let future: () = async_main(10);вmain()и посмотрите
- используйте
- ключевое слово
async— это синтаксический сахар. Компилятор заменяет возвращаемый тип фьючером - мы не можем сделать функцию
mainасинхронной без предоставления компилятору дополнительных инструкций о том, как использовать возвращаемый фьючер - для запуска асинхронного кода требуется исполнитель (executor).
block_on()блокирует текущий поток до завершения фьючера -
.awaitасинхронно ждет завершения другой операции. В отличие отblock_on(),awaitне блокирует текущий поток -
awaitможет использоваться только внутри асинхронной функции (или блока, о чем мы поговорим позже)
Фьючеры
Future — это трейт, реализуемый объектами, представляющими операцию, которая пока не может быть завершена. Фьючеры могут опрашиваться,
poll()возвращает Poll.
use std::pin::Pin; use std::task::Context; pub trait Future { type Output; fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>; } pub enum Poll<T> { Ready(T), Pending, }
Асинхронная функция возвращает
impl Future. Также возможно (но редко применяется) реализоватьFutureдля собственных типов. Например,JoinHandle, возвращаемыйtokio::spawnреализуетFuture, что позволяет присоединяться (join) к нему.
Ключевое слово
awaitприостанавливает выполнение асинхронной функции, пока фьючер не будет готов.
Ремарки:
- типы
FutureиPollреализованы в точности, как показано - мы не будем рассматривать
PinиContext, поскольку не будет создавать новые асинхронные примитивы. Коротко:
-
Contextпозволяет фьючеру планировать повторный опрос при возникновении события -
Pinгарантирует, что фьючер не перемещается в памяти, поэтому ссылки в этом фьючере остаются валидными. Это необходимо, чтобы ссылки оставались валидными послеawait
-
Среда выполнения
Среда выполнения (runtime) предоставляет поддержку для асинхронного выполнения операций (reactor) и отвечает за выполнение фьючеров (executor).
Rustне имеет "встроенной" среды выполнения, но доступно несколько вариантов:
-
Tokio — производительный, с хорошим набором инструментов, таких как Hyper для
HTTPили Tonic дляgRPC -
async-std — стремится быть "стандартной библиотекой для асинхронного кода" и включает стандартную среду выполнения в
async::task - smol — простой и легковесный
Несколько больших приложений имеют собственные среды выполнения. Одним из таких приложений является Fuchsia.
Фьючеры являются "инертными" в том смысле, что они ничего не делают, пока не будут опрошены исполнителем. Это отличается от промисов
JS, например, которые запускаются, даже если никогда не используются.
Tokio
Tokioпредоставляет:
- многопоточную среду выполнения для выполнения асинхронного кода
- асинхронную версию стандартной библиотеки
- большую экосистему библиотек
use tokio::time; async fn count_to(count: i32) { for i in 1..=count { println!("Значение счетчика в задаче: {i}!"); time::sleep(time::Duration::from_millis(5)).await; } } #[tokio::main] async fn main() { tokio::spawn(count_to(10)); for i in 1..5 { println!("Значение счетчика в основной задаче: {i}"); time::sleep(time::Duration::from_millis(5)).await; } }
Ремарки:
- макрос
tokio::mainпозволяет делать функциюmainасинхронной - функция
spawnсоздает новую параллельную "задачу" - обратите внимание:
spawnпринимаетFuture, мы не вызывает.awaitнаcount_to() - почему
count_to()обычно не доходит до 10? Это пример отмены асинхронной операции.tokio::spawn()возвращает обработчик (handle), который может заставить поток ждать его завершения - попробуйте заменить
tokio::spawnнаcount_to(10).await - попробуйте добавить ожидание завершения задачи, возвращаемой
tokio::spawn()
Задачи
Rustимеет систему задач (task system), которая является формой легковесного трейдинга (threading).
Задача имеет один верхнеуровневый фьючер, который опрашивается исполнителем. Этот фьючер может иметь несколько вложенных фьючеров, которые он опрашивает методом
poll, что приблизительно соответствует стеку вызовов (call stack). Параллелизм внутри задачи возможен путем опроса нескольких дочерних фьючеров, например, запуск таймера и выполнение операции ввода-вывода.
use tokio::io::{self, AsyncReadExt, AsyncWriteExt}; use tokio::net::TcpListener; #[tokio::main] async fn main() -> io::Result<()> { let listener = TcpListener::bind("127.0.0.1:0").await?; println!("запросы принимаются на порту {}", listener.local_addr()?.port()); loop { let (mut socket, addr) = listener.accept().await?; println!("запрос из {addr:?}"); tokio::spawn(async move { socket.write_all(b"Кто ты?\n").await.expect("ошибка сокета"); let mut buf = vec![0; 1024]; let name_size = socket.read(&mut buf).await.expect("ошибка сокета"); let name = std::str::from_utf8(&buf[..name_size]).unwrap().trim(); let reply = format!("Привет, {name}!\n"); socket.write_all(reply.as_bytes()).await.expect("ошибка сокета"); }); } }
- Перед нами
asyncблок. Такие блоки похожи на замыкания, но не принимают параметров. Их возвращаемым значением является фьючер, как уasync fn - переделайте асинхронный блок в функцию и улучшите обработку ошибок с помощью оператора
?
Асинхронные каналы
Некоторые крейты поддерживают асинхронные каналы, например,
tokio:
use tokio::sync::mpsc::{self, Receiver}; async fn ping_handler(mut input: Receiver<()>) { let mut count: usize = 0; while let Some(_) = input.recv().await { count += 1; println!("получено {count} пингов"); } println!("ping_handler завершен"); } #[tokio::main] async fn main() { let (sender, receiver) = mpsc::channel(32); let ping_handler_task = tokio::spawn(ping_handler(receiver)); for i in 0..10 { sender.send(()).await.expect("провал отправки пинга"); println!("отправлено {} пингов", i + 1); } drop(sender); ping_handler_task.await.expect("что-то пошло не так"); }
- измените размер канала на
3и посмотрите, как это повлияет на выполнение - интерфейс асинхронных каналов аналогичен интерфейсу
syncканалов, о которых мы говорили ранее - попробуйте удалить
std::mem::drop. Что произойдет? Почему? - крейт Flume предоставляет каналы, которые реализуют как
sync, так иasync send, иrecv. Это может быть полезным для сложных приложений с задачами обработки ввода-вывода и тяжелыми для ЦП задачами -
asyncканалы могут быть использованы вместе с другимиfutureдля создания сложного потока управления (control flow)
Поток управления фьючеров
Фьючеры могут объединяться вместе для создания графов потоков параллельных вычислений. Мы уже видели задачи, которые функционируют как автономные потоки выполнения.
Join
Метод
join_allждет, когда все фьючеры будут готовы, и возвращает их результаты. Это похоже наPromise.allвJavaScriptилиasyncio.gatherвPython.
use anyhow::Result; use futures::future; use reqwest; use std::collections::HashMap; async fn size_of_page(url: &str) -> Result<usize> { let resp = reqwest::get(url).await?; Ok(resp.text().await?.len()) } #[tokio::main] async fn main() { let urls: [&str; 4] = [ "https://google.com", "https://httpbin.org/ip", "https://play.rust-lang.org/", "BAD_URL", ]; let futures_iter = urls.into_iter().map(size_of_page); let results = future::join_all(futures_iter).await; let page_sizes_dict: HashMap<&str, Result<usize>> = urls.into_iter().zip(results.into_iter()).collect(); println!("{:?}", page_sizes_dict); }
Ремарки:
- для нескольких фьючеров непересекающихся типов можно использовать
std::future::join!но мы должны знать, сколько фьючеров у нас будет во время компиляции. В настоящее времяjoin_all()находится в крейтеfutures, но скоро будет стабилизирован вstd::future - риск соединения заключается в том, что какой-нибудь фьючер может никогда не разрешиться, что приведет к остановке программы
- мы можем комбинировать
join_all()сjoin!, например, чтобы объединить все запросы к службеHTTP, а также запрос к базе данных. Попробуйте добавитьtokio::time::sleep()во фьючер, используяfuture::join!. Это не таймаут (для которого требуетсяselect!, как описано в следующем разделе), а демонстрация работыjoin!
Select
Операция выбора (select) ждет готовности любого фьючера из набора и реагирует на его результат. В
JavaScriptэто похоже наPromise.race. ВPythonэто похоже наasyncio.wait(task_set, return_when=asyncio.FIRST_COMPLETED).
Подобно оператору
match, телоselect!имеет несколько ветвей (arms), каждая из которых имеет формуpattern = future => statement. Когдаfutureготов, его возвращаемое значение деструктурируетсяpattern. Затемstatementзапускается с итоговыми переменными. Результатstatementстановится результатом макросаselect!.
use tokio::sync::mpsc::{self, Receiver}; use tokio::time::{sleep, Duration}; #[derive(Debug, PartialEq)] enum Animal { Cat { name: String }, Dog { name: String }, } async fn first_animal_to_finish_race( mut cat_rcv: Receiver<String>, mut dog_rcv: Receiver<String>, ) -> Option<Animal> { tokio::select! { cat_name = cat_rcv.recv() => Some(Animal::Cat { name: cat_name? }), dog_name = dog_rcv.recv() => Some(Animal::Dog { name: dog_name? }) } } #[tokio::main] async fn main() { let (cat_sender, cat_receiver) = mpsc::channel(32); let (dog_sender, dog_receiver) = mpsc::channel(32); tokio::spawn(async move { sleep(Duration::from_millis(500)).await; cat_sender.send(String::from("Феликс")).await.expect("ошибка отправки имени кота"); }); tokio::spawn(async move { sleep(Duration::from_millis(50)).await; dog_sender.send(String::from("Рекс")).await.expect("ошибка отправки имени собаки"); }); let winner = first_animal_to_finish_race(cat_receiver, dog_receiver) .await .expect("ошибка получения победителя"); println!("Победителем является {winner:?}"); }
Ремарки:
- в примере у нас имеется гонка между кошкой и собакой.
first_animal_to_finish_race()"слушает" (listening) оба канала и возвращает первый по времени результат. Поскольку имя собаки прибывает через 50 мс, собака выигрывает у кошки, имя которой прибывает через 500 мс - в примере вместо
channelможно использоватьoneshot, поскольку предполагается однократный вызов методаsend - попробуйте добавить к гонке дедлайн, демонстрируя выбор разных фьючеров
- обратите внимание, что
select!уничтожает не совпавшие ветви, что отменяет их фьючеры.select!легче всего использовать, когда каждое выполнение этого макроса создает новые фьючеры
- альтернативой является передача
&mut futureвместо самого фьючера, но это может привести к проблемам, о котором мы поговорим позже
- альтернативой является передача
Ловушки async/await
async/awaitпредоставляет удобную и эффективную абстракцию для параллельного асинхронного программирования. Однако модельasync/awaitвRustтакже имеет свои подводные камни и ловушки, о которых мы поговорим в этом разделе.
Блокировка исполнителя
Большинство асинхронных сред выполнения допускают одновременное выполнение только задач ввода-вывода. Это означает, что задачи блокировки ЦП будут блокировать исполнителя (executor) и препятствовать выполнению других задач. Простой обходной путь — использовать эквивалентные асинхронные методы там, где это возможно.
use futures::future::join_all; use std::time::Instant; async fn sleep_ms(start: &Instant, id: u64, duration_ms: u64) { std::thread::sleep(std::time::Duration::from_millis(duration_ms)); println!( "фьючер {id} спал в течение {duration_ms} мс, завершился после {} мс", start.elapsed().as_millis() ); } #[tokio::main(flavor = "current_thread")] async fn main() { let start = Instant::now(); let sleep_futures = (1..=10).map(|t| sleep_ms(&start, t, t * 10)); join_all(sleep_futures).await; }
Ремарки:
- запустите код и убедитесь, что переходы в режим сна происходят последовательно, а не одновременно
-
flavor = "current_thread"помещает все задачи в один поток. Это делает эффект более очевидным, но рассмотренная ошибка присутствует и в многопоточной версии - замените
std::thread::sleepнаtokio::time::sleepи дождитесь результата - другим решением может быть
tokio::task::spawn_blocking, который порождает реальный поток и преобразует его дескриптор вfuture, не блокируя исполнителя - о задачах не следует думать как о потоках ОС. Они не совпадают 1 к 1, и большинство исполнителей позволяют выполнять множество задач в одном потоке ОС. Это особенно проблематично при взаимодействии с другими библиотеками через
FFI, где эта библиотека может зависеть от локального хранилища потоков или сопоставляться с конкретными потоками ОС (например,CUDA). В таких ситуациях отдавайте предпочтениеtokio::task::spawn_blocking - используйте синхронные мьютексы осторожно. Удержание мьютекса над
.awaitможет привести к блокировке другой задачи, которая может выполняться в том же потоке
Pin
Асинхронные блоки и функции возвращают типы, реализующие трейт
Future. Возвращаемый тип является результатом преобразования компилятора, который превращает локальные переменные в данные, хранящиеся внутри фьючера.
Некоторые из этих переменных могут содержать указатели на другие локальные переменные. По этой причине фьючеры никогда не должны перемещаться в другую ячейку памяти, поскольку это сделает такие указатели недействительными.
Чтобы предотвратить перемещение фьючера в памяти, его можно опрашивать только через закрепленный (pinned) указатель.
Pin— это оболочка ссылки, которая запрещает все операции, которые могли бы переместить экземпляр, на который она указывает, в другую ячейку памяти.
use tokio::sync::{mpsc, oneshot}; use tokio::task::spawn; use tokio::time::{sleep, Duration}; // Рабочая единица. В данном случае она просто спит в течение определенного времени // и отвечает сообщением в канал `respond_on` #[derive(Debug)] struct Work { input: u32, respond_on: oneshot::Sender<u32>, } // Воркер, который ищет работу в очереди (queue) и выполняет ее async fn worker(mut work_queue: mpsc::Receiver<Work>) { let mut iterations = 0; loop { tokio::select! { Some(work) = work_queue.recv() => { sleep(Duration::from_millis(10)).await; // выполняем "работу" work.respond_on .send(work.input * 1000) .expect("провал отправки ответа"); iterations += 1; } // TODO: сообщать о количестве итераций каждый 100 мс } } } // "Запрашиватель" (requester), который запрашивает работу и ждет ее выполнения async fn do_work(work_queue: &mpsc::Sender<Work>, input: u32) -> u32 { let (tx, rx) = oneshot::channel(); work_queue .send(Work { input, respond_on: tx }) .await .expect("провал отправки работы в очередь"); rx.await.expect("провал ожидания ответа") } #[tokio::main] async fn main() { let (tx, rx) = mpsc::channel(10); spawn(worker(rx)); for i in 0..100 { let resp = do_work(&tx, i).await; println!("результат работы для итерации {i}: {resp}"); } }
Ремарки:
- в примере вы могли распознать шаблон актора (actor pattern). Акторы, как правило, вызывают
select!в цикле - это обобщение нескольких предыдущих уроков, так что не торопитесь
- добавьте
_ = sleep(Duration::from_millis(100)) => { println!(..) }вselect!. Это никогда не выполнится. Почему? - теперь добавьте
timeout_fut, содержащий этот фьючер за пределамиloop:
- добавьте
let mut timeout_fut = sleep(Duration::from_millis(100)); loop { select! { .., _ = timeout_fut => { println!(..); }, } }
- это также не будет работать. Изучите ошибки компилятора, добавьте
&mutвtimeout_futвselect!для решения проблемы перемещения, затем используйтеBox::pin:
let mut timeout_fut = Box::pin(sleep(Duration::from_millis(100))); loop { select! { .., _ = &mut timeout_fut => { println!(..); }, } }
- это компилируется, но по истечении тайм-аута на каждой итерации происходит
Poll::Ready(для решения этой проблемы может помочь объединенный фьючер). Обновите код, чтобы сбрасыватьtimeout_futкаждый раз, когда он истекает
-
Boxвыделяет память в куче. В некоторых случаяхstd::pin::pin!— это тоже вариант, но его сложно использовать для фьючера, которой переназначается - другая альтернатива — вообще не использовать
pin, а создать другую задачу, которая будет отправляться в каналoneshotкаждые 100 мс - данные, содержащие указатели на себя, называются самоссылающимися (self-referential). Обычно средство проверки заимствований (borrow checker) в
Rustпредотвращает перемещение таких данных, поскольку ссылки не могут жить дольше данных, на которые они указывают. Однако преобразование кода для асинхронных блоков и функций не проверяется средством проверки заимствований -
Pin— это обертка над ссылкой. Объект не может перемещаться с помощью закрепленного указателя. Однако он может перемещаться с помощью незакрепленного указателя - метод
pollтрейтаFutureиспользуетPin<&mut Self>вместо&mut Selfдля ссылки на экземпляр. Вот почему он может вызываться только на закрепленном указателе
-
Асинхронные трейты
Асинхронные методы в трейтах пока не поддерживаются в стабильной версии
Rust.
Крейт async_trait предоставляет макрос для решения этой задачи:
use async_trait::async_trait; use std::time::Instant; use tokio::time::{sleep, Duration}; #[async_trait] trait Sleeper { async fn sleep(&self); } struct FixedSleeper { sleep_ms: u64, } #[async_trait] impl Sleeper for FixedSleeper { async fn sleep(&self) { sleep(Duration::from_millis(self.sleep_ms)).await; } } async fn run_all_sleepers_multiple_times( sleepers: Vec<Box<dyn Sleeper>>, n_times: usize, ) { for _ in 0..n_times { println!("running all sleepers.."); for sleeper in &sleepers { let start = Instant::now(); sleeper.sleep().await; println!("slept for {}ms", start.elapsed().as_millis()); } } } #[tokio::main] async fn main() { let sleepers: Vec<Box<dyn Sleeper>> = vec![ Box::new(FixedSleeper { sleep_ms: 50 }), Box::new(FixedSleeper { sleep_ms: 100 }), ]; run_all_sleepers_multiple_times(sleepers, 5).await; }
Ремарки:
-
async_traitпрост в использовании, но учтите, что для работы он использует выделение памяти в куче. Это влечет издержки производительности - попробуйте создать новую "спящую" структуру, которая будет спать случайное время, и добавить ее в вектор
Отмена
Удаление фьючера означает, что его больше никогда нельзя будет опросить. Это называется отменой (cancellation) и может произойти в любой момент ожидания. Необходимо позаботиться о том, чтобы система работала правильно даже в случае отмены фьючера. Например, он не должен блокироваться или терять данные.
use std::io::{self, ErrorKind}; use std::time::Duration; use tokio::io::{AsyncReadExt, AsyncWriteExt, DuplexStream}; struct LinesReader { stream: DuplexStream, } impl LinesReader { fn new(stream: DuplexStream) -> Self { Self { stream } } async fn next(&mut self) -> io::Result<Option<String>> { let mut bytes = Vec::new(); let mut buf = [0]; while self.stream.read(&mut buf[..]).await? != 0 { bytes.push(buf[0]); if buf[0] == b'\n' { break; } } if bytes.is_empty() { return Ok(None); } let s = String::from_utf8(bytes) .map_err(|_| io::Error::new(ErrorKind::InvalidData, "не UTF-8"))?; Ok(Some(s)) } } async fn slow_copy(source: String, mut dest: DuplexStream) -> std::io::Result<()> { for b in source.bytes() { dest.write_u8(b).await?; tokio::time::sleep(Duration::from_millis(10)).await } Ok(()) } #[tokio::main] async fn main() -> std::io::Result<()> { let (client, server) = tokio::io::duplex(5); let handle = tokio::spawn(slow_copy("привет\nпривет\n".to_owned(), client)); let mut lines = LinesReader::new(server); let mut interval = tokio::time::interval(Duration::from_millis(60)); loop { tokio::select! { _ = interval.tick() => println!("тик!"), line = lines.next() => if let Some(l) = line? { print!("{}", l) } else { break }, } } handle.await.unwrap()?; Ok(()) }
Ремарки:
- компилятор не помогает с обеспечением безопасности отмены. Необходимо читать документацию API и понимать, каким состоянием владеет ваша
async fn - в отличие от
panic!и?, отмена — это часть нормального управления потоком выполнения (а не обработка ошибок) - в примере теряется часть строки
- если ветвь
tick()выполняется первой,next()и егоbufуничтожаются -
LinesReaderможно сделать безопасным для отмены путем включенияbufв структуру:
- если ветвь
struct LinesReader { stream: DuplexStream, bytes: Vec<u8>, buf: [u8; 1], } impl LinesReader { fn new(stream: DuplexStream) -> Self { Self { stream, bytes: Vec::new(), buf: [0] } } async fn next(&mut self) -> io::Result<Option<String>> { // ... let raw = std::mem::take(&mut self.bytes); let s = String::from_utf8(raw) // ... } }
- Interval::tick безопасен для отмены, поскольку он отслеживает, был ли "доставлен" (delivered) тик
- AsyncReadExt::read безопасен для отмены, поскольку он либо возвращается, либо не читает данные
- AsyncBufReadExt::read_line, как и пример, не является безопасным для отмены. Подробности и альтернативы см. в документации
Упражнения
Для тренировки навыков работы с асинхронным
Rust, есть еще два упражнения:
- обедающие философы — на этот раз вам нужно решить эту задачу с помощью асинхронного
Rust - приложение для чата
Обедающие философы
use std::sync::Arc; use tokio::sync::mpsc::{self, Sender}; use tokio::sync::Mutex; use tokio::time; struct Fork; struct Philosopher { name: String, // left_fork: ... // right_fork: ... // thoughts: ... } impl Philosopher { async fn think(&self) { self.thoughts .send(format!("Эврика! {} сгенерировал(а) новую идею!", &self.name)) .await .unwrap(); } async fn eat(&self) { // Пытаемся до тех пор, пока не получим обе вилки println!("{} ест...", &self.name); time::sleep(time::Duration::from_millis(5)).await; } } static PHILOSOPHERS: &[&str] = &["Сократ", "Гипатия", "Платон", "Аристотель", "Пифагор"]; #[tokio::main] async fn main() { // Создаем вилки // Создаем философов // Каждый философ размышляет и ест 100 раз // Выводим размышления философов }
Для работы с асинхронным
Rustрекомендуется использоватьtokio:
[package] name = "dining-philosophers-async" version = "0.1.0" edition = "2021" [dependencies] tokio = { version = "1.26.0", features = ["sync", "time", "macros", "rt-multi-thread"] }
Подсказка: на этот раз вам придется использовать
Mutexи модульmpscизtokio.
Решениеuse std::sync::Arc; use tokio::sync::mpsc::{self, Sender}; use tokio::sync::Mutex; use tokio::time; struct Fork; struct Philosopher { name: String, left_fork: Arc<Mutex<Fork>>, right_fork: Arc<Mutex<Fork>>, thoughts: Sender<String>, } impl Philosopher { async fn think(&self) { self.thoughts .send(format!("Эврика! {} сгенерировал(а) новую идею!", &self.name)) .await .unwrap(); } async fn eat(&self) { // Пытаемся до тех пор, пока не получим обе вилки let (_left_fork, _right_fork) = loop { // Берем вилки... let left_fork = self.left_fork.try_lock(); let right_fork = self.right_fork.try_lock(); let Ok(left_fork) = left_fork else { // Если мы не получили левую вилку, удаляем правую вилку, // если она у нас была, позволяя выполняться другим задачам drop(right_fork); time::sleep(time::Duration::from_millis(1)).await; continue; }; let Ok(right_fork) = right_fork else { // Если мы не получили правую вилку, удаляем левую вилку, // если она у нас была, позволяя выполняться другим задачам drop(left_fork); time::sleep(time::Duration::from_millis(1)).await; continue; }; break (left_fork, right_fork); }; println!("{} ест...", &self.name); time::sleep(time::Duration::from_millis(5)).await; // Блокировки уничтожаются здесь } } static PHILOSOPHERS: &[&str] = &["Сократ", "Гипатия", "Платон", "Аристотель", "Пифагор"]; #[tokio::main] async fn main() { // Создаем вилки let mut forks = vec![]; (0..PHILOSOPHERS.len()).for_each(|_| forks.push(Arc::new(Mutex::new(Fork)))); // Создаем философов let (philosophers, mut rx) = { let mut philosophers = vec![]; let (tx, rx) = mpsc::channel(10); for (i, name) in PHILOSOPHERS.iter().enumerate() { let left_fork = Arc::clone(&forks[i]); let right_fork = Arc::clone(&forks[(i + 1) % PHILOSOPHERS.len()]); philosophers.push(Philosopher { name: name.to_string(), left_fork, right_fork, thoughts: tx.clone(), }); } (philosophers, rx) // `tx` уничтожается здесь, поэтому нам не нужно явно удалять его позже }; // Каждый философ думает и ест 100 раз for phil in philosophers { tokio::spawn(async move { for _ in 0..100 { phil.think().await; phil.eat().await; } }); } // Выводим размышления философов while let Some(thought) = rx.recv().await { println!("{thought}"); } }
Чат
В этом упражнении мы используем новые знания для разработки приложения чата. У нас есть сервер, к которому подключаются клиенты и в котором они публикуют свои сообщения. Клиент читает пользовательские сообщения через стандартный ввод и отправляет их на сервер. Сервер передает (broadcast) сообщение всем клиентам.
Для реализации этого функционала мы будем использовать широковещательный канал на сервере и tokio_websockets для взаимодействия между клиентом и сервером.
Создайте новый проект и добавьте следующие зависимости в
Cargo.toml:
[package] name = "chat-async" version = "0.1.0" edition = "2021" [dependencies] futures-util = { version = "0.3.30", features = ["sink"] } http = "1.0.0" tokio = { version = "1.28.1", features = ["full"] } tokio-websockets = { version = "0.5.1", features = ["client", "fastrand", "server", "sha1_smol"] }
Необходимые API
Вам потребуются следующие функции из
tokioиtokio_websockets. Потратьте несколько минут для ознакомления со следующими API:
-
StreamExt::next(), реализуемый
WebSocketStream— для асинхронного чтения сообщений из потока веб-сокетов -
SinkExt::send(), реализуемый
WebSocketStream— для асинхронной отправки сообщений в поток веб-сокетов - Lines::next_line() — для асинхронного чтения сообщений пользователя через стандартный ввод
- Sender::subscribe() — для подписки на широковещательный канал
Два бинарника
Как правило, в проекте может быть только один исполняемый файл (binary) и один файл
src/main.rs. Нам требуется два бинарника. Один для клиента и еще один для сервера. Теоретически их можно сделать двумя отдельными проектами, но мы поместим оба бинарника в один проект. Для того, чтобы это работало, клиент и сервер должны находиться в директорииsrc/bin(см. документацию).
Скопируйте следующий код сервера и клиента в
src/bin/server.rsиsrc/bin/client.rs, соответственно.
// src/bin/server.rs use futures_util::sink::SinkExt; use futures_util::stream::StreamExt; use std::error::Error; use std::net::SocketAddr; use tokio::net::{TcpListener, TcpStream}; use tokio::sync::broadcast::{channel, Sender}; use tokio_websockets::{Message, ServerBuilder, WebSocketStream}; async fn handle_connection( addr: SocketAddr, mut ws_stream: WebSocketStream<TcpStream>, bcast_tx: Sender<String>, ) -> Result<(), Box<dyn Error + Send + Sync>> { todo!("реализуй меня") } #[tokio::main] async fn main() -> Result<(), Box<dyn Error + Send + Sync>> { let (bcast_tx, _) = channel(16); let listener = TcpListener::bind("127.0.0.1:2000").await?; println!("Запросы принимаются на порту 2000"); loop { let (socket, addr) = listener.accept().await?; println!("Запрос от {addr:?}"); let bcast_tx = bcast_tx.clone(); tokio::spawn(async move { // Оборачиваем сырой поток TCP в веб-сокет let ws_stream = ServerBuilder::new().accept(socket).await?; handle_connection(addr, ws_stream, bcast_tx).await }); } }
// src/bin/client.rs use futures_util::stream::StreamExt; use futures_util::SinkExt; use http::Uri; use tokio::io::{AsyncBufReadExt, BufReader}; use tokio_websockets::{ClientBuilder, Message}; #[tokio::main] async fn main() -> Result<(), tokio_websockets::Error> { let (mut ws_stream, _) = ClientBuilder::from_uri(Uri::from_static("ws://127.0.0.1:2000")) .connect() .await?; let stdin = tokio::io::stdin(); let mut stdin = BufReader::new(stdin).lines(); todo!("реализуй меня") }
Запуск бинарников
Команда для запуска сервера:
cargo run --bin server
Команда для запуска клиента:
cargo run --bin client
Задачи
- реализовать функцию
handle_connectionвsrc/bin/server.rs
- подсказка: используйте
tokio::select!для параллельного выполнения двух задач в бесконечном цикле. Одна задача получает сообщения от клиента и передает их другим клиентам. Другая — отправляет клиенту сообщения, полученные от сервера
- подсказка: используйте
- завершите функцию
mainвsrc/bin/client.rs
- подсказка: также используйте
tokio::select!в бесконечном цикле для параллельного выполнения двух задач: 1) чтение сообщений пользователя из стандартного ввода и их отправка серверу; 2) получение сообщений от сервера и их отображение
- подсказка: также используйте
- опционально: измените код для передачи сообщений всем клиентам, кроме отправителя
Решение// src/bin/server.rs use futures_util::sink::SinkExt; use futures_util::stream::StreamExt; use std::error::Error; use std::net::SocketAddr; use tokio::net::{TcpListener, TcpStream}; use tokio::sync::broadcast::{channel, Sender}; use tokio_websockets::{Message, ServerBuilder, WebSocketStream}; async fn handle_connection( addr: SocketAddr, mut ws_stream: WebSocketStream<TcpStream>, bcast_tx: Sender<String>, ) -> Result<(), Box<dyn Error + Send + Sync>> { ws_stream .send(Message::text("Добро пожаловать в чат! Отправьте сообщение".to_string())) .await?; let mut bcast_rx = bcast_tx.subscribe(); // Бесконечный цикл для параллельного выполнения двух задач: // 1) получение сообщений из `ws_stream` и их передача клиентам // 2) получение сообщений в `bcast_rx` и их отправка клиенту loop { tokio::select! { incoming = ws_stream.next() => { match incoming { Some(Ok(msg)) => { if let Some(text) = msg.as_text() { println!("{addr:?}: {text:?}"); bcast_tx.send(text.into())?; } } Some(Err(err)) => return Err(err.into()), None => return Ok(()), } } msg = bcast_rx.recv() => { ws_stream.send(Message::text(msg?)).await?; } } } } #[tokio::main] async fn main() -> Result<(), Box<dyn Error + Send + Sync>> { let (bcast_tx, _) = channel(16); let listener = TcpListener::bind("127.0.0.1:2000").await?; println!("Запросы принимаются на порту 2000"); loop { let (socket, addr) = listener.accept().await?; println!("Запрос от {addr:?}"); let bcast_tx = bcast_tx.clone(); tokio::spawn(async move { // Оборачиваем сырой поток TCP в веб-сокет let ws_stream = ServerBuilder::new().accept(socket).await?; handle_connection(addr, ws_stream, bcast_tx).await }); } }
// src/bin/client.rs use futures_util::stream::StreamExt; use futures_util::SinkExt; use http::Uri; use tokio::io::{AsyncBufReadExt, BufReader}; use tokio_websockets::{ClientBuilder, Message}; #[tokio::main] async fn main() -> Result<(), tokio_websockets::Error> { let (mut ws_stream, _) = ClientBuilder::from_uri(Uri::from_static("ws://127.0.0.1:2000")) .connect() .await?; let stdin = tokio::io::stdin(); let mut stdin = BufReader::new(stdin).lines(); // Бесконечный цикл для параллельной отправки и получения сообщений loop { tokio::select! { incoming = ws_stream.next() => { match incoming { Some(Ok(msg)) => { if let Some(text) = msg.as_text() { println!("От сервера: {}", text); } }, Some(Err(err)) => return Err(err.into()), None => return Ok(()), } } res = stdin.next_line() => { match res { Ok(None) => return Ok(()), Ok(Some(line)) => ws_stream.send(Message::text(line.to_string())).await?, Err(err) => return Err(err.into()), } } } } }
Это конец бонусной части руководства.
Материалы для более глубокого изучения рассмотренных тем:
- Книга/учебник по Rust (на русском языке) — главы 16 и 20
- rustlings — упражнение 20
- Rust на примерах (на русском языке) — примеры 20.1 и 20.2
Happy coding!
-
Комментарии (10)

domix32
28.03.2024 10:41+4фьючерса
Фьючерсы - это термин из финансов. В расте они фьючи или футуры aka Future<T>.

hrls
28.03.2024 10:41+1вызов
send()блокирует текущий поток до тех пор, пока в канале имеется место для новых сообщенийРазве не наоборот?
Unbounded в случае очередей будет логичнее назвать безразмерной.

Sabirman
Как в Rust-е сделать объектную динамически-подключаемую библиотеку.
На Java, C#, Python и т.п. объекты стандартизированы. Т.е. ты в своем классе реализуешь какой-то интерфейс и подключаешь свой плагин к основной программе.
В Rust-е, я так понимаю, можно реализовывать только Си-шные функции. И вся прелесть раста при этом теряется.
domix32
Если хочется сделать доступ через C ABI, то делаем тип либы dylib, выключаем манглинг функций для нашего интерфейса и приводим свой API к сишным сигнатурам. Собственный отдельный ржавый FFI уже много лет в разработке, но и работы там непочатый край ибо никто не осилил пока оформить какой-то пропозал, удовлетворяющий эту хотелку. В некотором экспериментальном виде для WASM можно использовать interface types, так что в теории если ваш wasm рантайм их поддерживает, то можно делать через него подключаться.
В остальном не очень понятно какие ещё вам варианты нужны? Общение между языками традиционно происходит через C ABI происходит, хоть в python, хоть в go, хоть в java и пока что лучше ничего не придумали. Есть отдельные крейты, позволяющие работать с этими языками на уровне объектов этих языков - например, pyo3 для питона или robusta для JNI.
Sabirman
Попробую по другому объяснить. Есть внешняя, динамически-подключаемая, библиотека, которая тоже написана на Rust. Но, несмотря на то, что она написана на Rust-е, я должен вызывать её методы в Unsafe-блоках. В Java, C#, Python и т.д. в аналогичной ситуации внешний вызов не отличается от вызова внутренней функции и соблюдаются все защиты, заложенные в языке.
А большие приложения никогда не идут одним исполняемым файлом, т.е. Rust не готов к использованию в больших приложениях.
Или я ошибаюсь?
domix32
Судя по докам и обсуждениям динамические либы доступны только через С API, поэтому да, даже если она на Rust, она будет считаться за либу на другом языке и необходимы врапперы поверх unsafe кода. WASM+WASI модули в теории могут позволить избегать этого, но принципиально ситуация не меняется.
Не вижу принципиальной разницы, если б всё писалось на том же C/С++. Оно буквально работает "как у всех". С тем же успехом можно сказать, что и С/С++ не готовы к использованию в больших приложениях.
эти языки по-умолчанию имеют некоторый рантайм с некоторым окружением и пространством имён, в который они могут импортировать внешние модули. Они не имеют гарантий безопасности аналогичных Rust и фактически превращаются в каст void* к сигнатуре, которую вы представили. В Rust вам необходимо представить какие-то гарантии относительно загружаемого модуля компилятору, т.к. нет никакого рантайма, поэтому да - unsafe и врапперы - ваш вариант. Благо их кажется можно генерировать автоматически через макросы.
sdramare
Они не имеют гарантий безопасности аналогичных Rust и фактически превращаются в каст void* к сигнатуре, которую вы представили.Я думаю человек выше больше подразумевал что конечные бинарники Java/C# содержат в себе метаинформацию(рефлексию) о всех структурах/классах и методах внутри, так что их можно динамически подключать с гарантией что нужный класс реализует заданный интерфейс. В расте же при компиляции никакой метаинформации вроде того какие трейты для какой структуры реализованы не сохраняется, по-этому динамическая загрузка библиотеки всегда будет unsafe.
domix32
не совсем так. оно в том или ином виде будет ссылаться на некоторые виртуальные таблицы, которые можно отрефлексировать при желании, но из-за того язык компилируемый гарантии интерфейса намертво зашиваются в бинарь. У managed языков есть рантайм, который позволяет его расширять и работать с ним как будто рабочее окружение было таким всегда.
redfox0
https://stackoverflow.com/a/75903553
Другое дело, что у раста ABI пока ещё (?) не стабилизировано, так что динамические библиотеки и код придётся собирать одинаковой версией компилятора.
Но проблемы с unsafe-блоками не вижу, если у вас есть исходники библиотеки.
DarkEld3r
abi_stable