Всем привет! Это туториал по написанию небольшого WebSocket сервера для мультиплеерной мини игры. Суть игры простая – обмениваешься файлами с другими игроками в небольшом 2D пространстве. Полный код приложений есть в гитхабе, протестировать можно на сайте.

Demo
Demo

Начало

Большая часть объяснений описана в качестве комментариев к коду.

Создайте проект с помощью команды cargo new <name>. После этого добавьте в файл Cargo.toml все эти зависимости.

[dependencies]
# Рантайм для асинхронного кода
tokio = { version = "1.28.1", features = ["full"] }

# Асинхронная версия библиотеки tungstenite, реализующей протокол WebSocket, 
# работающая в связке с библиотекой tokio.
tokio-tungstenite = "0.18.0"

# Вспомогательные методы для работы с массивами байтов. 
bstr = "1.4.0"

# Вспомогательные метода для работы асинхронными объектами.
futures-util = "0.3"

# Библиотека для работы регулярными выражениями.
regex = "1"

# Mutex, RwLock и тд. быстрее чем в стандартной библиотеке.
parking_lot = "0.12.1"

# Генерирует рандомные числа.
rand = "0.8.5"

Базовые типы

Структура UserFileMessage – хранит в себе информацию о файле, переданную пользователем, а так же имеет метод позволяющий парсить эту информацию из массива байтов. Будет использоваться при получении бинарного сообщения (один из типов сообщений протокола WebSocket) от пользователя, для проверки его валидности.

Структура Connection – хранит в себе идентификатор соединения, а также само соединение, в которое записывается информация и передаётся подключенному юзеру.

Enum BroadcastEvents – содержит список событий, которые могут быть отправлены всем пользователем сразу. Нужен для того чтобы разделить на типы сообщения, получаемые Broadcast’ером (о нём ниже).

Код типов (Файл server/src/types.rs)
// Файл server/src/types.rs

// Трейт с вспомогательными методами для массима байтов 
use bstr::ByteSlice;

// SplitSink это Sink часть разделённого на два объекта потока. 
// Подробнее позже
use futures_util::stream::SplitSink;

// Асинхронный TCP стрим 
use tokio::net::TcpStream;

// Тип WebSocket потока и сообщения из него
use tokio_tungstenite::{tungstenite::Message, WebSocketStream};

pub struct UserFileMessage {
  // Имя пользователя отправившего файл
  pub username: String,

  // Имя файла
  pub file_name: String,

  // Content-Type файла
  pub file_type: String,

  // Сам файл 
  pub file_bytes: Vec<u8>,
}

impl UserFileMessage {
  // Метод из массива байтов собирает структуру UserFileMessage.
  pub fn from(data: Vec<u8>) -> Option<Self> {
    // Паттерн разделителя, который отделяет каждый фрагмент 
    // информации от другого. Например:
    // username<pattern>filename<pattern>filetype<pattern>filebytes
    let pattern: [u8; 12] = [226, 128, 147, 226, 128, 147, 226, 128, 147, 226, 128, 147]; 
    
    // Делит массив по паттерну
    let result: Vec<Vec<u8>> = data.split_str(&pattern).map(|x|x.to_vec()).filter(|x| x.len() > 0).collect();
    
    // Если разделение прошло на 4 элемента, то все прошло успешно. 
    // Если нет, то данные переданы некорректно и нужно возвращать None.
    if result.len() == 4 {
      let username = String::from_utf8_lossy(&result[0]).to_string();
      let file_name = String::from_utf8_lossy(&result[1]).to_string();
      let file_type = String::from_utf8_lossy(&result[2]).to_string();
      
      let file_bytes = result[3].clone();
      
      Some(UserFileMessage { username, file_name, file_type, file_bytes })
    } else {
      None
    }
  }
}

#[derive(Debug)]
pub struct Connection {
  // Рандомный идентификатор соединения
  pub id: u32,

  // Sink часть websocket потока для передачи в него информации
  pub con: SplitSink<WebSocketStream<TcpStream>, Message>,
}

#[derive(Debug)]
pub enum BroadcastEvents {
  // Добавляет подключение в broadcast'ер
  AddConn(Connection),
 
  // Отправка всем участникам сообщения о подключении нового
  Join(String),

  // Отправка всем участникам сообщения о выходе игрока. 
  // Так же передаём первым элементом идентификатор его
  // подключения, чтобы удалить его из списка.
  Quit(u32, String),

  // Отправка всем участникам полученного от одного из них файла.
  SendFile(Message)
}

Broadcast

Broadcast – это функция, которая будет работать в отдельной Task’е tokio, принимать сигналы от других потоков и в зависимости от полученного сигнала, отправлять всем подключенным игрокам разные сообщения.

Task в tokio это зелёный (виртуальный) поток. Он управляется рантаймом токио, а не операционной системой.

Код broadcast (Файл server/src/broadcast.rs)
// Файл server/src/broadcast.rs

use std::collections::HashMap;

// Трейт добавляющий удобные методы для Sink объектов (в 
// нашем случае Sink'ом будет, полученная после деления, вторая часть
// потока WebSocketStream, в которую мы будем записывать данные, для 
// передачи их пользователю).
use futures_util::SinkExt;

// Тип получателя для неограниченного mpsc канала
use tokio::sync::mpsc::UnboundedReceiver;

// Тип сообщения, передаваемого по WebSocket'у
use tokio_tungstenite::tungstenite::Message;

use crate::types::{BroadcastEvents, Connection};

pub async fn run(mut rx: UnboundedReceiver<BroadcastEvents>) {
  let mut connections: HashMap<u32, Connection> = HashMap::new();

  // В цикле ждём новый сигнал 
  while let Some(event) = rx.recv().await {
    match event {
      // Добавляем соединение в список для рассылки
      BroadcastEvents::AddConn(conn) => {
        connections.insert(conn.id, conn);
      }
			
	  // Рассылаем всем юзерам сообщение о подключении нового игрока
      // Пример сообщения: JOIN|userA
      BroadcastEvents::Join(username) => {
        for (_, iconn) in connections.iter_mut() {
          let _ = iconn.con.send(Message::Text(format!("JOIN|{}", username))).await;
        }
      }

      // Рассылаем всем юзерам сообщение о выходе игрока
      // Пример сообщения: LEFT|userA
      BroadcastEvents::Quit(id, username) => {
        connections.remove(&id);

        if !username.is_empty() {
          for (_, conn) in connections.iter_mut() {
            let _ = conn.con.send(Message::Text(format!("LEFT|{}", username))).await;
          }
        }
      }

      // Пересылаем юзерам сообщение с файлом.
      // Так как сообщение для пересылки нужно клонировать и при 
	  // большом количестве игроков это может вызвать замедления в работе
      // функции, максимальный размер файла ограничен 5МБ. (Это будет ниже)
      BroadcastEvents::SendFile(msg) => {
        for (_, conn) in connections.iter_mut() {
          let _ = conn.con.send(msg.clone()).await;
        }
      }
    }
  }
}

Состояния игроков

Для контроля за состоянием игроков создадим отдельную структуру Game, в которой будем хранить список подключённых пользователей и отправитель сигналов в Broadcast.

Для предотвращения спама файлами, у нас будет стоять небольшой cooldown в 15 секунд для игроков на отправку файла. Поэтому за каждым игроком, кроме его имени и времени последней отправки файла, ничего закреплено не будет.

Код структуры Game (Файл server/src/game.rs)
// Файл server/src/game.rs

use std::{collections::HashMap, sync::Arc};

use parking_lot::Mutex;
use tokio::{sync::mpsc::UnboundedSender, time::Instant};
use tokio_tungstenite::tungstenite::Message;

use crate::types::{BroadcastEvents, Connection};

pub struct Player {
  // Время последнего отправленного файла. 
  // Так как, когда пользователь только зашел, он ещё не успел отправить
  // что-нибудь, записывать в это поле нечего. В таком случае будет
  // выставлено значение енама Option – None.
  pub dt_last_send: Option<Instant>,
}

#[derive(Clone)]
pub struct Game {
  // Мапа со всеми подключенными играками (Формат: username:Player).
  // Для того чтобы это поле было доступно для нескольких потоков,
  // его необходимо обернуть в смарт-поинтер Arc, которые реализует
  // множественное владение данными в нескольких потоках. А чтобы 
  // мапу можно было внутри Arc'а ещё и изменять, нужно дополнительно
  // завернуть её в Mutex. Так же Mutex обезопасит данные, ограничив
  // максимальное количество единовременно изменяемых мапу потоков до одного.
  pub players: Arc<Mutex<HashMap<String, Player>>>,

  // Отправитель сигналов в Broadcast
  pub broadcast_sender: UnboundedSender<BroadcastEvents>
}

impl Game {
  // Создаём новый инстанс
  pub fn new(broadcast_sender: UnboundedSender<BroadcastEvents>) -> Game {
    Game { 
      players: Arc::new(Mutex::new(HashMap::new())),
      broadcast_sender,
    }
  }
  
  // Добавляем игрока и отправляем сигнал в Broadcast.
 
  // Вызов метода send вернёт Result, который мы не хотим в данном
  // случае обрабатывать. Поэтому засунем его в игнорируемую переменную.
 
  // Так же проверяем есть ли такой игрок уже на сервере и 
  // не превысит ли количество игроков на сервере 50,
  // если игрок подключится. (Максимум 50 игроков)
  pub fn add_player(&self, username: String) {
    let mut players = self.players.lock();
    if !players.contains_key(&username) && players.len() < 50 {
      players.insert(username.clone(), Player { dt_last_send: None });
      let _ = self.broadcast_sender.send(BroadcastEvents::Join(username.clone()));
    }
  }

  // Добавляем соединение в Broadcast
  pub fn add_connection(&self, conn: Connection) {
    let _ = self.broadcast_sender.send(BroadcastEvents::AddConn(conn));
  }
 
  // Удаляем игрока и отправляем сигнал в Broadcast
  pub fn remove_player(&self, username: String, id: u32) {
    self.players.lock().remove(&username);
    let _ = self.broadcast_sender.send(BroadcastEvents::Quit(id, username));
  }

  // Формируем сообщение со списком пользователей.
  // Пример: LIST|userA,userB,userC
  pub fn get_list_message(&self) -> Message {
    let list_string = self.players.lock().iter().map(|w| w.0.to_owned()).collect::<Vec<String>>().join(",");

    Message::Text(format!("LIST|{}", list_string))
  }
	
  // Отправляем сообщение с файлом в Broadcast.
  // Перед этим проверяем прошел или нет у пользователя 
  // cooldown на отправку файла.
  pub fn send_file(&self, from: String, msg: Message) {
    let mut players = self.players.lock();
    if let Some(player) = players.get(&from) {
      if player.dt_last_send.is_none() || player.dt_last_send.unwrap().elapsed().as_secs() > 15 {
        let _ = self.broadcast_sender.send(BroadcastEvents::SendFile(msg));
        players.insert(from, Player { dt_last_send: Some(Instant::now()) });
      }
    }
  }
}

Получение запросов на соединение

Так как WebSocket работает поверх TCP, для начала нам нужно просто создать прослушивающий TCP сокет методом bind у TcpListener’а, и в цикле принимать все запросы на соединение.

Для оптимизации обработки соединений лучше выводить их в отдельный поток. В нашем случае, в Task’у токио.

Начало кода main.rs
// Файл server/src/main.rs

// Методы для удобного управления потоком подключения WebSocket
use futures_util::{StreamExt, SinkExt};

use tokio::net::{TcpStream, TcpListener};

// Фунция для принятия WebSocket соединения
use tokio_tungstenite::accept_async;

use tokio::sync::mpsc;

// Структура для работы с регулярными выражениями
use regex::Regex;

// Рандомайзер для чисел
use rand::Rng;

use types::{Connection, BroadcastEvents, UserFileMessage};
use game::Game;

mod types;
mod game;
mod broadcast;

// Обязательно навешиваем на main макрос tokio::main, который
// создаст рантайм токио для исполнения асинхронного кода.
#[tokio::main]
async fn main () {
    // Создаём прослушивающий TCP сокет на порте 8080
	let server = TcpListener::bind("0.0.0.0:8080").await.unwrap();
	
    // Создаём неограниченный канал mpsc
	let (broadcast_sender, broadcast_receiver) = mpsc::unbounded_channel::<BroadcastEvents>();
  
	// Создаём новый поток токио и на выполнение ему передаём нашу 
    // функцию Broadcast.
	tokio::spawn(broadcast::run(broadcast_receiver));
	
	let game = Game::new(broadcast_sender);
	
	// В цикле принимаем все запросы на соединение
	loop {
		let (stream, _) = server.accept().await.unwrap();
        // Обрабатываем все соединения в отдельном токио потоке
		tokio::spawn(process_con(stream, game.clone()));
	}
}

// Функция обработки соединения
async fn process_con(stream: TcpStream, game: Game) {
	println!("Connection!")
}

Обработка соединений

Наша функция обработки соединения будет работать в 2 этапа.

На первом этапе она отправит список все подключённых игроков пользователю, добавит соединение в список соединений в Broadcaster’е, чтобы подключенный пользователь уже мог получать данные о выходах и входах других игроков. И после этого будет ждать от пользователя сообщение формата «JOIN|<username>». После того, как она его получит, перейдёт ко второму этапу.

На втором этапе она будет в цикле ждать от пользователя сообщений с файлами и рассылать их всем остальным подключённым игрокам. Когда цикл в очередной раз попытается прочитать сообщение и получит ошибку, это будет означать обрыв соединения с пользователем. После этого функция вызовет метод remove_player у инстанса game, что удалит соединение пользователя из Broadcast’а и его ник из списка игроков, а так же сообщит о его отключении всем подключённым пользователям.

Код обработчика соединений (Файл server/mail.rs)
// Файл server/src/main.rs

// ...

async fn process_con(stream: TcpStream, game: Game) {
    // Генерируем идентификатор для соединения
	let id = rand::thread_rng().gen::<u32>();
  
	// Производим websocket handshake, после чего получаем 
    // websocket поток или ошибку. В случае ошибки виртуальный 
    // поток токио оборвется.
	let websocket = accept_async(stream).await.unwrap();
	
	// Делим поток вебсокета на Sink и Stream объекты 
    // для того чтобы можно было читать данные с потока в 
    // одном потоке, а получать в другом. В нашем случае 
    // Sink объект (sender) мы отправим броадкастеру, а с 
    // Stream продолжим работать тут чтоб получать сообщения.
	let (mut sender, mut receiver) = websocket.split();

	let mut username = String::new(); 
	
	// Отправляем список игроков пользователю
	let _ = sender.send(game.get_list_message()).await;

	// Добавляем подключение в список подключений Broadcast
	game.add_connection(Connection {
		id,
		con: sender,
	});

	// Ждём сообщение входа от пользователя.
    // Если приходят любые другие сообщения, кроме текстовых
    // формата "JOIN|<username>" – игнорируем их.
    // При это длина имени игрока не должна превышать 13 символов.

    // Когда валидное сообщение получено, он добавляет игрока в список и 
    // сообщает всем остальнм о новом участнике.
	while let Some(msg) = receiver.next().await {
		if let Ok(msg) = msg {
			if !msg.is_text() {
				continue;
			}
			if let Ok(data) = msg.clone().into_text() {
				let re = Regex::new(r"JOIN\|[A-Za-z0-9]*$").unwrap();
				if re.is_match(&data) {
					username = (&data[5..]).to_owned();
					if username.len() > 13 {
						continue;
					}
					game.add_player(username.clone());
					break;
				}
			}
		} else {
			game.remove_player(username, id);
			return;
		}
	}

	// Обработываем полученные сообщения после входа.
    // Все не бинарные сообщение игнорируем, а бинарные парсим и 
    // проверяем на валидность. Если размер файла больше 5МБ, или 
    // если в сообщении отправителем указан не тот игрок, что 
    // записан в соединении – игнорируем. В ином случае, пересылаем 
    // сообщение всем подключенным игрокам.
	while let Some(msg) = receiver.next().await {
		if let Ok(msg) = msg {
			if !msg.is_binary() {
				continue;
			}
			if let Some(data) = UserFileMessage::from(msg.clone().into_data()) {
				if data.file_bytes.len() > 5_000_000 && data.username == username {
					continue;
				}

				game.send_file(data.username, msg);
			}
		} else {
			break;
		}
	}

	// Когда соединение оборвано (цикл закончился), удаляем игрока 
    // из списка и уведомляем всех остальных об этом.
	game.remove_player(username, id);
} 


Спасибо за внимание! :) Так же клиентскую часть игры можно найти в репозитории проекта по ссылке ниже.

Ссылка на полный код проекта – github.com/IDSaves/filecats

Комментарии (0)