Приложения на Rust часто используют асинхронные библиотеки, такие как Tokio и Actix. Эти библиотеки предоставляют инструменты для асинхронного ввода-вывода и параллельных вычислений и т.д. Однако иногда возникают проблемы при совместном использовании разных асинхронных библиотек.

Сегодня я хочу поделиться с вами опытом решения одной из распространенных проблем при работе с библиотеками Tokio и Actix. Конкретно, мы рассмотрим ошибку "error: future cannot be sent between threads safely", которая может возникнуть, когда вы пытаетесь использовать клиент Actix внутри асинхронной функции, запущенной с помощью Tokio. Я расскажу вам, как преодолеть эту проблему.

Исходный код с Actix

Давайте начнем с простого примера кода, который работает только с Actix и не вызывает проблем:

use actix_web::{web, App, HttpResponse, HttpServer};
use awc::Client;
#[actix_rt::main]
async fn main()  {
    actix_rt::spawn(async {
        HttpServer::new(|| {
            App::new()
                .service(web::resource("/hello").route(web::get().to(ok)))
        })
            .bind("127.0.0.1:8080")?
            .run()
            .await
    });
    let client = Client::new();
    let url = "http://127.0.0.1:8080/hello";
    let ret =  client.get(url).send().await.unwrap().body().await.unwrap();
    println!("{:?}", ret);
}
async fn ok() -> HttpResponse {
    HttpResponse::Ok()
        .content_type("text/html; charset=utf-8")
        .body("OK")
}

В этом коде мы создаем HTTP-сервер с помощью Actix и выполняем GET-запрос к нему с использованием клиента Actix. Все работает отлично, но проблемы начинаются, когда мы попытаемся использовать клиент Actix в асинхронной функции, запущенной с помощью Tokio.

Проблема "error: future cannot be sent between threads safely"

Когда мы попытаемся вызвать клиент Actix из Tokio, мы столкнемся с ошибкой "error: future cannot be sent between threads safely. future created by async block is not `Send`. has type `awc::Client` which is not `Send`". Это происходит потому, что клиент Actix не является Send, что означает, что его нельзя безопасно передавать между потоками.

Вот пример кода, который вызывает эту ошибку:

use actix_web::{web, App, HttpResponse, HttpServer};
use awc::Client;
#[actix_rt::main]
async fn main()  {
    actix_rt::spawn(async {
        HttpServer::new(|| {
            App::new()
                .service(web::resource("/hello").route(web::get().to(ok)))
        })
            .bind("127.0.0.1:8080")?
            .run()
            .await
    });
    let r = tokio::spawn(async move {
        let client = Client::new();
        let url = "http://127.0.0.1:8080/hello";
        client.get(url).send().await.unwrap().body().await.unwrap()
    }).await.unwrap();
    println!("{:?}", r);
}
async fn ok() -> HttpResponse {
    HttpResponse::Ok()
        .content_type("text/html; charset=utf-8")
        .body("OK")
}

Решение проблемы с использованием Oneshot из Tokio

Чтобы решить эту проблему и сделать код безопасным для использования в Tokio, мы можем воспользоваться механизмом Oneshot из Tokio. Этот механизм позволяет нам обернуть результат выполнения клиента Actix и передать его между потоками безопасно.

Вот пример кода, который использует Oneshot для решения проблемы:

use actix_web::{web, App, HttpResponse, HttpServer};
use awc::Client;
#[actix_rt::main]
async fn main()  {
    actix_rt::spawn(async {
        HttpServer::new(|| {
            App::new()
                .service(web::resource("/hello").route(web::get().to(ok)))
        })
            .bind("127.0.0.1:8080")?
            .run()
            .await
    });
    let (sender, receiver) = tokio::sync::oneshot::channel();
    actix_rt::spawn(async move {
        let client = Client::new();
        let url = "http://127.0.0.1:8080/hello";
        let _ = sender.send(client.get(url).send().await.unwrap().body().await.unwrap());
    });
    let r = tokio::spawn(async move {
        receiver.await.unwrap()
    }).await.unwrap();
    println!("{:?}", r);
    std::mem::forget(runtime);
}
async fn ok() -> HttpResponse {
    HttpResponse::Ok()
        .content_type("text/html; charset=utf-8")
        .body("OK")
}

Итак, после долгих поисков в Google, изучения вопроса на Stack Overflow и многочисленных попыток получить ответ от ChatGPT, я так и не нашел готового решения своей проблемы. Пришлось включить собственные серые клеточки и пораскинуть мозгами. Надеюсь, что мое решение будет полезным кому-то.

Комментарии (16)


  1. mayorovp
    05.10.2023 12:37
    +3

    А на кой вам вообще понадобилось создавать новый рантайм tokio, и чем не устроил тот, в котором выполняется main?


    Если что, actix_rt::spawn — это синоним для tokio::task::spawn_local...


    1. igumnov Автор
      05.10.2023 12:37
      -2

      Наверное, потому что:

      tokio::task::spawn_local запускает асинхронную задачу в текущем локальном контексте выполнения, который обычно ограничен одним потоком выполнения (thread). Это означает, что задача будет выполняться в пределах текущего потока выполнения без возможности переключения на другие потоки.


      1. mayorovp
        05.10.2023 12:37
        +3

        Так про tokio::task::spawn_local я написал просто для демонстрации связи между actix и tokio. Запускать задачу можно при помощи обычного tokio::spawn.


        Зачем для запуска tokio::spawn создавать новый рантайм?


        1. igumnov Автор
          05.10.2023 12:37
          +1

          Да, можно. Это упрощает пример на 1 строчку кода. Для этого примера создавать новый runtime - избыточно. Лучшее, враг хорошего...


      1. nanoqsh
        05.10.2023 12:37
        -2

        Зачем тут вообще два или более потока?


        1. igumnov Автор
          05.10.2023 12:37

          Уважаемый комментатор,

          Пример кода, приведенный в статье, является упрощенным и синтетическим. Это сделано с целью воспроизведения проблемы и наглядной демонстрации ее решения. Сложности возникают, когда весь этот код работает в разных потоках.

          Буду признателен, если Вы сможете переписать данный пример кода таким образом, чтобы продемонстрировать возникающую проблему и предложить оптимальный вариант ее решения. Это позволит читателям лучше понять суть обсуждаемого вопроса. Заранее благодарю за помощь в улучшении качества статьи.


  1. mayorovp
    05.10.2023 12:37
    +3

    Теперь по поводу вашего решения. При подобном использовании канала теряется возможность отмены задачи, если внешняя задача будет отменена — клиент продолжит выполнять запрос.


    Документация на oneshot рекомендует делать как-то так:


        tokio::task::spawn_local(async move {
            tokio::select! {
                _ = sender.closed() => { }
                value = client.get(url).send().await.unwrap().body() => {
                    let _ = sender.send(value);
                }
            }
        });

    Кстати, почему бы не оформить это в виде универсальной функции?


    pub fn send_future<F>(future: F)
        -> impl Future<Output = <F as Future>::Output> + Send
    where 
        F: Future + 'static,
        <F as Future>::Output: Send
    {
        let (mut tx, rx) = tokio::sync::oneshot::channel();
        tokio::task::spawn_local(async move {
            tokio::select! {
                _ = tx.closed() => { }
                value = future => {
                    let _ = tx.send(value);
                }
            };
        });
    
        async move {
            rx.await.unwrap()
        }
    }
    
    // …
    
    tokio::spawn(async move {
        send_future(async move {
            let client = Client::new();
            let url = "http://127.0.0.1:8080/hello";
            client.get(url).send().await?.body().await?;
        }).await;
    });


    1. igumnov Автор
      05.10.2023 12:37

      LGTM


  1. mayorovp
    05.10.2023 12:37

    Так, я тут подумал и понял что все эти каналы нафиг не нужны.


    Функция tokio::task::spawn_local, она же actix_rt::spawn, уже делает всё что нужно! Проверять уже лень, но вот так должно сработать:


        let http_task = actix_rt::spawn(async move {
            let client = Client::new();
            let url = "http://127.0.0.1:8080/hello";
            client.get(url).send().await?.body().await?;
        });
        tokio::spawn(async move {
            http_task.await
        });

    И даже вот так:


    tokio::spawn(async move {
        actix_rt::spawn(async move {
            let client = Client::new();
            let url = "http://127.0.0.1:8080/hello";
            client.get(url).send().await?.body().await?;
        }).await;
    });

    Хотя, в отличии от send_future из моего комментария выше, тут снова будет проблема с отменой...


    1. igumnov Автор
      05.10.2023 12:37

      Не проканало :)

      error[E0277]: `Rc<(dyn actix_web::dev::Service<ConnectRequest, Future = Pin<Box<(dyn std::future::Future<Output = Result<ConnectResponse, SendRequestError>> + 'static)>>, Response = ConnectResponse, Error = SendRequestError> + 'static)>` cannot be sent between threads safely
         --> src\main.rs:17:26
          |
      17  |       let r = tokio::spawn(async move {
          |  _____________------------_^
          | |             |
          | |             required by a bound introduced by this call
      18  | |         actix_rt::spawn(async move {
      19  | |             let client = Client::new();
      20  | |             let url = "http://127.0.0.1:8080/hello";
      21  | |             client.get(url).send().await.unwrap().body().await.unwrap()
      22  | |         }).await
      23  | |     }).await;
          | |     ^
          | |     |
          | |_____`Rc<(dyn actix_web::dev::Service<ConnectRequest, Future = Pin<Box<(dyn std::future::Future<Output = Result<ConnectResponse, SendRequestError>> + 'static)>>, Response = ConnectResponse, Error = SendRequestError> + 'static)>` cannot be sent between threads safely
          |       within this `[async block@src\main.rs:17:26: 23:6]`


    1. nanoqsh
      05.10.2023 12:37
      -1

      #[actix_rt::main]
      async fn main() {
          actix_rt::spawn(async {
              HttpServer::new(|| App::new().service(web::resource("/hello").route(web::get().to(ok))))
                  .bind("127.0.0.1:8080")?
                  .run()
                  .await
          });
          let r = actix_rt::spawn(async move {
              //  ^^^^^^^^^^^^^^^ вместо tokio::spawn
              let client = Client::new();
              let url = "http://127.0.0.1:8080/hello";
              client.get(url).send().await.unwrap().body().await.unwrap()
          })
          .await
          .unwrap();
          println!("{:?}", r);
      }

      Проверил, отлично работает. tokio тут в зависимости вообще не нужен. У автора кривые руки, лучше бы на Го писал


      1. igumnov Автор
        05.10.2023 12:37
        +2

        Уважаемый комментатор, благодарю Вас за предложенный вариант кода и замечания.

        Вы правы, что в Вашем коде создание задачи происходит в текущей thread. Однако если изменить его, создав задачу в новой thread, то проблема опять проявится - мы получим ту же ошибку "future cannot be sent between threads safely".

        Цель моей статьи - показать типичную проблему при взаимодействии библиотек Tokio и Actix, когда из Tokio нужно вызвать код Actix. Ваш код не демонстрирует эту проблему.

        Пример кода в статье максимально упрощен, чтобы наглядно воспроизвести ошибку для читателей. Конечно, в реальном приложении код будет сложнее. Но суть проблемы остается той же.

        Я ценю, что Вы уделили время, чтобы предложить альтернативный подход. Но в данном случае он не решает рассматриваемую в статье проблему взаимодействия Tokio и Actix. Еще раз спасибо за отзыв, обсуждение таких нюансов всегда полезно для углубления понимания темы.



      1. mayorovp
        05.10.2023 12:37

        А вас не смутило что этот код приведён в статье как пример работающего, но делающего не то что нужно?


        1. igumnov Автор
          05.10.2023 12:37
          -1

          Уважаемый, благодарю Вас за комментарий. К сожалению, я не совсем понял Вашу мысль о том, что приведенный мною пример кода работает неправильно.

          Я представил работающий фрагмент кода для иллюстрации определенной концепции. Затем я показал, как этот же код может сломаться при использовании в многопоточной среде и не скомпилируется, что демонстрирует важность учета многопоточности.

          Если я неверно истолковал Ваш комментарий, прошу прощения. Буду благодарен, если Вы уточните свою мысль, чтобы я мог лучше понять Вашу позицию и внести необходимые исправления в статью. Мне важно представить информацию максимально корректно для читателей.


          1. mayorovp
            05.10.2023 12:37
            -1

            Я к тому, что если бы actix_rt::spawn делал ровно то что вам нужно — статьи бы не было


            1. igumnov Автор
              05.10.2023 12:37

              actix_rt::spawn запускает задачу в текущем потоке (thread). Например:

              use actix_rt::{spawn, Arbiter}; 
              
              fn main() {
                spawn(|| {
                  println!("Running in the main thread"); 
                });
              }

              Здесь мы запускаем задачу println в основном потоке приложения.

              Если нужен новый поток, используем Arbiter:

              let arbiter = Arbiter::new();
              
              arbiter.spawn(|| {
                println!("Running in a separate thread");
              });

              Arbiter создает пул потоков и позволяет запускать задачи в отдельных потоках из этого пула.

              Таким образом, actix_rt::spawn для текущего потока, а Arbiter - для создания новых потоков.

              Другими словами actix_rt::spawn не делает то что не нужно. А если нужно несколько thread то нужно использовать иной механизм.

              Похоже у вас проблемы с пониманием что такое thread... В моей статье как раз и разбирается кейс когда не 1 thread, а несколько, начинаются проблемы...