В своем последнем проекте я использовал популярную библиотеку Actix для реализации модели акторов. Однако в процессе разработки я столкнулся с ограничениями.


Обработчики сообщений в Actix реализуются через трейт Handler, который требует синхронных методов, а не асинхронных функций. Для использования асинхронных вещей это крайне не удобно.

В Actix все акторы по умолчанию размещаются в одном потоке. Чтобы запустить акторы в отдельных потоках, можно создать отдельный Arbiter для каждого актора, это даст изолированный поток. А для моих потребностей нужно, чтобы каждый актор работал в своем потоке и чтобы они не были изолированы из-за этого для общения.

В итоге, я решил переписать свой код с использованием библиотеки Tokio Mpsc. Чтобы не дублировать общую логику взаимодействия акторов в разных частях проекта, я вынес ее в отдельную библиотеку. В отличие от Actix, я реализовал ее не как Фреймворк, а как чистую библиотеку общего назначения.

Всем известны различия между фреймворком и библиотекой в программировании:

Фреймворк

  • Предоставляет скелет приложения и определяет поток управления. Разработчик должен вписать свой код в предопределенные места.

  • Имеет инверсию управления - фреймворк вызывает код разработчика.

  • Обычно сложнее в освоении, чем библиотеки.

Плюсы:

  • Ускоряет разработку за счет готовой архитектуры и инструментов.

  • Поощряет следование лучшим практикам и паттернам проектирования.

  • Облегчает тестирование и поддержку кода.

Минусы:

  • Высокая избыточность для простых проектов.

  • Сложность в освоении из-за большого количества функций.

  • Привязка к конкретному фреймворку.

Библиотека

  • Предоставляет набор готовых классов и функций для решения частных задач.

  • Разработчик вызывает код библиотеки. Инверсия управления отсутствует.

  • Проще в освоении и применении, чем фреймворки.

Плюсы:

  • Легкая интеграция в существующие проекты.

  • Возможность выбрать только нужный набор функций.

  • Низкая привязка к конкретной библиотеке.

Минусы:

  • Требуется самостоятельно писать основной код приложения.

  • Отсутствие готовых шаблонов и архитектуры.

  • Трудности с поддержкой кода из-за отсутствия общих паттернов.

В целом, я устал от фреймворков и решил писать библиотеку. Надоело упираться в концептуальные ограничения от них.

Между тем текущие ограничения Rust не позволили мне полностью реализовать желаемый API. Например, невозможно объявить async метод в трейте, поэтому обработчик сообщений пришлось оформить как замыкание.

Еще одна проблема возникла при попытке добавить в библиотеку свой тип ошибок. Из-за ограничений в Rust, нельзя использовать параметризированный тип ошибки. Например, если в Box<dyn std::io::Error + Send + Sync> заменить std::io::Error на параметр ActorError из шаблона, то выдается ошибка, что требуется trait после dyn. В итоге, пришлось все сделать на std::io::Error. При использовании библиотеки, если хотите пользоваться магией символа '?', нужно для своих ошибок реализовывать From<CustomError> for std::io::Error.

Ну и конечно, вот пример использования получившейся библиотеки (первая версия с замыканиями и без типизированных ошибок).

echo.rs

pub struct Echo;

#[derive(Debug)]
pub enum Message {
    Ping,
}

#[derive(Debug)]
pub enum Response {
    Pong {counter: u32},
}

#[derive(Debug,Clone)]
pub struct State {
    pub counter: u32,
}

impl Echo {
    pub async fn new() -> Arc<Actor<Message, State, Response>> {

        let state = State {
            counter: 0,
        };

        Actor::new("echo".to_string(), move |ctx| {
            Box::pin(async move {
                match ctx.mgs {
                    Message::Ping => {
                        println!("Received Ping");
                        let mut state_lock = ctx.state.lock().await;
                        state_lock.counter += 1;
                        Ok(Response::Pong{counter: state_lock.counter})
                    }
                }

            })
        }, state, 100000).await
    }

}

main.rs

#[tokio::main]
async fn main() -> Result<(), BoxDynError> {
    let echo = Echo::new().await;

    println!("Sent Ping");
    echo.send(Message::Ping).await?;

    println!("Sent Ping and ask response");
    let pong = echo.ask(Message::Ping).await?;
    println!("Got {:?}", pong);

    println!("Sent Ping and wait response in callback");
    echo.callback(Message::Ping, move |result| {
        Box::pin(async move {
            let response = result?;
            println!("Got {:?}", response);
            Ok(())
        })
    }).await?;

    _ = echo.stop();
    thread::sleep(std::time::Duration::from_secs(1));
    Ok(())
}

Результат работы программы.

Sent Ping
Sent Ping and ask response
Received Ping
Received Ping
Got Pong { counter: 2 }
Sent Ping and wait response in callback
Received Ping
Got Pong { counter: 3 }

После публикации статьи, я получил много комментариев. Спасибо за советы. В итоге сделал следующую версию библиотеки с асинхронным хендлером в трейте и типизированной ошибкой.

echo.rs

#[derive(Debug)]
pub struct Echo;

#[derive(Debug)]
pub enum Message {
    Ping,
}

#[derive(Debug)]
pub enum Response {
    Pong {counter: u32},
}

#[derive(Debug,Clone)]
pub struct State {
    pub counter: u32,
}

#[derive(Debug, Error)]
pub enum EchoError {
    #[error("unknown error")]
    Unknown,
    #[error("std::io::Error")]
    StdErr(#[from] std::io::Error),
}

#[async_trait]
impl Handler<Echo, Message, State, Response, EchoError> for Echo {
    async fn receive(&self, ctx: Context<Echo, Message, State, Response, EchoError>) -> Result<Response, EchoError> {
        match ctx.mgs {
            Message::Ping => {
                println!("Received Ping");
                let mut state_lock = ctx.state.lock().await;
                state_lock.counter += 1;
                if state_lock.counter > 10 {
                    Err(EchoError::Unknown)
                } else {
                    Ok(Response::Pong{counter: state_lock.counter})
                }
            }
        }
    }
}

main.rs

#[tokio::main]
async fn main() -> Result<(), EchoError> {
    let state = State {
        counter: 0,
    };

    let echo_ref = ActorRef::new("echo".to_string(), Echo{},  state, 100000).await;

    println!("Sent Ping");
    echo_ref.send(Message::Ping).await?;

    println!("Sent Ping and ask response");
    let pong = echo_ref.ask(Message::Ping).await?;
    println!("Got {:?}", pong);

    _ = echo_ref.stop();
    thread::sleep(std::time::Duration::from_secs(1));
    Ok(())
}

Результат работы программы.

Sent Ping
Sent Ping and ask response
Received Ping
Received Ping
Got Pong { counter: 2 }

Код библиотеки здесь. Пулреквесты приветствуются. Код примера использования библиотеки здесь.

Комментарии (16)


  1. Rustified
    20.08.2023 14:43
    +1

    невозможно объявить async метод в трейте

    Посмотрите эту библиотеку: https://crates.io/crates/async-trait

    Например, если в Box<dyn std::io::Error + Send + Sync> заменить std::io::Error на параметр ActorError из шаблона

    Если я правильно понял, то для некоего дженерика "ActorError" вы хотите использовать Box <ActorError + Send + Sync>. Уберите оттуда "dyn" и всё сработает. Если же в качестве "ActotError" вам надо передать типаж, то уж тогда используйте его.


    1. igumnov Автор
      20.08.2023 14:43

      Великолепно - теоретически. Жду от Вас пулреквест )


    1. igumnov Автор
      20.08.2023 14:43
      +1

      Я попробовал Ваш совет на счет ошибок

      pub type BoxDynError<E> = Box<E + Send + Sync>;

      выдает ошибку

      Trait objects must include the dyn keyword [E0782]

      в общем думаю с https://crates.io/crates/async-trait будут тоже свои сложности )


      1. Mingun
        20.08.2023 14:43
        +1

        Боксинг нужен, когда у вас неизвестный тип, спрятанный за типажом, а тут у вас generic-параметр, который предоставляет какой-то известный тип. Зачем тут вообще боксинг? Вместо BoxDynError<E> используйте просто E: Send + Sync


        1. igumnov Автор
          20.08.2023 14:43

          Убрал боксинг в call_back: Mutex<HashMap<i32, Callback<Result<Response, E: Send + Sync>>>>

          pub struct Actor<Message, State, Response,E> {
              tx: Mutex<Option<mpsc::Sender<(Message, i32)>>>,
              rt:  Mutex<Option<Arc<Runtime>>>,
              join_handle: Mutex<Option<tokio::task::JoinHandle<()>>>,
              state: Option<Arc<Mutex<State>>>,
              self_ref: Mutex<Option<Arc<Actor<Message, State, Response, E, >>>>,
              message_id: Mutex<i32>,
              call_back: Mutex<HashMap<i32, Callback<Result<Response, E: Send + Sync>>>>,
              promise: Mutex<HashMap<i32, Sender<Result<Response, BoxDynError>>>>,
              name: String,
          
          }


          Я очень рад что Вы пытаетесь мне помочь, но вот что я получил в итоге использовав Ваш совет

          error[E0658]: associated type bounds are unstable
            --> lib\src\lib.rs:39:61
             |
          39 |     call_back: Mutex<HashMap<i32, Callback<Result<Response, E: Send + Sync>>>>,
             |                                                             ^^^^^^^^^^^^^^


          1. Mingun
            20.08.2023 14:43

            вроде понятно должно быть, что я имел ввиду это:


            pub struct Actor<Message, State, Response, E: Send + Sync> {
              call_back: Mutex<HashMap<i32, Callback<Result<Response, E>>>>,
            }

            Если непонятно, то повод подучить синтаксис раста.


            1. igumnov Автор
              20.08.2023 14:43

              Да, разобрался, сейчас сделал отдельную ветку где буду прикручивать типизацию по ошибкам и асинхронный обработчик через асинхроную функцию в трейте.


        1. igumnov Автор
          20.08.2023 14:43

          В итоге дело пошло дальше с ошибками, посмотрим что дальше выйдет.


          1. igumnov Автор
            20.08.2023 14:43

            Сейчас уперся в это

             |                 tx.send((msg, *message_id_lock)).await?;
                |                                                       ^ the trait `From<tokio::sync::mpsc::error::SendError<(Message, i32)>>` is not implemented for `Error`
            

            Другими словами дженерик определен, но как реализовать trait From для него?


    1. igumnov Автор
      20.08.2023 14:43

      Пробую Ваши советы

      #[async_trait]
      trait Actor <Message, State, Response> {
          async fn receive(&self, ctx: Context<Message, State, Response>) -> CallbackFuture<Result<Response, BoxDynError>>;
      }

      Пока что скомпилировалось - сейчас посмотрим как пойдет дальше )


      1. igumnov Автор
        20.08.2023 14:43

        Попробовал пустую реализацию подставить

        impl Actor<Message, State, Response> for ActorInstance<Message, State, Response> {
            async fn receive(&self, ctx: Context<Message, State, Response>) -> CallbackFuture<Result<Response, BoxDynError>> {
        
            }
        }

        выдает ошибку

        error[E0412]: cannot find type `Message` in this scope
          --> lib\src\lib.rs:56:12
           |
        56 | impl Actor<Message, State, Response> for ActorInstance<Message, State, Response> {
           |            ^^^^^^^ not found in this scope
        


        1. igumnov Автор
          20.08.2023 14:43

          Трейт в итоге скомпилировался, посмотрим что дальше выйдет )


  1. 143672
    20.08.2023 14:43

    Не actix-ом единым. Есть вот этой фреймворк https://github.com/slawlor/ractor. С асинхронным handle методом


    1. igumnov Автор
      20.08.2023 14:43

      Видел эту реализацию. Но хочется свое написать. Сейчас сделал отдельную ветку где буду прикручивать типизацию по ошибкам и асинхронный обработчик через асинхроную функцию в трейте.


  1. freecoder_xx
    20.08.2023 14:43

    А вот вам мой вариант легковесных акторов: https://github.com/noogen-projects/truba

    use truba::{Context, Message, MpscChannel};
    
    struct Value(u32);
    
    impl Message for Value {
        type Channel = MpscChannel<Self>;
    }
    
    struct MyActor {
        value: u32,
    }
    
    impl MyActor {
        fn run(ctx: Context, value: u32) {
            let mut value_in = ctx.receiver::<Value>();
            let mut actor = MyActor { value };
    
            truba::spawn_event_loop!(ctx, {
                Some(msg) = value_in.recv() => {
                    actor.handle_value(msg);
                },
            });
        }
    
        fn handle_value(&mut self, Value(value): Value) {
            self.value = value;
            println!("receive value {value}");
        }
    }
    
    #[tokio::main]
    async fn main() {
        let ctx = Context::new();
        MyActor::run(ctx.clone(), 42);
    
        let sender = ctx.sender::<Value>();
        sender.send(Value(11)).await.ok();
        sender.send(Value(22)).await.ok();
    
        ctx.shutdown().await;
    }


    1. igumnov Автор
      20.08.2023 14:43

      А зачем надо самому run реализовывать в акторе?
      И как я понял у вас только режим send есть? а вариант с ask нету?