В своем последнем проекте я использовал популярную библиотеку Actix для реализации модели акторов. Однако в процессе разработки я столкнулся с ограничениями.
Обработчики сообщений в Actix реализуются через трейт Handler, который требует синхронных методов, а не асинхронных функций. Для использования асинхронных вещей это крайне не удобно.
В Actix все акторы по умолчанию размещаются в одном потоке. Чтобы запустить акторы в отдельных потоках, можно создать отдельный Arbiter для каждого актора, это даст изолированный поток. А для моих потребностей нужно, чтобы каждый актор работал в своем потоке и чтобы они не были изолированы из-за этого для общения.
В итоге, я решил переписать свой код с использованием библиотеки Tokio Mpsc. Чтобы не дублировать общую логику взаимодействия акторов в разных частях проекта, я вынес ее в отдельную библиотеку. В отличие от Actix, я реализовал ее не как Фреймворк, а как чистую библиотеку общего назначения.
Всем известны различия между фреймворком и библиотекой в программировании:
Фреймворк
Предоставляет скелет приложения и определяет поток управления. Разработчик должен вписать свой код в предопределенные места.
Имеет инверсию управления - фреймворк вызывает код разработчика.
Обычно сложнее в освоении, чем библиотеки.
Плюсы:
Ускоряет разработку за счет готовой архитектуры и инструментов.
Поощряет следование лучшим практикам и паттернам проектирования.
Облегчает тестирование и поддержку кода.
Минусы:
Высокая избыточность для простых проектов.
Сложность в освоении из-за большого количества функций.
Привязка к конкретному фреймворку.
Библиотека
Предоставляет набор готовых классов и функций для решения частных задач.
Разработчик вызывает код библиотеки. Инверсия управления отсутствует.
Проще в освоении и применении, чем фреймворки.
Плюсы:
Легкая интеграция в существующие проекты.
Возможность выбрать только нужный набор функций.
Низкая привязка к конкретной библиотеке.
Минусы:
Требуется самостоятельно писать основной код приложения.
Отсутствие готовых шаблонов и архитектуры.
Трудности с поддержкой кода из-за отсутствия общих паттернов.
В целом, я устал от фреймворков и решил писать библиотеку. Надоело упираться в концептуальные ограничения от них.
Между тем текущие ограничения Rust не позволили мне полностью реализовать желаемый API. Например, невозможно объявить async метод в трейте, поэтому обработчик сообщений пришлось оформить как замыкание.
Еще одна проблема возникла при попытке добавить в библиотеку свой тип ошибок. Из-за ограничений в Rust, нельзя использовать параметризированный тип ошибки. Например, если в Box<dyn std::io::Error + Send + Sync> заменить std::io::Error на параметр ActorError из шаблона, то выдается ошибка, что требуется trait после dyn. В итоге, пришлось все сделать на std::io::Error. При использовании библиотеки, если хотите пользоваться магией символа '?', нужно для своих ошибок реализовывать From<CustomError> for std::io::Error.
Ну и конечно, вот пример использования получившейся библиотеки (первая версия с замыканиями и без типизированных ошибок).
echo.rs
pub struct Echo;
#[derive(Debug)]
pub enum Message {
Ping,
}
#[derive(Debug)]
pub enum Response {
Pong {counter: u32},
}
#[derive(Debug,Clone)]
pub struct State {
pub counter: u32,
}
impl Echo {
pub async fn new() -> Arc<Actor<Message, State, Response>> {
let state = State {
counter: 0,
};
Actor::new("echo".to_string(), move |ctx| {
Box::pin(async move {
match ctx.mgs {
Message::Ping => {
println!("Received Ping");
let mut state_lock = ctx.state.lock().await;
state_lock.counter += 1;
Ok(Response::Pong{counter: state_lock.counter})
}
}
})
}, state, 100000).await
}
}
main.rs
#[tokio::main]
async fn main() -> Result<(), BoxDynError> {
let echo = Echo::new().await;
println!("Sent Ping");
echo.send(Message::Ping).await?;
println!("Sent Ping and ask response");
let pong = echo.ask(Message::Ping).await?;
println!("Got {:?}", pong);
println!("Sent Ping and wait response in callback");
echo.callback(Message::Ping, move |result| {
Box::pin(async move {
let response = result?;
println!("Got {:?}", response);
Ok(())
})
}).await?;
_ = echo.stop();
thread::sleep(std::time::Duration::from_secs(1));
Ok(())
}
Результат работы программы.
Sent Ping
Sent Ping and ask response
Received Ping
Received Ping
Got Pong { counter: 2 }
Sent Ping and wait response in callback
Received Ping
Got Pong { counter: 3 }
После публикации статьи, я получил много комментариев. Спасибо за советы. В итоге сделал следующую версию библиотеки с асинхронным хендлером в трейте и типизированной ошибкой.
echo.rs
#[derive(Debug)]
pub struct Echo;
#[derive(Debug)]
pub enum Message {
Ping,
}
#[derive(Debug)]
pub enum Response {
Pong {counter: u32},
}
#[derive(Debug,Clone)]
pub struct State {
pub counter: u32,
}
#[derive(Debug, Error)]
pub enum EchoError {
#[error("unknown error")]
Unknown,
#[error("std::io::Error")]
StdErr(#[from] std::io::Error),
}
#[async_trait]
impl Handler<Echo, Message, State, Response, EchoError> for Echo {
async fn receive(&self, ctx: Context<Echo, Message, State, Response, EchoError>) -> Result<Response, EchoError> {
match ctx.mgs {
Message::Ping => {
println!("Received Ping");
let mut state_lock = ctx.state.lock().await;
state_lock.counter += 1;
if state_lock.counter > 10 {
Err(EchoError::Unknown)
} else {
Ok(Response::Pong{counter: state_lock.counter})
}
}
}
}
}
main.rs
#[tokio::main]
async fn main() -> Result<(), EchoError> {
let state = State {
counter: 0,
};
let echo_ref = ActorRef::new("echo".to_string(), Echo{}, state, 100000).await;
println!("Sent Ping");
echo_ref.send(Message::Ping).await?;
println!("Sent Ping and ask response");
let pong = echo_ref.ask(Message::Ping).await?;
println!("Got {:?}", pong);
_ = echo_ref.stop();
thread::sleep(std::time::Duration::from_secs(1));
Ok(())
}
Результат работы программы.
Sent Ping
Sent Ping and ask response
Received Ping
Received Ping
Got Pong { counter: 2 }
Код библиотеки здесь. Пулреквесты приветствуются. Код примера использования библиотеки здесь.
Комментарии (16)
143672
20.08.2023 14:43Не actix-ом единым. Есть вот этой фреймворк https://github.com/slawlor/ractor. С асинхронным handle методом
igumnov Автор
20.08.2023 14:43Видел эту реализацию. Но хочется свое написать. Сейчас сделал отдельную ветку где буду прикручивать типизацию по ошибкам и асинхронный обработчик через асинхроную функцию в трейте.
freecoder_xx
20.08.2023 14:43А вот вам мой вариант легковесных акторов: https://github.com/noogen-projects/truba
use truba::{Context, Message, MpscChannel}; struct Value(u32); impl Message for Value { type Channel = MpscChannel<Self>; } struct MyActor { value: u32, } impl MyActor { fn run(ctx: Context, value: u32) { let mut value_in = ctx.receiver::<Value>(); let mut actor = MyActor { value }; truba::spawn_event_loop!(ctx, { Some(msg) = value_in.recv() => { actor.handle_value(msg); }, }); } fn handle_value(&mut self, Value(value): Value) { self.value = value; println!("receive value {value}"); } } #[tokio::main] async fn main() { let ctx = Context::new(); MyActor::run(ctx.clone(), 42); let sender = ctx.sender::<Value>(); sender.send(Value(11)).await.ok(); sender.send(Value(22)).await.ok(); ctx.shutdown().await; }
igumnov Автор
20.08.2023 14:43А зачем надо самому run реализовывать в акторе?
И как я понял у вас только режим send есть? а вариант с ask нету?
Rustified
Посмотрите эту библиотеку: https://crates.io/crates/async-trait
Если я правильно понял, то для некоего дженерика "ActorError" вы хотите использовать Box <ActorError + Send + Sync>. Уберите оттуда "dyn" и всё сработает. Если же в качестве "ActotError" вам надо передать типаж, то уж тогда используйте его.
igumnov Автор
Великолепно - теоретически. Жду от Вас пулреквест )
igumnov Автор
Я попробовал Ваш совет на счет ошибок
pub type BoxDynError<E> = Box<E + Send + Sync>;
выдает ошибку
Trait objects must include the
dyn
keyword [E0782]в общем думаю с https://crates.io/crates/async-trait будут тоже свои сложности )
Mingun
Боксинг нужен, когда у вас неизвестный тип, спрятанный за типажом, а тут у вас generic-параметр, который предоставляет какой-то известный тип. Зачем тут вообще боксинг? Вместо
BoxDynError<E>
используйте простоE: Send + Sync
igumnov Автор
Убрал боксинг в call_back: Mutex<HashMap<i32, Callback<Result<Response, E: Send + Sync>>>>
Я очень рад что Вы пытаетесь мне помочь, но вот что я получил в итоге использовав Ваш совет
Mingun
вроде понятно должно быть, что я имел ввиду это:
Если непонятно, то повод подучить синтаксис раста.
igumnov Автор
Да, разобрался, сейчас сделал отдельную ветку где буду прикручивать типизацию по ошибкам и асинхронный обработчик через асинхроную функцию в трейте.
igumnov Автор
В итоге дело пошло дальше с ошибками, посмотрим что дальше выйдет.
igumnov Автор
Сейчас уперся в это
Другими словами дженерик определен, но как реализовать trait From для него?
igumnov Автор
Пробую Ваши советы
Пока что скомпилировалось - сейчас посмотрим как пойдет дальше )
igumnov Автор
Попробовал пустую реализацию подставить
выдает ошибку
igumnov Автор
Трейт в итоге скомпилировался, посмотрим что дальше выйдет )