Язык программирования Rust стал достаточно популярен благодаря своей надежности, безопасности и производительности. В рамках данной статьи мы не будем подробно обсуждать преимущества данного языка, так как на эту тему уже написано много статей. Вместо этого мы рассмотрим разработку простейшего сетевого приложения, работающего по принципу клиент-сервер.

Те читатели, кто уже знаком с Rust, могут пропустить следующий раздел и сразу перейти к рассмотрению сетевого приложения. Ну а тем, кто совсем не знаком с этим языком, предлагается установить необходимые инструменты и ознакомиться с базовыми конструкциями Rust.

Рабочая среда Rust

В качестве примера рассмотрим установку необходимых инструментов на Ubuntu. Для загрузки и запуска установочного скрипта выполним следующую команду:

curl --proto '=https' --tlsv1.3 https://sh.rustup.rs -sSf | sh

В процессе работы скрипта вам будет предложено выбрать тип установки. Выбираем  первый пункт 1) Proceed with installation (default).

Для того, чтобы убедиться в том, что все установилось успешно, выполним команду:

$ rustc --version

Ну и традиционный Hello world. Создаем файл с расширением rs.

$ nano hello.rs

И со следующим содержимым:

fn main() {
    println!("Hello world!");
}

Далее компилируем с помощью команды rustc и запускаем:

$ rustc test.rs

$ ./test

В рамках статьи мы не будем рассматривать синтаксис и команды Rust, так как данный материал тоже можно без труда найти. Так что перейдем сразу к основной теме статьи – сетевому программированию.

Сетевые средства

Для работы с сетевыми компонентами в Rust используются библиотеки. Все функции, связанные с сетью, расположены в пространстве имен std::net; для чтения и записи в сокеты также используются функции чтения и записи из std::io. Наиболее важной структурой здесь является IpAddr, представляющий собой общий IP-адрес, который может быть либо версии 4, либо 6. SocketAddr, который представляет собой общий адрес сокета (комбинацию IP и порта на хосте), TcpListener и TcpStream для обмена данными по протоколу TCP, UdpSocket для UDP и многое другое.

Так, если мы хотим начать слушать порт 8090 на рабочей машине, то выполнить это можно с помощью следующей команды:

    let listener = TcpListener::bind("0.0.0.0:8090").expect("Could not bind");

В функции main() мы создаем новый TcpListener, который в Rust представляет собой TCP-сокет, прослушивающий входящие соединения от клиентов. В нашем примере мы жестко задали локальный адрес и порт; значение локального адреса, равное 0.0.0.0, указывает ядру на необходимость привязки этого сокета ко всем доступным интерфейсам на этом хосте. В результате любой клиент, который может подключиться к сети, подключенной к этому хосту, сможет общаться с этим хостом по порту 8090

В реальном приложении номер порта должен быть настраиваемым параметром, взятым из CLI, переменной среды или файла конфигурации.

Если привязаться к конкретному порту не удалось, приложение завершает работу с сообщением Could not bind.

Метод listener.incoming(), который мы используем, возвращает итератор для потоков, которые подключились к серверу. Мы перебираем их и проверяем, не возникало ли в каком-либо из них ошибки. В этом случае мы можем распечатать сообщение об ошибке и перейти к следующему подключенному клиенту. Обратите внимание, что завершать работу всего приложения с ошибкой в этом случае нецелесообразно, поскольку сервер может нормально функционировать, если по какой-либо причине некоторые клиенты столкнутся с ошибками.

    for stream in listener.incoming() {
        match stream {
            Err(e) => { eprintln!("failed: {}", e) }
            Ok(stream) => {
                thread::spawn(move || {
                    handle_client(stream).unwrap_or_else(|error| eprintln!("{:?}", error));
                });
            }
        }
    }

Теперь мы должны считывать данные с каждого из клиентов в бесконечном цикле. Но запуск бесконечного цикла в главном потоке заблокирует его и никакие другие клиенты не смогут подключиться. Такое поведение для нас определенно нежелательно. Таким образом, мы должны создать рабочий поток для обработки каждого клиентского подключения. Логика чтения из каждого потока и обратной записи инкапсулирована в функцию, называемую handle_client.

fn handle_client(mut stream: TcpStream) -> Result<(), Error> {
    println!("Incoming connection from: {}", stream.peer_addr()?);
    let mut buf = [0; 512];
    loop {
        let bytes_read = stream.read(&mut buf)?;
        if bytes_read == 0 { return Ok(()) }
        stream.write(&buf[..bytes_read])?;
    }
}

Каждый поток получает замыкание, вызывающее эту функцию. Это замыкание должно быть перемещением, поскольку оно должно считывать переменную (поток) из охватывающей области. В функции мы выводим адрес удаленной конечной точки и порт, а затем определяем буфер для временного хранения данных. Мы также заботимся о том, что буфер обнуляется. Затем мы запускаем бесконечный цикл, в котором считываем все данные из потока. Метод read в потоке возвращает длину данных, которые он прочитал. Он может возвращать ноль в двух случаях: если он достиг конца потока или если длина данного буфера была равна нулю. Мы точно знаем, что второй случай неверен. Таким образом, мы прерываем цикл (и функцию), когда метод read возвращает ноль. В этом случае мы возвращаем Ok(). Затем мы записываем те же данные обратно в поток, используя синтаксис slice. Обратите внимание, что мы использовали eprintln! для вывода ошибок. Этот макрос преобразует данную строку в стандартную ошибку.

Давайте посмотрим исходный код нашего приложения полностью.

use std::net::{TcpListener, TcpStream};
use std::thread;
use std::io::{Read, Write, Error};

fn handle_client(mut stream: TcpStream) -> Result<(), Error> {
    println!("Incoming connection from: {}", stream.peer_addr()?);
    let mut buf = [0; 512];
    loop {
        let bytes_read = stream.read(&mut buf)?;
        if bytes_read == 0 { return Ok(()) }
        stream.write(&buf[..bytes_read])?;
    }
}

fn main() {
    let listener = TcpListener::bind("0.0.0.0:8888").expect("Could not bind");
    for stream in listener.incoming() {
        match stream {
            Err(e) => { eprintln!("failed: {}", e) }
            Ok(stream) => {
                thread::spawn(move || {
                    handle_client(stream).unwrap_or_else(|error| eprintln!("{:?}", error));
                });
            }
        }
    }
}

 Для компиляции выполним команду

$ rustc имя_файла_сервера.rs

Работа над ошибками

Можно заметить очевидное отсутствие обработки ошибок при чтении из потока и записи в поток. Но на самом деле это не так. Мы использовали оператор ? для обработки ошибок в этих вызовах. Этот оператор преобразует результат в Ok, если все прошло нормально; в противном случае он возвращает ошибку вызывающей функции раньше времени. Учитывая эту настройку, возвращаемый тип функции должен быть либо пустым, чтобы обрабатывать успешные случаи, либо типом io::Error, чтобы обрабатывать случаи ошибок. Обратите внимание, что в таких случаях было бы неплохо реализовать пользовательские ошибки и возвращать их вместо встроенных ошибок. Также обратите внимание, что оператор ? в настоящее время не может использоваться в функции main, поскольку функция main не возвращает результат.

Для того, чтобы убедиться в работоспособности нашего сервера можно обратиться к нему и отправить любой набор байт.

Пишем клиента

Конечно, с серверам можно общаться с помощью nc, однако, лучше все таки написать полноценного клиента. В приведенном ниже примере мы в бесконечном цикле cчитываем стандартный ввод и затем пересылаем эти данные на сервер.

В случае, если мы не можем прочитать ввод или передать на сервер, приложение завершается с соответствующими сообщениями.

use std::net::TcpStream;
use std::str;
use std::io::{self, BufRead, BufReader, Write};

fn main() {
    let mut stream = TcpStream::connect("127.0.0.1:8888").expect("Could not connect to server");
    loop {
        let mut input = String::new();
        let mut buffer: Vec<u8> = Vec::new();
        io::stdin().read_line(&mut input).expect("Failed to read from stdin");
        stream.write(input.as_bytes()).expect("Failed to write to server");

        let mut reader = BufReader::new(&stream);

        reader.read_until(b'\n', &mut buffer).expect("Could not read into buffer");
        print!("{}", str::from_utf8(&buffer).expect("Could not write buffer as string"));
    }
}

 Откомпилируем также с помощью rustc:

$ rustc имя_файла_клиента.rs

Заключение

В этой статье мы рассмотрели основные моменты, связанные с работой в Rust, а также с разработкой несложных сетевых приложений. В следующей статье мы рассмотрим более сложные примеры кода, связанного с сетевой разработкой.

Больше про языки программирования можно узнать на онлайн-курсах под руководством экспертов-практиков.

Комментарии (8)


  1. tbl
    30.09.2024 19:47
    +7

    Зачем брать раст и писать на нем блокирующий сетевой код? И почему вы предлагаете спавнить поток для каждого коннекта? Это же антипаттерн.

    А вы используете чатгпт, когда готовите по нескольку некачественных статей в день на разные тематики?


    1. MountainGoat
      30.09.2024 19:47

      Да, я спросил у него пример ТСР сервера на Расте и получил точно такой же пример. Другое дело, что потом я уточнил, что потоков должно быть ровно 4 и получил вот это:

      Скрытый текст
      use std::io::{Read, Write};
      use std::net::{TcpListener, TcpStream};
      use std::sync::mpsc;
      use std::thread;
      use std::sync::{Arc, Mutex};
      
      const NUM_THREADS: usize = 4;
      
      struct ThreadPool {
          workers: Vec<Worker>,
          sender: mpsc::Sender<Job>,
      }
      
      type Job = Box<dyn FnOnce() + Send + 'static>;
      
      impl ThreadPool {
          fn new(size: usize) -> ThreadPool {
              assert!(size > 0);
      
              let (sender, receiver) = mpsc::channel();
              let receiver = Arc::new(Mutex::new(receiver));
      
              let mut workers = Vec::with_capacity(size);
      
              for id in 0..size {
                  workers.push(Worker::new(id, Arc::clone(&receiver)));
              }
      
              ThreadPool { workers, sender }
          }
      
          fn execute<F>(&self, f: F)
          where
              F: FnOnce() + Send + 'static,
          {
              let job = Box::new(f);
              self.sender.send(job).unwrap();
          }
      }
      
      struct Worker {
          _id: usize,
          _thread: thread::JoinHandle<()>,
      }
      
      impl Worker {
          fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {
              let thread = thread::spawn(move || loop {
                  let job = receiver.lock().unwrap().recv().unwrap();
                  println!("Worker {} got a job; executing.", id);
                  job();
              });
      
              Worker {
                  _id: id,
                  _thread: thread,
              }
          }
      }
      
      fn handle_client(mut stream: TcpStream) {
          let mut buffer = [0; 1024];
      
          while match stream.read(&mut buffer) {
              Ok(size) if size > 0 => {
                  stream.write(&buffer[0..size]).unwrap();
                  true
              }
              Ok(_) => false,
              Err(_) => {
                  println!("An error occurred, terminating connection with {}", stream.peer_addr().unwrap());
                  false
              }
          } {}
      }
      
      fn main() {
          let listener = TcpListener::bind("127.0.0.1:7878").unwrap();
          let pool = ThreadPool::new(NUM_THREADS);
      
          println!("Server listening on port 7878");
      
          for stream in listener.incoming() {
              match stream {
                  Ok(stream) => {
                      println!("New connection: {}", stream.peer_addr().unwrap());
                      pool.execute(move || {
                          handle_client(stream)
                      });
                  }
                  Err(e) => {
                      println!("Error: {}", e);
                  }
              }
          }
      }

      Так что да, программисты будут не нужны и т.д. и т.п.


      1. tbl
        30.09.2024 19:47
        +1

        А можно взять tokio и tower, что позволит выкинуть преамбулу, связанную с реализацией тредпула и воркеров, да еще и упростит main.


      1. tbl
        30.09.2024 19:47

        Глянул код свежим взглядом. Код все еще блокирующий. Решение от чатгпт предполагает удержание потока из пула на каждого клиента вплоть до закрытия соединения, блокируясь на вызове read. При исчерпании пула остальные подключения будут накапливаться в буфере mpsc.

        Программисты будут еще какое-то время нужны, как минимум для верификации галлюцинаций LLM, чтобы такое случайно на прод не уехало


      1. KivApple
        30.09.2024 19:47
        +1

        Этот код не будет обрабатывать больше 4 одновременно подключенных клиентов.

        Вам намекают использовать async.


  1. tbl
    30.09.2024 19:47

    Del


  1. morett1m
    30.09.2024 19:47
    +1

    Зачем каждый раз спавнить поток для каждого коннекта? Это ж антипаттерн, да и ресурсы так сжигаются на ура. Про асинхронный подход автор явно забыл — ведь можно было взять Tokio и легко избавиться от этого костыля с потоками. И да, весь этот подход точно не годится для продакшна


  1. lumag
    30.09.2024 19:47

    Надеялся увидеть хорошие примеры (Tokio, async), а не это. Очень жаль.