В целом мне очень нравится концепция Акторов в асинхронном программировании. Низкоуровневое программирование с использованием очередей сообщений и обработчиков сообщений не позволяет мыслить в терминах высокоуровневых абстракций. Акторы позволяют абстрагироваться от низкоуровневых деталей. Другими словами, Акторы посылают сообщения друг другу. А то, что Акторы на самом деле работают на очередях сообщений, нам не важно. Это скрыто 'под капотом'
Во время изучения Tokio, я решил написать свой проект для модели Акторов. Как раз низкоуровневая часть Tokio позволила это сделать. Так появилась библиотека Gabriel2.
Примерно 10 месяцев назад у меня получилась первая версия библиотеки. Первым делом я определил типизированную структуру ссылки на Актора:
pub struct ActorRef<Actor, Message, State, Response, Error>
Потом определил trait Handler который необходимо реализовать для обработки сообщений.
pub trait Handler<Actor: Sync + Send + 'static, Message: Sync + Send + 'static,
State: Sync + Send + 'static, Response: Sync + Send + 'static,
Error: Sync + Send + 'static> {
fn receive(&self, ctx: Arc<Context<Actor, Message, State, Response, Error>>)
-> impl Future<Output = Result<Response, Error>> + Send;
}
Как вы видите в функцию "receive" передается контекст. Который выглядит так:
pub struct Context<Actor, Message, State, Response, Error> {
pub mgs: Message,
pub state: Arc<Mutex<State>>,
pub self_ref: Arc<ActorRef<Actor, Message, State, Response, Error>>,
}
В контексте содержится сообщение, состояние и ссылка на самого себя. Это позволяет обрабатывать сообщения и изменять состояние Актора. Также посылать самому себе сообщения или передать ссылку на себя другому Актору.
Для struct ActorRef реализованы следующие функции:
pub async fn new(name: impl AsRef<str>, actor: Actor, state: State, buffer: usize)
-> Result<Arc<Self>, Error>
pub async fn ask(&self, msg: Message) -> Result<Response, Error>
pub async fn send(&self, msg: Message) -> Result<(), std::io::Error>
pub async fn state(&self) -> Result<Arc<Mutex<State>>, std::io::Error>
pub async fn stop(&self) -> Result<(), Error>
Эти функции позволяют имея ссылку на Актора, отправлять ему сообщения, запрашивать состояние, останавливать Актора.
Для реализации функции "send" я использовал tokio::sync::mpsc, который позволяет создавать каналы для передачи сообщений от множества отправителей к одному получателю. Вот пример использования mpsc из официальной документации:
use tokio::sync::mpsc;
#[tokio::main]
async fn main() {
// A multi-producer, single-consumer queue for sending values between asynchronous tasks
let (tx, mut rx) = mpsc::channel(100);
tokio::spawn(async move {
for i in 0..10 {
if let Err(_) = tx. send(i).await {
println!("receiver dropped");
return;
}
}
});
while let Some(i) = rx. recv().await {
println!("got = {}", i);
}
}
Для реализации функции "ask" я использовал канал tokio::sync::oneshot, который позволяет создавать каналы для передачи одного сообщения. Функция "send" не возвращает ответ от Актора. А вот функция "ask" возвращает ответ и значит ее реализация должна дождаться ответ и потом вернуть управление с результатом. Вот пример использования oneshot из официальной документации:
use tokio::sync::oneshot;
#[tokio::main]
async fn main() {
// A one-shot channel is used for sending a single message between asynchronous tasks.
let (tx, rx) = oneshot::channel();
tokio::spawn(async move {
if let Err(_) = tx. send(3) {
println!("the receiver dropped");
}
});
match rx. await {
Ok(v) => println!("got = {:?}", v),
Err(_) => println!("the sender dropped"),
}
}
В итоге получилось довольно удобная библиотека для создания Акторов. Вот пример использования библиотеки для создания Актора Echo, который принимает сообщение Ping и возвращает Pong:
#[derive(Debug)]
pub struct Echo;
#[derive(Debug)]
pub enum Message {
Ping,
}
#[derive(Debug)]
pub enum Response {
Pong {counter: u32},
}
#[derive(Debug,Clone)]
pub struct State {
pub counter: u32,
}
#[derive(Debug, Error)]
pub enum EchoError {
#[error("unknown error")]
Unknown,
#[error("std::io::Error")]
StdErr(#[from] std::io::Error),
}
#[async_trait]
impl Handler<Echo, Message, State, Response, EchoError> for Echo {
async fn receive(&self, ctx: Arc<Context<Echo, Message, State, Response, EchoError>>)
-> Result<Response, EchoError> {
match ctx.mgs {
Message::Ping => {
println!("Received Ping");
let mut state_lock = ctx.state.lock().await;
state_lock.counter += 1;
if state_lock.counter > 10 {
Err(EchoError::Unknown)
} else {
Ok(Response::Pong{counter: state_lock.counter})
}
}
}
}
}
Вот пример использования Актора Echo:
#[tokio::main]
async fn main() -> Result<(), EchoError> {
let state = State {
counter: 0,
};
let echo_ref = ActorRef::new("echo", Echo{}, state, 100000).await?;
println!("Sent Ping");
echo_ref.send(Message::Ping).await?;
println!("Sent Ping and ask response");
let pong = echo_ref.ask(Message::Ping).await?;
println!("Got {:?}", pong);
_ = echo_ref.stop().await;
Ok(())
}
Результат выполнения программы:
Sent Ping
Sent Ping and ask response
Received Ping
Received Ping
Got Pong { counter: 2 }
В итоге я по использовал свою библиотеку в продакшене коммерческого проекта. Помимо баг фиксов мне пришлось добавить в Handler две функции:
fn pre_start(&self, _state: Arc<Mutex<State>>, _self_ref: Arc<ActorRef<Actor, Message, State, Response, Error>>)
-> impl Future<Output = Result<(), Error>> + Send {
async {
Ok(())
}
}
fn pre_stop(&self, _state: Arc<Mutex<State>>, _self_ref: Arc<ActorRef<Actor, Message, State, Response, Error>>)
-> impl Future<Output = Result<(), Error>> + Send {
async {
Ok(())
}
}
Эти функции вызываются перед стартом и остановкой Актора. Это позволяет инициализировать и освобождать ресурсы.
Совсем недавно я решил добавить новую feature в библиотеку. Сетевой режим для работы Акторов. Стало интересно попробовать свои силы в низкоуровневом сетевом программировании. А то вся работа с сетью обычно заключается в вызове сторонних библиотек для работы с Базой Данных или HTTP.
В первую очередь я определил struct ActorServer и реализовал функцию "new":
pub async fn new(name: impl AsRef<str>, host: impl AsRef<str>, port: u16,
actor: Arc<ActorRef<Actor,Message, State, Response, Error>>)
-> Result<Arc<Self>, Error>
Внутри функции я использовал tokio::net::TcpListener для создания сервера, который ждет входящие соединения. Вот пример использования TcpListener из официальной документации:
use tokio::net::TcpListener;
use std::io;
#[tokio::main]
async fn main() -> io::Result<()> {
let listener = TcpListener::bind("127.0.0.1:8080").await?;
match listener. accept().await {
Ok((_socket, addr)) => println!("new client: {:?}", addr),
Err(e) => println!("couldn't get client: {:?}", e),
}
Ok(())
}
После соединения, я делаю разделение socket-а, что бы в него писать и из него читать независимо. Вот пример использования split из официальной документации:
let (read_half,write_half) = tokio::io::split(socket);
Далее я определил struct ActorClient и реализовал функцию "new":
pub async fn new(name: impl AsRef<str>, host: impl AsRef<str>, port: u16)
-> Result<Arc<Self>, Error>
Для установки соединения с сервером я использовал tokio::net::TcpStream. Вот пример из официальной документации:
use tokio::net::TcpStream;
use tokio::io::AsyncWriteExt;
use std::error::Error;
#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
// Connect to a peer
let mut stream = TcpStream::connect("127.0.0.1:8080").await?;
// Write some data.
stream. write_all(b"hello world!").await?;
Ok(())
}
Потом у меня встала проблема сериализация и десериализации сообщений Акторов, которые надо посылать по сети. Для этой задачи я выбрал библиотеку Bincode. Вот пример использования Bincode из официальной документации:
use bincode::{config, Decode, Encode};
#[derive(Encode, Decode, PartialEq, Debug)]
struct Entity {
x: f32,
y: f32,
}
fn main() {
let config = config::standard();
let entity = Entity { x: 0.0, y: 4.0 };
let encoded: Vec<u8> = bincode::encode_to_vec(&world, config).unwrap();
let (decoded, len): (World, usize) = bincode::decode_from_slice(&encoded[..], config).unwrap();
assert_eq!(world, decoded);
assert_eq!(len, encoded.len());
Протокол передачи сообщений по сети вышел достаточно простым. Сначала я в сокет пишу число байт, которые буду передавать, а потом байты самого сообщения. Для вычисления размера сообщения я использовал:
let message: Vec<u8> = ...;
let message_len_vec: Vec<u8> = message.len().to_le_bytes().to_vec();
stream.write_all(&message_len_vec[..]).await?;
stream.write_all(&message[..]).await?;
Для того что бы потом принять корректно сообщение, я сначала принимаю байты, которые содержат размер сообщения, а потом уже само сообщение:
let packet_size = std::mem::size_of::<usize>();
let mut buffer:Vec<u8> = vec![0; packet_size];
loop {
match read_half.read_exact(&mut buffer).await {
Ok(n) => {
let message_size = usize::from_le_bytes(buffer[..].try_into().unwrap());
let mut message_buf:Vec<u8> = vec![0; message_size];
match read_half.read_exact(&mut message_buf).await {
...
}
}
...
}
}
Вот пример использования сетевого режима для Акторов:
#[tokio::main]
async fn main() -> Result<(), EchoError> {
let state = State {
counter: 0,
};
let echo_ref = ActorRef::new("echo", crate::echo::Echo {}, state, 100000).await?;
let echo_server = ActorServer::new("echo_server", "127.0.0.1", 9001, echo_ref).await?;
let echo_client: Arc<ActorClient<Echo, Message, State, Response, EchoError >> =
ActorClient::new("echo_client", "127.0.0.1", 9001).await?;
println!("Sent Ping");
echo_client.send(Message::Ping).await?;
println!("Sent Ping and ask response");
let pong = echo_client.ask(Message::Ping).await?;
println!("Got {:?}", pong);
tokio::time::sleep(Duration::from_secs(1)).await;
_ = echo_client.stop().await;
_ = echo_server.stop().await;
Ok(())
}
Как видите, сетевой режим позволяет создавать Акторов на разных машинах и дает им возможность общаться между собой. И для этого не надо знать как работает сеть. Все это скрыто 'под капотом'. Более того размер кода для создания Акторов на разных машинах почти не увеличился по сравнению с локальным режимом работы Акторов.
В ближайших планах сделать балансировщик нагрузки для Акторов, который будет позволять распределять нагрузку между однотипными Акторами. Потом хочу добавить шину событий для Акторов, что бы Акторы могли подписываться на события других Акторов.
Исходный код проекта находится на GitHub. Если у вас есть желание помочь мне в разработке, то пишите мне в Telegram @ievkz или Discord @igumnovnsk.
GamePad64
Каждый разработчик на расте в какой-то момент начинает писать свою библиотеку для акторов.