Hello world!


Представляю вашему вниманию бонусную часть практического руководства по Rust.



Другой формат, который может показаться вам более удобным.


Руководство основано на Comprehensive Rust — руководстве по Rust от команды Android в Google и рассчитано на людей, которые уверенно владеют любым современным языком программирования. Еще раз: это руководство не рассчитано на тех, кто только начинает кодить ?


В этой части мы поговорим о параллельном (concurrent) и асинхронном (async) Rust.


Материалы для более глубокого изучения названных тем:



Также см. Большую шпаргалку по Rust.


Параллельный Rust


Rust полностью поддерживает параллелизм (concurrency) с использованием потоков ОС с мьютексами (mutexes) и каналами (channels).


Система типов Rust играет важную роль в том, что многие ошибки параллелизма становятся ошибками времени компиляции. Это часто называют бесстрашным параллелизмом (fearless concurrency), поскольку мы можем положиться на компилятор, который обеспечивает правильную обработку параллелизма во время выполнения.


Потоки


Потоки (threads) Rust работают аналогично потокам в других языках:


use std::thread;
use std::time::Duration;

fn main() {
    thread::spawn(|| {
        for i in 1..10 {
            println!("значение счетчика в выделенном потоке: {i}!");
            thread::sleep(Duration::from_millis(5));
        }
    });

    for i in 1..5 {
        println!("значение счетчика в основном потоке: {i}");
        thread::sleep(Duration::from_millis(5));
    }
}

  • потоки являются потоками демона (daemon threads), основной поток не ждет их выполнения
  • потоки паникуют независимо друг от друга
    • паника может содержать полезную нагрузку (payload), которую можно извлечь с помощью downcast_ref

Ремарки:


  • обратите внимание, что основной поток не ждет выполнения выделенных (spawned) потоков
  • для ожидания выполнения потока следует использовать let handle = thread::spawn() и затем handle.join()
  • вызовите панику в потоке. Обратите внимание, что это не влияет на main
  • используйте Result, возвращаемый из handle.join(), для доступа к полезной нагрузке паники. В этом может помочь Any

Потоки с ограниченной областью видимости


Обычные потоки не могут заимствовать значения из окружения:


use std::thread;

fn foo() {
    let s = String::from("привет");
    thread::spawn(|| {
        println!("длина: {}", s.len());
    });
}

fn main() {
    foo();
}

Однако для этого можно использовать scoped threads (потоки с ограниченной областью видимости):


use std::thread;

fn main() {
    let s = String::from("привет");

    thread::scope(|scope| {
        scope.spawn(|| {
            println!("длина: {}", s.len());
        });
    });
}

Ремарки:


  • когда функция thread::scope завершается, все потоки гарантированно объединяются, поэтому они могут вернуть заимствованные данные
  • применяются обычные правила заимствования Rust: мы можем заимствовать значение мутабельно в одном потоке, или иммутабельно в любом количестве потоков

Каналы


Каналы (channels) Rust состоят из двух частей: Sender<T> (отправитель/передатчик) и Receiver<T> (получатель/приемник). Они соединяются с помощью канала, но мы видим только конечные точки:


use std::sync::mpsc;

fn main() {
    let (tx, rx) = mpsc::channel();

    tx.send(10).unwrap();
    tx.send(20).unwrap();

    println!("Получено: {:?}", rx.recv());
    println!("Получено: {:?}", rx.recv());

    let tx2 = tx.clone();
    tx2.send(30).unwrap();
    println!("Получено: {:?}", rx.recv());
}

  • mpsc означает Multi-Producer, Single-Consumer (несколько производителей, один потребитель). Sender и SyncSender реализуют Clone (поэтому мы можем создать несколько производителей), а Receiver не реализует (поэтому у нас может быть только один потребитель)
  • send() и recv() возвращают Result. Если они возвращают Err, значит соответствующий Sender или Receiver уничтожен (dropped) и канал закрыт

Несвязанные каналы


mpsc::channel() возвращает несвязанный (unbounded) и асинхронный канал:


use std::sync::mpsc;
use std::thread;
use std::time::Duration;

fn main() {
    let (tx, rx) = mpsc::channel();

    thread::spawn(move || {
        let thread_id = thread::current().id();
        for i in 1..10 {
            tx.send(format!("Сообщение {i}")).unwrap();
            println!("{thread_id:?}: отправил сообщение {i}");
        }
        println!("{thread_id:?}: готово");
    });
    thread::sleep(Duration::from_millis(100));

    for msg in rx.iter() {
        println!("Основной поток: получено {msg}");
    }
}

Связанные каналы


send() связанного (bounded) синхронного канала блокирует текущий поток:


use std::sync::mpsc;
use std::thread;
use std::time::Duration;

fn main() {
    let (tx, rx) = mpsc::sync_channel(3);

    thread::spawn(move || {
        let thread_id = thread::current().id();
        for i in 1..10 {
            tx.send(format!("Сообщение {i}")).unwrap();
            println!("{thread_id:?}: отправил сообщение {i}");
        }
        println!("{thread_id:?}: готово");
    });
    thread::sleep(Duration::from_millis(100));

    for msg in rx.iter() {
        println!("Основной поток: получено {msg}");
    }
}

Ремарки:


  • вызов send() блокирует текущий поток до тех пор, пока в канале имеется место для новых сообщений. Поток может блокироваться бесконечно, если отсутствует получатель
  • вызов send() заканчивается ошибкой (поэтому возвращается Result), если канал закрыт. Канал закрывается после уничтожения получателя
  • связанный канал с нулевым размером называется "rendezvous channel". Каждый вызов send() блокирует текущий поток, пока другой поток не вызовет read()

Send и Sync


Откуда Rust знает о необходимости запрета доступа к общему (shared) состоянию из нескольких потоков? Ответ кроется в двух трейтах:


  • Send: тип T является Send, если передача T в другой поток является безопасной
  • Sync: тип T является Sync, если передача &T в другой поток является безопасной

Send и Sync являются небезопасными трейтами. Компилятор автоматически реализует их для наших типов при условии, что они содержат только типы Send и Sync. Эти типы можно реализовать самостоятельно, если мы уверены в валидности наших типов.


Ремарки:


  • о типах Send и Sync можно думать как о маркерах того, что тип содержит несколько безопасных с точки зрения потоков (thread-safety) свойств
  • эти типы могут использоваться в качестве общих ограничений по аналогии с обычными трейтами

Send


Тип T является Send, если передача значения T в другой поток является безопасной.


Эффект перемещения владения в другой поток заключается в том, что деструкторы T запускаются в этом новом потоке. Поэтому вопрос состоит в том, когда мы можем выделить значение в одном потоке и освободить его в другом потоке.


Например, подключение к SQLite доступно только в одном потоке.


Sync


Тип T является Sync, если одновременный доступ к значению T из нескольких потоков является безопасным.


Если быть более точным, определение гласит следующее: T является Sync, если и только если &T является Send.


Это означает, что если тип является "потокобезопасным" (thread-safe) для совместного использования, он также потокобезопасен для передачи ссылок между потоками.


Если тип является Sync, его можно использовать в нескольких потоках без риска возникновения гонок за данными (data race) и других проблем с синхронизацией, поэтому его можно безопасно перемещать в другой поток. Ссылка на тип также может безопасно перемещаться в другой поток, поскольку доступ к данным, на которые она ссылается, из любого потока является безопасным.


Примеры


Send + Sync


Большинство типов является Send + Sync:


  • i8, f32, bool, char, &str etc.
  • (T1, T2), [T; N], &[T], struct { x: T } etc.
  • String, Option<T>, Vec<T>, Box<T> etc.
  • Arc<T> — явно потокобезопасный благодаря атомарному подсчету ссылок
  • Mutex<T> — явно потокобезопасный благодаря внутренней блокировке
  • AtomicBool, AtomicU8 etc., где используются специальные атомарные инструкции

Дженерики обычно являются Send + Sync, когда таковыми являются параметры типов.


Send + !Sync


Эти типы могут перемещаться в другие потоки, но не являются потокобезопасными. Обычно это связано с их внутренней изменчивостью:


  • mpsc::Sender<T>
  • mpsc::Receiver<T>
  • Cell<T>
  • RefCell<T>

!Send + Sync


Эти типы являются потокобезопасными, но не могут перемещаться в другие потоки:


  • MutexGuard<T: Sync> — использует примитивы уровня ОС, которые должны быть освобождены в создавшем их потоке

!Send + !Sync


Эти типы не являются потокобезопасными и не могут перемещаться в другой поток:


  • Rc<T> — каждый Rc<T> содержит ссылку на RcBox<T>, который содержит неатомарный счетчик ссылок
  • *const T, *mut TRust предполагает, что сырые указатели могут иметь нюансы, связанные с их параллельным выполнением

Общее состояние


Rust использует систему типов для обеспечения синхронизации общих (shared) данных. Это делается в основном с помощью двух типов:


  • Arc — атомарный счетчик ссылок на T: обрабатывает передачу между потоками и освобождает T при уничтожении последней ссылки на нее
    Mutex — обеспечивает взаимоисключающий доступ к значению T

    Arc


    Arc<T> предоставляет общий доступ только для чтения к T через Arc::clone():


    use std::sync::Arc;
    use std::thread;
    
    fn main() {
        let v = Arc::new(vec![10, 20, 30]);
        let mut handles = Vec::new();
        for _ in 1..5 {
            let v = Arc::clone(&v);
            handles.push(thread::spawn(move || {
                let thread_id = thread::current().id();
                println!("{thread_id:?}: {v:?}");
            }));
        }
    
        handles.into_iter().for_each(|h| h.join().unwrap());
        println!("v: {v:?}");
    }

    Ремарки:


    • Arc означает Atomic Reference Counter (атомарный счетчик ссылок) и является потокобезопасной версией Rc, в которой используются атомарные операции
    • Arc<T> реализует Clone независимо от того, делает ли это T. Он реализует Send и Sync, только если T их реализует
    • Arc::clone() имеет некоторую цену за счет выполнения атомарных операций, но после этого использование T является бесплатным
    • остерегайтесь ссылочных циклов, Arc не использует сборщик мусора для их обнаружения
      • в этом может помочь std::sync::Weak

    Mutex


    Mutex<T> обеспечивает взаимное исключение и предоставляет мутабельный доступ к T через доступный только для чтения интерфейс (форма внутренней изменчивости):


    use std::sync::Mutex;
    
    fn main() {
        let v = Mutex::new(vec![10, 20, 30]);
        println!("v: {:?}", v.lock().unwrap());
    
        {
            let mut guard = v.lock().unwrap();
            guard.push(40);
        }
    
        println!("v: {:?}", v.lock().unwrap());
    }

    Обратите внимание на неявную реализацию (impl<T: Send> Sync for Mutex<T>)https://doc.rust-lang.org/std/sync/struct.Mutex.html#impl-Sync-for-Mutex%3CT%3E.


    Ремарки:


    • Mutex в Rust похож на коллекцию, состоящую из одного элемента — защищенных данных
      • невозможно забыть получить (acquire) мьютекс перед доступом к защищенным данным
    • из &Mutex<T> через блокировку (lock) можно получить &mut T. MutexGuard гарантирует, что &mut T не живет дольше удерживаемой (held) блокировки
    • Mutex<T> реализует Send и Sync, только если T реализует Send
    • RwLock является блокировкой, доступной как для чтения, так и для записи
    • почему lock() возвращает Result?
      • Если поток, в котором находится мьютекс, паникует, мьютекс становится "отравленным" (poisoned), сигнализируя о том, что защищенные данные могут находиться в несогласованном состоянии. Вызов lock() на отравленном мьютексе проваливается с PoisonError. Для восстановления данных можно вызвать into_iter() на ошибке

    Пример


    Рассмотрим Arc и Mutex в действии:


    use std::thread;
    // use std::sync::{Arc, Mutex};
    
    fn main() {
        let v = vec![10, 20, 30];
        let handle = thread::spawn(|| {
            v.push(10);
        });
        v.push(1000);
    
        handle.join().unwrap();
        println!("v: {v:?}");
    }

    Возможное решение:


    use std::sync::{Arc, Mutex};
    use std::thread;
    
    fn main() {
        let v = Arc::new(Mutex::new(vec![10, 20, 30]));
    
        let v2 = Arc::clone(&v);
        let handle = thread::spawn(move || {
            let mut v2 = v2.lock().unwrap();
            v2.push(10);
        });
    
        {
            let mut v = v.lock().unwrap();
            v.push(1000);
        }
    
        handle.join().unwrap();
    
        println!("v: {v:?}");
    }

    Ремарки:


    • v обернут в Arc и Mutex, поскольку их зоны ответственности ортогональны
      • оборачивание Mutex в Arc является распространенным паттерном для передачи мутабельного состояния между потоками
    • v: Arc<_> должен быть клонирован как v2 для передачи в другой поток. Обратите внимание на move в сигнатуре лямбды
    • блоки предназначены для максимального сужения области видимости LockGuard

    Упражнения


    Попрактикуемся применять новые знания на двух упражнениях:



    Обедающие философы


    Условия задачи:


    Пять безмолвных философов сидят вокруг круглого стола, перед каждым философом стоит тарелка спагетти. На столе между каждой парой ближайших философов лежит по одной вилке.


    Каждый философ может либо есть, либо размышлять. Прием пищи не ограничен количеством оставшихся спагетти — подразумевается бесконечный запас. Тем не менее, философ может есть только тогда, когда держит две вилки — взятую справа и слева.


    Каждый философ может взять ближайшую вилку (если она доступна) или положить — если он уже держит ее. Взятие каждой вилки и возвращение ее на стол являются раздельными действиями, которые должны выполняться одно за другим.


    Задача заключается в том, чтобы разработать модель (параллельный алгоритм), при которой ни один из философов не будет голодать, то есть будет чередовать прием пищи и размышления 100 раз.


    use std::sync::{mpsc, Arc, Mutex};
    use std::thread;
    use std::time::Duration;
    
    struct Fork;
    
    struct Philosopher {
        name: String,
        // left_fork: ...
        // right_fork: ...
        // thoughts: ...
    }
    
    impl Philosopher {
        fn think(&self) {
            self.thoughts
                .send(format!("Эврика! {} сгенерировал(а) новую идею!", &self.name))
                .unwrap();
        }
    
        fn eat(&self) {
            // Берем вилки
            println!("{} ест...", &self.name);
            thread::sleep(Duration::from_millis(10));
        }
    }
    
    static PHILOSOPHERS: &[&str] =
        &["Сократ", "Гипатия", "Платон", "Аристотель", "Пифагор"];
    
    fn main() {
        // Создаем вилки
    
        // Создаем философов
    
        // Каждый философ размышляет и ест 100 раз
    
        // Выводим размышления философов
    }

    Подсказка: рассмотрите возможность использования std::mem::swap для решения проблемы взаимной блокировки (deadlock).


    Решение
    use std::sync::{mpsc, Arc, Mutex};
    use std::thread;
    use std::time::Duration;
    
    struct Fork;
    
    struct Philosopher {
        name: String,
        left_fork: Arc<Mutex<Fork>>,
        right_fork: Arc<Mutex<Fork>>,
        thoughts: mpsc::SyncSender<String>,
    }
    
    impl Philosopher {
        fn think(&self) {
            self.thoughts
                .send(format!("Эврика! {} сгенерировал(а) новую идею!", &self.name))
                .unwrap();
        }
    
        fn eat(&self) {
            println!("{} пытается есть", &self.name);
    
            let _left = self.left_fork.lock().unwrap();
            let _right = self.right_fork.lock().unwrap();
    
            println!("{} ест...", &self.name);
    
            thread::sleep(Duration::from_millis(10));
        }
    }
    
    static PHILOSOPHERS: &[&str] =
        &["Сократ", "Гипатия", "Платон", "Аристотель", "Пифагор"];
    
    fn main() {
        let (tx, rx) = mpsc::sync_channel(10);
    
        let forks = (0..PHILOSOPHERS.len())
            .map(|_| Arc::new(Mutex::new(Fork)))
            .collect::<Vec<_>>();
    
        for i in 0..forks.len() {
            let tx = tx.clone();
    
            let mut left_fork = Arc::clone(&forks[i]);
            let mut right_fork = Arc::clone(&forks[(i + 1) % forks.len()]);
    
            // Во избежание взаимной блокировки нам необходимо где-то нарушить симметрию.
            // Меняем вилки местами без их повторной инициализации
            if i == forks.len() - 1 {
                std::mem::swap(&mut left_fork, &mut right_fork);
            }
    
            let philosopher = Philosopher {
                name: PHILOSOPHERS[i].to_string(),
                thoughts: tx,
                left_fork,
                right_fork,
            };
    
            thread::spawn(move || {
                for _ in 0..100 {
                    philosopher.eat();
                    philosopher.think();
                }
            });
        }
    
        drop(tx);
    
        for thought in rx {
            println!("{thought}");
        }
    }

    Многопоточная проверка ссылок


    Создадим инструмент для многопоточной проверки ссылок. Он должен начинать с основной веб-страницы и проверять корректность ссылок на ней. Затем он должен рекурсивно проверять другие страницы в том же домене и продолжать делать это до тех пор, пока все страницы не будут проверены.


    Для создания такого инструмента вам потребуется какой-нибудь клиент HTTP, например, reqwest:


    cargo add reqwest --features blocking,rustls-tls

    Для обнаружения ссылок можно воспользоваться scraper:


    cargo add scraper

    Наконец, для обработки ошибок пригодится thiserror:


    cargo add thiserror

    Cargo.toml:


    [package]
    name = "link-checker"
    version = "0.1.0"
    edition = "2021"
    publish = false
    
    [dependencies]
    reqwest = { version = "0.11.12", features = ["blocking", "rustls-tls"] }
    scraper = "0.13.0"
    thiserror = "1.0.37"

    Начните с небольшого сайта, такого как https://www.google.org.


    src/main.rs:


    use reqwest::blocking::Client;
    use reqwest::Url;
    use scraper::{Html, Selector};
    use thiserror::Error;
    
    #[derive(Error, Debug)]
    enum Error {
        #[error("ошибка запроса: {0}")]
        ReqwestError(#[from] reqwest::Error),
        #[error("плохой ответ HTTP: {0}")]
        BadResponse(String),
    }
    
    #[derive(Debug)]
    struct CrawlCommand {
        url: Url,
        extract_links: bool,
    }
    
    fn visit_page(client: &Client, command: &CrawlCommand) -> Result<Vec<Url>, Error> {
        println!("проверка {:#}", command.url);
    
        let response = client.get(command.url.clone()).send()?;
    
        if !response.status().is_success() {
            return Err(Error::BadResponse(response.status().to_string()));
        }
    
        let mut link_urls = Vec::new();
    
        if !command.extract_links {
            return Ok(link_urls);
        }
    
        let base_url = response.url().to_owned();
        let body_text = response.text()?;
        let document = Html::parse_document(&body_text);
    
        let selector = Selector::parse("a").unwrap();
        let href_values = document
            .select(&selector)
            .filter_map(|element| element.value().attr("href"));
        for href in href_values {
            match base_url.join(href) {
                Ok(link_url) => {
                    link_urls.push(link_url);
                }
                Err(err) => {
                    println!("в {base_url:#} не поддается разбору {href:?}: {err}");
                }
            }
        }
        Ok(link_urls)
    }
    
    fn main() {
        let client = Client::new();
        let start_url = Url::parse("https://www.google.org").unwrap();
        let crawl_command = CrawlCommand{ url: start_url, extract_links: true };
    
        match visit_page(&client, &crawl_command) {
            Ok(links) => println!("ссылки: {links:#?}"),
            Err(err) => println!("невозможно извлечь ссылки: {err:#}"),
        }
    }

    Задачи:


    • используйте потоки для параллельной проверки ссылок: отправьте проверяемые URL-адреса в канал и позвольте нескольким потокам проверять URL-адреса параллельно
    • реализуйте рекурсивное извлечение ссылок со всех страниц домена www.google.org. Установите верхний предел в 100 страниц или около того, чтобы сайт вас не заблокировал

    Решение
    use std::sync::{mpsc, Arc, Mutex};
    use std::thread;
    use std::collections::HashSet;
    
    use reqwest::blocking::Client;
    use reqwest::Url;
    use scraper::{Html, Selector};
    use thiserror::Error;
    
    #[derive(Error, Debug)]
    enum Error {
        #[error("ошибка запроса: {0}")]
        ReqwestError(#[from] reqwest::Error),
        #[error("плохой ответ HTTP: {0}")]
        BadResponse(String),
    }
    
    #[derive(Debug)]
    struct CrawlCommand {
        url: Url,
        extract_links: bool,
    }
    
    fn visit_page(client: &Client, command: &CrawlCommand) -> Result<Vec<Url>, Error> {
        println!("проверка {:#}", command.url);
    
        let response = client.get(command.url.clone()).send()?;
    
        if !response.status().is_success() {
            return Err(Error::BadResponse(response.status().to_string()));
        }
    
        let mut link_urls = Vec::new();
    
        if !command.extract_links {
            return Ok(link_urls);
        }
    
        let base_url = response.url().to_owned();
        let body_text = response.text()?;
        let document = Html::parse_document(&body_text);
    
        let selector = Selector::parse("a").unwrap();
        let href_values = document
            .select(&selector)
            .filter_map(|element| element.value().attr("href"));
        for href in href_values {
            match base_url.join(href) {
                Ok(link_url) => {
                    link_urls.push(link_url);
                }
                Err(err) => {
                    println!("в {base_url:#} не поддается разбору {href:?}: {err}");
                }
            }
        }
        Ok(link_urls)
    }
    
    struct CrawlState {
        domain: String,
        visited_pages: HashSet<String>,
    }
    
    impl CrawlState {
        fn new(start_url: &Url) -> CrawlState {
            let mut visited_pages = HashSet::new();
    
            visited_pages.insert(start_url.as_str().to_string());
    
            CrawlState {
                domain: start_url.domain().unwrap().to_string(),
                visited_pages
            }
        }
    
        /// Определяет, должны ли извлекаться ссылки на указанной странице
        fn should_extract_links(&self, url: &Url) -> bool {
            let Some(url_domain) = url.domain() else {
                return false;
            };
            url_domain == self.domain
        }
    
        /// Помечает указанную страницу как посещенную,
        /// возвращает `false`, если страница уже посещалась
        fn mark_visited(&mut self, url: &Url) -> bool {
            self.visited_pages.insert(url.as_str().to_string())
        }
    }
    
    type CrawlResult = Result<Vec<Url>, (Url, Error)>;
    
    fn spawn_crawler_threads(
        command_receiver: mpsc::Receiver<CrawlCommand>,
        result_sender: mpsc::Sender<CrawlResult>,
        thread_count: u32,
    ) {
        let command_receiver = Arc::new(Mutex::new(command_receiver));
    
        for _ in 0..thread_count {
            let result_sender = result_sender.clone();
            let command_receiver = command_receiver.clone();
    
            thread::spawn(move || {
                let client = Client::new();
    
                loop {
                    let command_result = {
                        let receiver_guard = command_receiver.lock().unwrap();
                        receiver_guard.recv()
                    };
    
                    let Ok(crawl_command) = command_result else {
                        // Отправитель уничтожен, команд больше не будет
                        break;
                    };
    
                    let crawl_result = match visit_page(&client, &crawl_command) {
                        Ok(link_urls) => Ok(link_urls),
                        Err(error) => Err((crawl_command.url, error)),
                    };
    
                    result_sender.send(crawl_result).unwrap();
                }
            });
        }
    }
    
    fn control_crawl(
        start_url: Url,
        command_sender: mpsc::Sender<CrawlCommand>,
        result_receiver: mpsc::Receiver<CrawlResult>,
    ) -> Vec<Url> {
        let mut crawl_state = CrawlState::new(&start_url);
    
        let start_command = CrawlCommand { url: start_url, extract_links: true };
        command_sender.send(start_command).unwrap();
    
        let mut pending_urls = 1;
    
        let mut bad_urls = Vec::new();
    
        while pending_urls > 0 {
            let crawl_result = result_receiver.recv().unwrap();
            pending_urls -= 1;
    
            match crawl_result {
                Ok(link_urls) => {
                    for url in link_urls {
                        if crawl_state.mark_visited(&url) {
                            let extract_links = crawl_state.should_extract_links(&url);
                            let crawl_command = CrawlCommand { url, extract_links };
                            command_sender.send(crawl_command).unwrap();
                            pending_urls += 1;
                        }
                    }
                }
                Err((url, error)) => {
                    bad_urls.push(url);
                    println!("при извлечении ссылок возникла ошибка: {:#}", error);
                    continue;
                }
            }
        }
        bad_urls
    }
    
    fn check_links(start_url: Url) -> Vec<Url> {
        let (result_sender, result_receiver) = mpsc::channel::<CrawlResult>();
        let (command_sender, command_receiver) = mpsc::channel::<CrawlCommand>();
        spawn_crawler_threads(command_receiver, result_sender, 16);
        control_crawl(start_url, command_sender, result_receiver)
    }
    
    fn main() {
        let start_url = reqwest::Url::parse("https://www.google.org").unwrap();
        let bad_urls = check_links(start_url);
        println!("плохие URL: {:#?}", bad_urls);
    }

    Асинхронный Rust


    "Асинхронность" (async) — это модель параллелизма, в которой несколько задач выполняются одновременно. Каждая задача выполняется до тех пор, пока не завершится или не заблокируется, затем выполняется следующая (готовая к выполнению) задача и т.д. Такая модель позволяет выполнять большое количество задач с помощью небольшого числа потоков. Это связано с тем, что накладные расходы на выполнение каждой задачи обычно очень низкие, а операционные системы предоставляют примитивы для эффективного переключения между задачами.


    Асинхронные операции Rust основаны на фьючерах (futures, от "future" — будущее), представляющих собой работу, которая может быть завершена в будущем. Фьючеры "опрашиваются" (polled) до тех пор, пока не сообщат о завершении.


    Фьючеры опрашиваются асинхронной средой выполнения (async runtime). Доступно несколько таких сред. Одной из самых популярных является Tokio.


    Сравнения:


    • в Python используется похожая модель в asyncio. Однако, его тип Future основан на функциях обратного вызова (callbacks), а не на опросах. Асинхронные программы Python должны выполняться в цикле, как и асинхронные программы Rust
    • Promise в JavaScript похож на фьючер, но также основан на колбэках. Среды выполнения реализует цикл событий (event loop), поэтому многие детали разрешения промиса являются скрытыми

    Основы асинхронности


    async/await


    На высоком уровне асинхронный код Rust выглядит очень похоже на обычный синхронный код:


    use futures::executor::block_on;
    
    async fn count_to(count: i32) {
        for i in 1..=count {
            println!("Значение счетчика: {i}!");
        }
    }
    
    async fn async_main(count: i32) {
        count_to(count).await;
    }
    
    fn main() {
        block_on(async_main(10));
    }

    Ремарки:


    • это упрощенный пример для демонстрации синтаксиса. В нем отсутствуют долгие операции или параллелизм
    • какой тип возвращает асинхронная функция?
      • используйте let future: () = async_main(10); в main() и посмотрите
    • ключевое слово async — это синтаксический сахар. Компилятор заменяет возвращаемый тип фьючером
    • мы не можем сделать функцию main асинхронной без предоставления компилятору дополнительных инструкций о том, как использовать возвращаемый фьючер
    • для запуска асинхронного кода требуется исполнитель (executor). block_on() блокирует текущий поток до завершения фьючера
    • .await асинхронно ждет завершения другой операции. В отличие от block_on(), await не блокирует текущий поток
    • await может использоваться только внутри асинхронной функции (или блока, о чем мы поговорим позже)

    Фьючеры


    Future — это трейт, реализуемый объектами, представляющими операцию, которая пока не может быть завершена. Фьючеры могут опрашиваться, poll() возвращает Poll.


    use std::pin::Pin;
    use std::task::Context;
    
    pub trait Future {
        type Output;
    
        fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>;
    }
    
    pub enum Poll<T> {
        Ready(T),
        Pending,
    }

    Асинхронная функция возвращает impl Future. Также возможно (но редко применяется) реализовать Future для собственных типов. Например, JoinHandle, возвращаемый tokio::spawn реализует Future, что позволяет присоединяться (join) к нему.


    Ключевое слово await приостанавливает выполнение асинхронной функции, пока фьючер не будет готов.


    Ремарки:


    • типы Future и Poll реализованы в точности, как показано
    • мы не будем рассматривать Pin и Context, поскольку не будет создавать новые асинхронные примитивы. Коротко:
      • Context позволяет фьючеру планировать повторный опрос при возникновении события
      • Pin гарантирует, что фьючер не перемещается в памяти, поэтому ссылки в этом фьючере остаются валидными. Это необходимо, чтобы ссылки оставались валидными после await

    Среда выполнения


    Среда выполнения (runtime) предоставляет поддержку для асинхронного выполнения операций (reactor) и отвечает за выполнение фьючеров (executor). Rust не имеет "встроенной" среды выполнения, но доступно несколько вариантов:


    • Tokio — производительный, с хорошим набором инструментов, таких как Hyper для HTTP или Tonic для gRPC
    • async-std — стремится быть "стандартной библиотекой для асинхронного кода" и включает стандартную среду выполнения в async::task
    • smol — простой и легковесный

    Несколько больших приложений имеют собственные среды выполнения. Одним из таких приложений является Fuchsia.


    Фьючеры являются "инертными" в том смысле, что они ничего не делают, пока не будут опрошены исполнителем. Это отличается от промисов JS, например, которые запускаются, даже если никогда не используются.


    Tokio


    Tokio предоставляет:


    • многопоточную среду выполнения для выполнения асинхронного кода
    • асинхронную версию стандартной библиотеки
    • большую экосистему библиотек

    use tokio::time;
    
    async fn count_to(count: i32) {
        for i in 1..=count {
            println!("Значение счетчика в задаче: {i}!");
            time::sleep(time::Duration::from_millis(5)).await;
        }
    }
    
    #[tokio::main]
    async fn main() {
        tokio::spawn(count_to(10));
    
        for i in 1..5 {
            println!("Значение счетчика в основной задаче: {i}");
            time::sleep(time::Duration::from_millis(5)).await;
        }
    }

    Ремарки:


    • макрос tokio::main позволяет делать функцию main асинхронной
    • функция spawn создает новую параллельную "задачу"
    • обратите внимание: spawn принимает Future, мы не вызывает .await на count_to()
    • почему count_to() обычно не доходит до 10? Это пример отмены асинхронной операции. tokio::spawn() возвращает обработчик (handle), который может заставить поток ждать его завершения
    • попробуйте заменить tokio::spawn на count_to(10).await
    • попробуйте добавить ожидание завершения задачи, возвращаемой tokio::spawn()

    Задачи


    Rust имеет систему задач (task system), которая является формой легковесного трейдинга (threading).


    Задача имеет один верхнеуровневый фьючер, который опрашивается исполнителем. Этот фьючер может иметь несколько вложенных фьючеров, которые он опрашивает методом poll, что приблизительно соответствует стеку вызовов (call stack). Параллелизм внутри задачи возможен путем опроса нескольких дочерних фьючеров, например, запуск таймера и выполнение операции ввода-вывода.


    use tokio::io::{self, AsyncReadExt, AsyncWriteExt};
    use tokio::net::TcpListener;
    
    #[tokio::main]
    async fn main() -> io::Result<()> {
        let listener = TcpListener::bind("127.0.0.1:0").await?;
        println!("запросы принимаются на порту {}", listener.local_addr()?.port());
    
        loop {
            let (mut socket, addr) = listener.accept().await?;
    
            println!("запрос из {addr:?}");
    
            tokio::spawn(async move {
                socket.write_all(b"Кто ты?\n").await.expect("ошибка сокета");
    
                let mut buf = vec![0; 1024];
                let name_size = socket.read(&mut buf).await.expect("ошибка сокета");
                let name = std::str::from_utf8(&buf[..name_size]).unwrap().trim();
                let reply = format!("Привет, {name}!\n");
                socket.write_all(reply.as_bytes()).await.expect("ошибка сокета");
            });
        }
    }

    • Перед нами async блок. Такие блоки похожи на замыкания, но не принимают параметров. Их возвращаемым значением является фьючер, как у async fn
    • переделайте асинхронный блок в функцию и улучшите обработку ошибок с помощью оператора ?

    Асинхронные каналы


    Некоторые крейты поддерживают асинхронные каналы, например, tokio:


    use tokio::sync::mpsc::{self, Receiver};
    
    async fn ping_handler(mut input: Receiver<()>) {
        let mut count: usize = 0;
    
        while let Some(_) = input.recv().await {
            count += 1;
            println!("получено {count} пингов");
        }
    
        println!("ping_handler завершен");
    }
    
    #[tokio::main]
    async fn main() {
        let (sender, receiver) = mpsc::channel(32);
        let ping_handler_task = tokio::spawn(ping_handler(receiver));
        for i in 0..10 {
            sender.send(()).await.expect("провал отправки пинга");
            println!("отправлено {} пингов", i + 1);
        }
    
        drop(sender);
    
        ping_handler_task.await.expect("что-то пошло не так");
    }

    • измените размер канала на 3 и посмотрите, как это повлияет на выполнение
    • интерфейс асинхронных каналов аналогичен интерфейсу sync каналов, о которых мы говорили ранее
    • попробуйте удалить std::mem::drop. Что произойдет? Почему?
    • крейт Flume предоставляет каналы, которые реализуют как sync, так и async send, и recv. Это может быть полезным для сложных приложений с задачами обработки ввода-вывода и тяжелыми для ЦП задачами
    • async каналы могут быть использованы вместе с другими future для создания сложного потока управления (control flow)

    Поток управления фьючеров


    Фьючеры могут объединяться вместе для создания графов потоков параллельных вычислений. Мы уже видели задачи, которые функционируют как автономные потоки выполнения.


    Join


    Метод join_all ждет, когда все фьючеры будут готовы, и возвращает их результаты. Это похоже на Promise.all в JavaScript или asyncio.gather в Python.


    use anyhow::Result;
    use futures::future;
    use reqwest;
    use std::collections::HashMap;
    
    async fn size_of_page(url: &str) -> Result<usize> {
        let resp = reqwest::get(url).await?;
        Ok(resp.text().await?.len())
    }
    
    #[tokio::main]
    async fn main() {
        let urls: [&str; 4] = [
            "https://google.com",
            "https://httpbin.org/ip",
            "https://play.rust-lang.org/",
            "BAD_URL",
        ];
        let futures_iter = urls.into_iter().map(size_of_page);
        let results = future::join_all(futures_iter).await;
        let page_sizes_dict: HashMap<&str, Result<usize>> =
            urls.into_iter().zip(results.into_iter()).collect();
        println!("{:?}", page_sizes_dict);
    }

    Ремарки:


    • для нескольких фьючеров непересекающихся типов можно использовать std::future::join! но мы должны знать, сколько фьючеров у нас будет во время компиляции. В настоящее время join_all() находится в крейте futures, но скоро будет стабилизирован в std::future
    • риск соединения заключается в том, что какой-нибудь фьючер может никогда не разрешиться, что приведет к остановке программы
    • мы можем комбинировать join_all() с join!, например, чтобы объединить все запросы к службе HTTP, а также запрос к базе данных. Попробуйте добавить tokio::time::sleep() во фьючер, используя future::join!. Это не таймаут (для которого требуется select!, как описано в следующем разделе), а демонстрация работы join!

    Select


    Операция выбора (select) ждет готовности любого фьючера из набора и реагирует на его результат. В JavaScript это похоже на Promise.race. В Python это похоже на asyncio.wait(task_set, return_when=asyncio.FIRST_COMPLETED).


    Подобно оператору match, тело select! имеет несколько ветвей (arms), каждая из которых имеет форму pattern = future => statement. Когда future готов, его возвращаемое значение деструктурируется pattern. Затем statement запускается с итоговыми переменными. Результат statement становится результатом макроса select!.


    use tokio::sync::mpsc::{self, Receiver};
    use tokio::time::{sleep, Duration};
    
    #[derive(Debug, PartialEq)]
    enum Animal {
        Cat { name: String },
        Dog { name: String },
    }
    
    async fn first_animal_to_finish_race(
        mut cat_rcv: Receiver<String>,
        mut dog_rcv: Receiver<String>,
    ) -> Option<Animal> {
        tokio::select! {
            cat_name = cat_rcv.recv() => Some(Animal::Cat { name: cat_name? }),
            dog_name = dog_rcv.recv() => Some(Animal::Dog { name: dog_name? })
        }
    }
    
    #[tokio::main]
    async fn main() {
        let (cat_sender, cat_receiver) = mpsc::channel(32);
        let (dog_sender, dog_receiver) = mpsc::channel(32);
    
        tokio::spawn(async move {
            sleep(Duration::from_millis(500)).await;
            cat_sender.send(String::from("Феликс")).await.expect("ошибка отправки имени кота");
        });
    
        tokio::spawn(async move {
            sleep(Duration::from_millis(50)).await;
            dog_sender.send(String::from("Рекс")).await.expect("ошибка отправки имени собаки");
        });
    
        let winner = first_animal_to_finish_race(cat_receiver, dog_receiver)
            .await
            .expect("ошибка получения победителя");
    
        println!("Победителем является {winner:?}");
    }

    Ремарки:


    • в примере у нас имеется гонка между кошкой и собакой. first_animal_to_finish_race() "слушает" (listening) оба канала и возвращает первый по времени результат. Поскольку имя собаки прибывает через 50 мс, собака выигрывает у кошки, имя которой прибывает через 500 мс
    • в примере вместо channel можно использовать oneshot, поскольку предполагается однократный вызов метода send
    • попробуйте добавить к гонке дедлайн, демонстрируя выбор разных фьючеров
    • обратите внимание, что select! уничтожает не совпавшие ветви, что отменяет их фьючеры. select! легче всего использовать, когда каждое выполнение этого макроса создает новые фьючеры
      • альтернативой является передача &mut future вместо самого фьючера, но это может привести к проблемам, о котором мы поговорим позже

    Ловушки async/await


    async/await предоставляет удобную и эффективную абстракцию для параллельного асинхронного программирования. Однако модель async/await в Rust также имеет свои подводные камни и ловушки, о которых мы поговорим в этом разделе.


    Блокировка исполнителя


    Большинство асинхронных сред выполнения допускают одновременное выполнение только задач ввода-вывода. Это означает, что задачи блокировки ЦП будут блокировать исполнителя (executor) и препятствовать выполнению других задач. Простой обходной путь — использовать эквивалентные асинхронные методы там, где это возможно.


    use futures::future::join_all;
    use std::time::Instant;
    
    async fn sleep_ms(start: &Instant, id: u64, duration_ms: u64) {
        std::thread::sleep(std::time::Duration::from_millis(duration_ms));
        println!(
            "фьючер {id} спал в течение {duration_ms} мс, завершился после {} мс",
            start.elapsed().as_millis()
        );
    }
    
    #[tokio::main(flavor = "current_thread")]
    async fn main() {
        let start = Instant::now();
        let sleep_futures = (1..=10).map(|t| sleep_ms(&start, t, t * 10));
        join_all(sleep_futures).await;
    }

    Ремарки:


    • запустите код и убедитесь, что переходы в режим сна происходят последовательно, а не одновременно
    • flavor = "current_thread" помещает все задачи в один поток. Это делает эффект более очевидным, но рассмотренная ошибка присутствует и в многопоточной версии
    • замените std::thread::sleep на tokio::time::sleep и дождитесь результата
    • другим решением может быть tokio::task::spawn_blocking, который порождает реальный поток и преобразует его дескриптор в future, не блокируя исполнителя
    • о задачах не следует думать как о потоках ОС. Они не совпадают 1 к 1, и большинство исполнителей позволяют выполнять множество задач в одном потоке ОС. Это особенно проблематично при взаимодействии с другими библиотеками через FFI, где эта библиотека может зависеть от локального хранилища потоков или сопоставляться с конкретными потоками ОС (например, CUDA). В таких ситуациях отдавайте предпочтение tokio::task::spawn_blocking
    • используйте синхронные мьютексы осторожно. Удержание мьютекса над .await может привести к блокировке другой задачи, которая может выполняться в том же потоке

    Pin


    Асинхронные блоки и функции возвращают типы, реализующие трейт Future. Возвращаемый тип является результатом преобразования компилятора, который превращает локальные переменные в данные, хранящиеся внутри фьючера.


    Некоторые из этих переменных могут содержать указатели на другие локальные переменные. По этой причине фьючеры никогда не должны перемещаться в другую ячейку памяти, поскольку это сделает такие указатели недействительными.


    Чтобы предотвратить перемещение фьючера в памяти, его можно опрашивать только через закрепленный (pinned) указатель. Pin — это оболочка ссылки, которая запрещает все операции, которые могли бы переместить экземпляр, на который она указывает, в другую ячейку памяти.


    use tokio::sync::{mpsc, oneshot};
    use tokio::task::spawn;
    use tokio::time::{sleep, Duration};
    
    // Рабочая единица. В данном случае она просто спит в течение определенного времени
    // и отвечает сообщением в канал `respond_on`
    #[derive(Debug)]
    struct Work {
        input: u32,
        respond_on: oneshot::Sender<u32>,
    }
    
    // Воркер, который ищет работу в очереди (queue) и выполняет ее
    async fn worker(mut work_queue: mpsc::Receiver<Work>) {
        let mut iterations = 0;
        loop {
            tokio::select! {
                Some(work) = work_queue.recv() => {
                    sleep(Duration::from_millis(10)).await; // выполняем "работу"
                    work.respond_on
                        .send(work.input * 1000)
                        .expect("провал отправки ответа");
                    iterations += 1;
                }
                // TODO: сообщать о количестве итераций каждый 100 мс
            }
        }
    }
    
    // "Запрашиватель" (requester), который запрашивает работу и ждет ее выполнения
    async fn do_work(work_queue: &mpsc::Sender<Work>, input: u32) -> u32 {
        let (tx, rx) = oneshot::channel();
        work_queue
            .send(Work { input, respond_on: tx })
            .await
            .expect("провал отправки работы в очередь");
        rx.await.expect("провал ожидания ответа")
    }
    
    #[tokio::main]
    async fn main() {
        let (tx, rx) = mpsc::channel(10);
        spawn(worker(rx));
        for i in 0..100 {
            let resp = do_work(&tx, i).await;
            println!("результат работы для итерации {i}: {resp}");
        }
    }

    Ремарки:


    • в примере вы могли распознать шаблон актора (actor pattern). Акторы, как правило, вызывают select! в цикле
    • это обобщение нескольких предыдущих уроков, так что не торопитесь
      • добавьте _ = sleep(Duration::from_millis(100)) => { println!(..) } в select!. Это никогда не выполнится. Почему?
      • теперь добавьте timeout_fut, содержащий этот фьючер за пределами loop:

    let mut timeout_fut = sleep(Duration::from_millis(100));
    loop {
        select! {
            ..,
            _ = timeout_fut => { println!(..); },
        }
    }

    • это также не будет работать. Изучите ошибки компилятора, добавьте &mut в timeout_fut в select! для решения проблемы перемещения, затем используйте Box::pin:

    let mut timeout_fut = Box::pin(sleep(Duration::from_millis(100)));
    loop {
        select! {
            ..,
            _ = &mut timeout_fut => { println!(..); },
        }
    }

    • это компилируется, но по истечении тайм-аута на каждой итерации происходит Poll::Ready (для решения этой проблемы может помочь объединенный фьючер). Обновите код, чтобы сбрасывать timeout_fut каждый раз, когда он истекает
      • Box выделяет память в куче. В некоторых случаях std::pin::pin! — это тоже вариант, но его сложно использовать для фьючера, которой переназначается
      • другая альтернатива — вообще не использовать pin, а создать другую задачу, которая будет отправляться в канал oneshot каждые 100 мс
      • данные, содержащие указатели на себя, называются самоссылающимися (self-referential). Обычно средство проверки заимствований (borrow checker) в Rust предотвращает перемещение таких данных, поскольку ссылки не могут жить дольше данных, на которые они указывают. Однако преобразование кода для асинхронных блоков и функций не проверяется средством проверки заимствований
      • Pin — это обертка над ссылкой. Объект не может перемещаться с помощью закрепленного указателя. Однако он может перемещаться с помощью незакрепленного указателя
      • метод poll трейта Future использует Pin<&mut Self> вместо &mut Self для ссылки на экземпляр. Вот почему он может вызываться только на закрепленном указателе

    Асинхронные трейты


    Асинхронные методы в трейтах пока не поддерживаются в стабильной версии Rust.


    Крейт async_trait предоставляет макрос для решения этой задачи:


    use async_trait::async_trait;
    use std::time::Instant;
    use tokio::time::{sleep, Duration};
    
    #[async_trait]
    trait Sleeper {
        async fn sleep(&self);
    }
    
    struct FixedSleeper {
        sleep_ms: u64,
    }
    
    #[async_trait]
    impl Sleeper for FixedSleeper {
        async fn sleep(&self) {
            sleep(Duration::from_millis(self.sleep_ms)).await;
        }
    }
    
    async fn run_all_sleepers_multiple_times(
        sleepers: Vec<Box<dyn Sleeper>>,
        n_times: usize,
    ) {
        for _ in 0..n_times {
            println!("running all sleepers..");
            for sleeper in &sleepers {
                let start = Instant::now();
                sleeper.sleep().await;
                println!("slept for {}ms", start.elapsed().as_millis());
            }
        }
    }
    
    #[tokio::main]
    async fn main() {
        let sleepers: Vec<Box<dyn Sleeper>> = vec![
            Box::new(FixedSleeper { sleep_ms: 50 }),
            Box::new(FixedSleeper { sleep_ms: 100 }),
        ];
        run_all_sleepers_multiple_times(sleepers, 5).await;
    }

    Ремарки:


    • async_trait прост в использовании, но учтите, что для работы он использует выделение памяти в куче. Это влечет издержки производительности
    • попробуйте создать новую "спящую" структуру, которая будет спать случайное время, и добавить ее в вектор

    Отмена


    Удаление фьючера означает, что его больше никогда нельзя будет опросить. Это называется отменой (cancellation) и может произойти в любой момент ожидания. Необходимо позаботиться о том, чтобы система работала правильно даже в случае отмены фьючера. Например, он не должен блокироваться или терять данные.


    use std::io::{self, ErrorKind};
    use std::time::Duration;
    use tokio::io::{AsyncReadExt, AsyncWriteExt, DuplexStream};
    
    struct LinesReader {
        stream: DuplexStream,
    }
    
    impl LinesReader {
        fn new(stream: DuplexStream) -> Self {
            Self { stream }
        }
    
        async fn next(&mut self) -> io::Result<Option<String>> {
            let mut bytes = Vec::new();
            let mut buf = [0];
            while self.stream.read(&mut buf[..]).await? != 0 {
                bytes.push(buf[0]);
                if buf[0] == b'\n' {
                    break;
                }
            }
            if bytes.is_empty() {
                return Ok(None);
            }
            let s = String::from_utf8(bytes)
                .map_err(|_| io::Error::new(ErrorKind::InvalidData, "не UTF-8"))?;
            Ok(Some(s))
        }
    }
    
    async fn slow_copy(source: String, mut dest: DuplexStream) -> std::io::Result<()> {
        for b in source.bytes() {
            dest.write_u8(b).await?;
            tokio::time::sleep(Duration::from_millis(10)).await
        }
        Ok(())
    }
    
    #[tokio::main]
    async fn main() -> std::io::Result<()> {
        let (client, server) = tokio::io::duplex(5);
        let handle = tokio::spawn(slow_copy("привет\nпривет\n".to_owned(), client));
    
        let mut lines = LinesReader::new(server);
        let mut interval = tokio::time::interval(Duration::from_millis(60));
        loop {
            tokio::select! {
                _ = interval.tick() => println!("тик!"),
                line = lines.next() => if let Some(l) = line? {
                    print!("{}", l)
                } else {
                    break
                },
            }
        }
        handle.await.unwrap()?;
        Ok(())
    }

    Ремарки:


    • компилятор не помогает с обеспечением безопасности отмены. Необходимо читать документацию API и понимать, каким состоянием владеет ваша async fn
    • в отличие от panic! и ?, отмена — это часть нормального управления потоком выполнения (а не обработка ошибок)
    • в примере теряется часть строки
      • если ветвь tick() выполняется первой, next() и его buf уничтожаются
      • LinesReader можно сделать безопасным для отмены путем включения buf в структуру:

    struct LinesReader {
        stream: DuplexStream,
        bytes: Vec<u8>,
        buf: [u8; 1],
    }
    
    impl LinesReader {
        fn new(stream: DuplexStream) -> Self {
            Self { stream, bytes: Vec::new(), buf: [0] }
        }
        async fn next(&mut self) -> io::Result<Option<String>> {
            // ...
            let raw = std::mem::take(&mut self.bytes);
            let s = String::from_utf8(raw)
            // ...
        }
    }

    • Interval::tick безопасен для отмены, поскольку он отслеживает, был ли "доставлен" (delivered) тик
    • AsyncReadExt::read безопасен для отмены, поскольку он либо возвращается, либо не читает данные
    • AsyncBufReadExt::read_line, как и пример, не является безопасным для отмены. Подробности и альтернативы см. в документации

    Упражнения


    Для тренировки навыков работы с асинхронным Rust, есть еще два упражнения:


    • обедающие философы — на этот раз вам нужно решить эту задачу с помощью асинхронного Rust
    • приложение для чата

    Обедающие философы


    use std::sync::Arc;
    use tokio::sync::mpsc::{self, Sender};
    use tokio::sync::Mutex;
    use tokio::time;
    
    struct Fork;
    
    struct Philosopher {
        name: String,
        // left_fork: ...
        // right_fork: ...
        // thoughts: ...
    }
    
    impl Philosopher {
        async fn think(&self) {
            self.thoughts
                .send(format!("Эврика! {} сгенерировал(а) новую идею!", &self.name))
                .await
                .unwrap();
        }
    
        async fn eat(&self) {
            // Пытаемся до тех пор, пока не получим обе вилки
            println!("{} ест...", &self.name);
            time::sleep(time::Duration::from_millis(5)).await;
        }
    }
    
    static PHILOSOPHERS: &[&str] =
         &["Сократ", "Гипатия", "Платон", "Аристотель", "Пифагор"];
    
    #[tokio::main]
    async fn main() {
        // Создаем вилки
    
        // Создаем философов
    
        // Каждый философ размышляет и ест 100 раз
    
        // Выводим размышления философов
    }

    Для работы с асинхронным Rust рекомендуется использовать tokio:


    [package]
    name = "dining-philosophers-async"
    version = "0.1.0"
    edition = "2021"
    
    [dependencies]
    tokio = { version = "1.26.0", features = ["sync", "time", "macros", "rt-multi-thread"] }

    Подсказка: на этот раз вам придется использовать Mutex и модуль mpsc из tokio.


    Решение
    use std::sync::Arc;
    use tokio::sync::mpsc::{self, Sender};
    use tokio::sync::Mutex;
    use tokio::time;
    
    struct Fork;
    
    struct Philosopher {
        name: String,
        left_fork: Arc<Mutex<Fork>>,
        right_fork: Arc<Mutex<Fork>>,
        thoughts: Sender<String>,
    }
    
    impl Philosopher {
        async fn think(&self) {
            self.thoughts
                .send(format!("Эврика! {} сгенерировал(а) новую идею!", &self.name))
                .await
                .unwrap();
        }
    
        async fn eat(&self) {
             // Пытаемся до тех пор, пока не получим обе вилки
            let (_left_fork, _right_fork) = loop {
                // Берем вилки...
                let left_fork = self.left_fork.try_lock();
                let right_fork = self.right_fork.try_lock();
    
                let Ok(left_fork) = left_fork else {
                    // Если мы не получили левую вилку, удаляем правую вилку,
                    // если она у нас была, позволяя выполняться другим задачам
                    drop(right_fork);
                    time::sleep(time::Duration::from_millis(1)).await;
                    continue;
                };
    
                let Ok(right_fork) = right_fork else {
                    // Если мы не получили правую вилку, удаляем левую вилку,
                    // если она у нас была, позволяя выполняться другим задачам
                    drop(left_fork);
                    time::sleep(time::Duration::from_millis(1)).await;
                    continue;
                };
    
                break (left_fork, right_fork);
            };
    
            println!("{} ест...", &self.name);
            time::sleep(time::Duration::from_millis(5)).await;
            // Блокировки уничтожаются здесь
        }
    }
    
    static PHILOSOPHERS: &[&str] =
       &["Сократ", "Гипатия", "Платон", "Аристотель", "Пифагор"];
    
    #[tokio::main]
    async fn main() {
        // Создаем вилки
        let mut forks = vec![];
        (0..PHILOSOPHERS.len()).for_each(|_| forks.push(Arc::new(Mutex::new(Fork))));
    
        // Создаем философов
        let (philosophers, mut rx) = {
            let mut philosophers = vec![];
    
            let (tx, rx) = mpsc::channel(10);
    
            for (i, name) in PHILOSOPHERS.iter().enumerate() {
                let left_fork = Arc::clone(&forks[i]);
                let right_fork = Arc::clone(&forks[(i + 1) % PHILOSOPHERS.len()]);
    
                philosophers.push(Philosopher {
                    name: name.to_string(),
                    left_fork,
                    right_fork,
                    thoughts: tx.clone(),
                });
            }
    
            (philosophers, rx)
            // `tx` уничтожается здесь, поэтому нам не нужно явно удалять его позже
        };
    
        // Каждый философ думает и ест 100 раз
        for phil in philosophers {
            tokio::spawn(async move {
                for _ in 0..100 {
                    phil.think().await;
                    phil.eat().await;
                }
            });
        }
    
        // Выводим размышления философов
        while let Some(thought) = rx.recv().await {
            println!("{thought}");
        }
    }

    Чат


    В этом упражнении мы используем новые знания для разработки приложения чата. У нас есть сервер, к которому подключаются клиенты и в котором они публикуют свои сообщения. Клиент читает пользовательские сообщения через стандартный ввод и отправляет их на сервер. Сервер передает (broadcast) сообщение всем клиентам.


    Для реализации этого функционала мы будем использовать широковещательный канал на сервере и tokio_websockets для взаимодействия между клиентом и сервером.


    Создайте новый проект и добавьте следующие зависимости в Cargo.toml:


    [package]
    name = "chat-async"
    version = "0.1.0"
    edition = "2021"
    
    [dependencies]
    futures-util = { version = "0.3.30", features = ["sink"] }
    http = "1.0.0"
    tokio = { version = "1.28.1", features = ["full"] }
    tokio-websockets = { version = "0.5.1", features = ["client", "fastrand", "server", "sha1_smol"] }

    Необходимые API


    Вам потребуются следующие функции из tokio и tokio_websockets. Потратьте несколько минут для ознакомления со следующими API:


    • StreamExt::next(), реализуемый WebSocketStream — для асинхронного чтения сообщений из потока веб-сокетов
    • SinkExt::send(), реализуемый WebSocketStream — для асинхронной отправки сообщений в поток веб-сокетов
    • Lines::next_line() — для асинхронного чтения сообщений пользователя через стандартный ввод
    • Sender::subscribe() — для подписки на широковещательный канал

    Два бинарника


    Как правило, в проекте может быть только один исполняемый файл (binary) и один файл src/main.rs. Нам требуется два бинарника. Один для клиента и еще один для сервера. Теоретически их можно сделать двумя отдельными проектами, но мы поместим оба бинарника в один проект. Для того, чтобы это работало, клиент и сервер должны находиться в директории src/bin (см. документацию).


    Скопируйте следующий код сервера и клиента в src/bin/server.rs и src/bin/client.rs, соответственно.


    // src/bin/server.rs
    use futures_util::sink::SinkExt;
    use futures_util::stream::StreamExt;
    use std::error::Error;
    use std::net::SocketAddr;
    use tokio::net::{TcpListener, TcpStream};
    use tokio::sync::broadcast::{channel, Sender};
    use tokio_websockets::{Message, ServerBuilder, WebSocketStream};
    
    async fn handle_connection(
        addr: SocketAddr,
        mut ws_stream: WebSocketStream<TcpStream>,
        bcast_tx: Sender<String>,
    ) -> Result<(), Box<dyn Error + Send + Sync>> {
        todo!("реализуй меня")
    }
    
    #[tokio::main]
    async fn main() -> Result<(), Box<dyn Error + Send + Sync>> {
        let (bcast_tx, _) = channel(16);
    
        let listener = TcpListener::bind("127.0.0.1:2000").await?;
        println!("Запросы принимаются на порту 2000");
    
        loop {
            let (socket, addr) = listener.accept().await?;
            println!("Запрос от {addr:?}");
            let bcast_tx = bcast_tx.clone();
            tokio::spawn(async move {
                // Оборачиваем сырой поток TCP в веб-сокет
                let ws_stream = ServerBuilder::new().accept(socket).await?;
    
                handle_connection(addr, ws_stream, bcast_tx).await
            });
        }
    }

    // src/bin/client.rs
    use futures_util::stream::StreamExt;
    use futures_util::SinkExt;
    use http::Uri;
    use tokio::io::{AsyncBufReadExt, BufReader};
    use tokio_websockets::{ClientBuilder, Message};
    
    #[tokio::main]
    async fn main() -> Result<(), tokio_websockets::Error> {
        let (mut ws_stream, _) =
            ClientBuilder::from_uri(Uri::from_static("ws://127.0.0.1:2000"))
                .connect()
                .await?;
    
        let stdin = tokio::io::stdin();
        let mut stdin = BufReader::new(stdin).lines();
    
        todo!("реализуй меня")
    }

    Запуск бинарников


    Команда для запуска сервера:


    cargo run --bin server

    Команда для запуска клиента:


    cargo run --bin client

    Задачи


    • реализовать функцию handle_connection в src/bin/server.rs
      • подсказка: используйте tokio::select! для параллельного выполнения двух задач в бесконечном цикле. Одна задача получает сообщения от клиента и передает их другим клиентам. Другая — отправляет клиенту сообщения, полученные от сервера
    • завершите функцию main в src/bin/client.rs
      • подсказка: также используйте tokio::select! в бесконечном цикле для параллельного выполнения двух задач: 1) чтение сообщений пользователя из стандартного ввода и их отправка серверу; 2) получение сообщений от сервера и их отображение
    • опционально: измените код для передачи сообщений всем клиентам, кроме отправителя

    Решение
    // src/bin/server.rs
    use futures_util::sink::SinkExt;
    use futures_util::stream::StreamExt;
    use std::error::Error;
    use std::net::SocketAddr;
    use tokio::net::{TcpListener, TcpStream};
    use tokio::sync::broadcast::{channel, Sender};
    use tokio_websockets::{Message, ServerBuilder, WebSocketStream};
    
    async fn handle_connection(
        addr: SocketAddr,
        mut ws_stream: WebSocketStream<TcpStream>,
        bcast_tx: Sender<String>,
    ) -> Result<(), Box<dyn Error + Send + Sync>> {
    
        ws_stream
            .send(Message::text("Добро пожаловать в чат! Отправьте сообщение".to_string()))
            .await?;
        let mut bcast_rx = bcast_tx.subscribe();
    
        // Бесконечный цикл для параллельного выполнения двух задач:
        // 1) получение сообщений из `ws_stream` и их передача клиентам
        // 2) получение сообщений в `bcast_rx` и их отправка клиенту
        loop {
            tokio::select! {
                incoming = ws_stream.next() => {
                    match incoming {
                        Some(Ok(msg)) => {
                            if let Some(text) = msg.as_text() {
                                println!("{addr:?}: {text:?}");
                                bcast_tx.send(text.into())?;
                            }
                        }
                        Some(Err(err)) => return Err(err.into()),
                        None => return Ok(()),
                    }
                }
                msg = bcast_rx.recv() => {
                    ws_stream.send(Message::text(msg?)).await?;
                }
            }
        }
    }
    
    #[tokio::main]
    async fn main() -> Result<(), Box<dyn Error + Send + Sync>> {
        let (bcast_tx, _) = channel(16);
    
        let listener = TcpListener::bind("127.0.0.1:2000").await?;
        println!("Запросы принимаются на порту 2000");
    
        loop {
            let (socket, addr) = listener.accept().await?;
            println!("Запрос от {addr:?}");
            let bcast_tx = bcast_tx.clone();
            tokio::spawn(async move {
                // Оборачиваем сырой поток TCP в веб-сокет
                let ws_stream = ServerBuilder::new().accept(socket).await?;
    
                handle_connection(addr, ws_stream, bcast_tx).await
            });
        }
    }

    // src/bin/client.rs
    use futures_util::stream::StreamExt;
    use futures_util::SinkExt;
    use http::Uri;
    use tokio::io::{AsyncBufReadExt, BufReader};
    use tokio_websockets::{ClientBuilder, Message};
    
    #[tokio::main]
    async fn main() -> Result<(), tokio_websockets::Error> {
        let (mut ws_stream, _) =
            ClientBuilder::from_uri(Uri::from_static("ws://127.0.0.1:2000"))
                .connect()
                .await?;
    
        let stdin = tokio::io::stdin();
        let mut stdin = BufReader::new(stdin).lines();
    
        // Бесконечный цикл для параллельной отправки и получения сообщений
        loop {
            tokio::select! {
                incoming = ws_stream.next() => {
                    match incoming {
                        Some(Ok(msg)) => {
                            if let Some(text) = msg.as_text() {
                                println!("От сервера: {}", text);
                            }
                        },
                        Some(Err(err)) => return Err(err.into()),
                        None => return Ok(()),
                    }
                }
                res = stdin.next_line() => {
                    match res {
                        Ok(None) => return Ok(()),
                        Ok(Some(line)) => ws_stream.send(Message::text(line.to_string())).await?,
                        Err(err) => return Err(err.into()),
                    }
                }
    
            }
        }
    }

    Это конец бонусной части руководства.


    Материалы для более глубокого изучения рассмотренных тем:



    Happy coding!




Комментарии (10)


  1. Sabirman
    28.03.2024 10:41

    Как в Rust-е сделать объектную динамически-подключаемую библиотеку.

    На Java, C#, Python и т.п. объекты стандартизированы. Т.е. ты в своем классе реализуешь какой-то интерфейс и подключаешь свой плагин к основной программе.

    В Rust-е, я так понимаю, можно реализовывать только Си-шные функции. И вся прелесть раста при этом теряется.


    1. domix32
      28.03.2024 10:41
      +2

      Если хочется сделать доступ через C ABI, то делаем тип либы dylib, выключаем манглинг функций для нашего интерфейса и приводим свой API к сишным сигнатурам. Собственный отдельный ржавый FFI уже много лет в разработке, но и работы там непочатый край ибо никто не осилил пока оформить какой-то пропозал, удовлетворяющий эту хотелку. В некотором экспериментальном виде для WASM можно использовать interface types, так что в теории если ваш wasm рантайм их поддерживает, то можно делать через него подключаться.

      В остальном не очень понятно какие ещё вам варианты нужны? Общение между языками традиционно происходит через C ABI происходит, хоть в python, хоть в go, хоть в java и пока что лучше ничего не придумали. Есть отдельные крейты, позволяющие работать с этими языками на уровне объектов этих языков - например, pyo3 для питона или robusta для JNI.


      1. Sabirman
        28.03.2024 10:41
        +3

        Попробую по другому объяснить. Есть внешняя, динамически-подключаемая, библиотека, которая тоже написана на Rust. Но, несмотря на то, что она написана на Rust-е, я должен вызывать её методы в Unsafe-блоках. В Java, C#, Python и т.д. в аналогичной ситуации внешний вызов не отличается от вызова внутренней функции и соблюдаются все защиты, заложенные в языке.

        А большие приложения никогда не идут одним исполняемым файлом, т.е. Rust не готов к использованию в больших приложениях.

        Или я ошибаюсь?


        1. domix32
          28.03.2024 10:41

          Судя по докам и обсуждениям динамические либы доступны только через С API, поэтому да, даже если она на Rust, она будет считаться за либу на другом языке и необходимы врапперы поверх unsafe кода. WASM+WASI модули в теории могут позволить избегать этого, но принципиально ситуация не меняется.

          А большие приложения никогда не идут одним исполняемым файлом, т.е. Rust не готов к использованию в больших приложениях.

          Не вижу принципиальной разницы, если б всё писалось на том же C/С++. Оно буквально работает "как у всех". С тем же успехом можно сказать, что и С/С++ не готовы к использованию в больших приложениях.

          В Java, C#, Python и т.д. в

          эти языки по-умолчанию имеют некоторый рантайм с некоторым окружением и пространством имён, в который они могут импортировать внешние модули. Они не имеют гарантий безопасности аналогичных Rust и фактически превращаются в каст void* к сигнатуре, которую вы представили. В Rust вам необходимо представить какие-то гарантии относительно загружаемого модуля компилятору, т.к. нет никакого рантайма, поэтому да - unsafe и врапперы - ваш вариант. Благо их кажется можно генерировать автоматически через макросы.


          1. sdramare
            28.03.2024 10:41

             Они не имеют гарантий безопасности аналогичных Rust и фактически превращаются в каст void* к сигнатуре, которую вы представили.

            Я думаю человек выше больше подразумевал что конечные бинарники Java/C# содержат в себе метаинформацию(рефлексию) о всех структурах/классах и методах внутри, так что их можно динамически подключать с гарантией что нужный класс реализует заданный интерфейс. В расте же при компиляции никакой метаинформации вроде того какие трейты для какой структуры реализованы не сохраняется, по-этому динамическая загрузка библиотеки всегда будет unsafe.


            1. domix32
              28.03.2024 10:41

               вроде того какие трейты для какой структуры реализованы не сохраняется

              не совсем так. оно в том или ином виде будет ссылаться на некоторые виртуальные таблицы, которые можно отрефлексировать при желании, но из-за того язык компилируемый гарантии интерфейса намертво зашиваются в бинарь. У managed языков есть рантайм, который позволяет его расширять и работать с ним как будто рабочее окружение было таким всегда.


        1. redfox0
          28.03.2024 10:41

          crate-type = ["dylib"]
          

          https://stackoverflow.com/a/75903553

          Другое дело, что у раста ABI пока ещё (?) не стабилизировано, так что динамические библиотеки и код придётся собирать одинаковой версией компилятора.

          Но проблемы с unsafe-блоками не вижу, если у вас есть исходники библиотеки.



  1. domix32
    28.03.2024 10:41
    +4

    фьючерса

    Фьючерсы - это термин из финансов. В расте они фьючи или футуры aka Future<T>.


  1. hrls
    28.03.2024 10:41
    +1

    вызов send() блокирует текущий поток до тех пор, пока в канале имеется место для новых сообщений

    Разве не наоборот?

    Unbounded в случае очередей будет логичнее назвать безразмерной.