Приложения на Rust часто используют асинхронные библиотеки, такие как Tokio и Actix. Эти библиотеки предоставляют инструменты для асинхронного ввода-вывода и параллельных вычислений и т.д. Однако иногда возникают проблемы при совместном использовании разных асинхронных библиотек.
Сегодня я хочу поделиться с вами опытом решения одной из распространенных проблем при работе с библиотеками Tokio и Actix. Конкретно, мы рассмотрим ошибку "error: future cannot be sent between threads safely", которая может возникнуть, когда вы пытаетесь использовать клиент Actix внутри асинхронной функции, запущенной с помощью Tokio. Я расскажу вам, как преодолеть эту проблему.
Исходный код с Actix
Давайте начнем с простого примера кода, который работает только с Actix и не вызывает проблем:
use actix_web::{web, App, HttpResponse, HttpServer};
use awc::Client;
#[actix_rt::main]
async fn main() {
actix_rt::spawn(async {
HttpServer::new(|| {
App::new()
.service(web::resource("/hello").route(web::get().to(ok)))
})
.bind("127.0.0.1:8080")?
.run()
.await
});
let client = Client::new();
let url = "http://127.0.0.1:8080/hello";
let ret = client.get(url).send().await.unwrap().body().await.unwrap();
println!("{:?}", ret);
}
async fn ok() -> HttpResponse {
HttpResponse::Ok()
.content_type("text/html; charset=utf-8")
.body("OK")
}
В этом коде мы создаем HTTP-сервер с помощью Actix и выполняем GET-запрос к нему с использованием клиента Actix. Все работает отлично, но проблемы начинаются, когда мы попытаемся использовать клиент Actix в асинхронной функции, запущенной с помощью Tokio.
Проблема "error: future cannot be sent between threads safely"
Когда мы попытаемся вызвать клиент Actix из Tokio, мы столкнемся с ошибкой "error: future cannot be sent between threads safely. future created by async block is not `Send`. has type `awc::Client` which is not `Send`". Это происходит потому, что клиент Actix не является Send, что означает, что его нельзя безопасно передавать между потоками.
Вот пример кода, который вызывает эту ошибку:
use actix_web::{web, App, HttpResponse, HttpServer};
use awc::Client;
#[actix_rt::main]
async fn main() {
actix_rt::spawn(async {
HttpServer::new(|| {
App::new()
.service(web::resource("/hello").route(web::get().to(ok)))
})
.bind("127.0.0.1:8080")?
.run()
.await
});
let r = tokio::spawn(async move {
let client = Client::new();
let url = "http://127.0.0.1:8080/hello";
client.get(url).send().await.unwrap().body().await.unwrap()
}).await.unwrap();
println!("{:?}", r);
}
async fn ok() -> HttpResponse {
HttpResponse::Ok()
.content_type("text/html; charset=utf-8")
.body("OK")
}
Решение проблемы с использованием Oneshot из Tokio
Чтобы решить эту проблему и сделать код безопасным для использования в Tokio, мы можем воспользоваться механизмом Oneshot из Tokio. Этот механизм позволяет нам обернуть результат выполнения клиента Actix и передать его между потоками безопасно.
Вот пример кода, который использует Oneshot для решения проблемы:
use actix_web::{web, App, HttpResponse, HttpServer};
use awc::Client;
#[actix_rt::main]
async fn main() {
actix_rt::spawn(async {
HttpServer::new(|| {
App::new()
.service(web::resource("/hello").route(web::get().to(ok)))
})
.bind("127.0.0.1:8080")?
.run()
.await
});
let (sender, receiver) = tokio::sync::oneshot::channel();
actix_rt::spawn(async move {
let client = Client::new();
let url = "http://127.0.0.1:8080/hello";
let _ = sender.send(client.get(url).send().await.unwrap().body().await.unwrap());
});
let r = tokio::spawn(async move {
receiver.await.unwrap()
}).await.unwrap();
println!("{:?}", r);
std::mem::forget(runtime);
}
async fn ok() -> HttpResponse {
HttpResponse::Ok()
.content_type("text/html; charset=utf-8")
.body("OK")
}
Итак, после долгих поисков в Google, изучения вопроса на Stack Overflow и многочисленных попыток получить ответ от ChatGPT, я так и не нашел готового решения своей проблемы. Пришлось включить собственные серые клеточки и пораскинуть мозгами. Надеюсь, что мое решение будет полезным кому-то.
Комментарии (16)
mayorovp
05.10.2023 12:37+3Теперь по поводу вашего решения. При подобном использовании канала теряется возможность отмены задачи, если внешняя задача будет отменена — клиент продолжит выполнять запрос.
Документация на oneshot рекомендует делать как-то так:
tokio::task::spawn_local(async move { tokio::select! { _ = sender.closed() => { } value = client.get(url).send().await.unwrap().body() => { let _ = sender.send(value); } } });
Кстати, почему бы не оформить это в виде универсальной функции?
pub fn send_future<F>(future: F) -> impl Future<Output = <F as Future>::Output> + Send where F: Future + 'static, <F as Future>::Output: Send { let (mut tx, rx) = tokio::sync::oneshot::channel(); tokio::task::spawn_local(async move { tokio::select! { _ = tx.closed() => { } value = future => { let _ = tx.send(value); } }; }); async move { rx.await.unwrap() } } // … tokio::spawn(async move { send_future(async move { let client = Client::new(); let url = "http://127.0.0.1:8080/hello"; client.get(url).send().await?.body().await?; }).await; });
mayorovp
05.10.2023 12:37Так, я тут подумал и понял что все эти каналы нафиг не нужны.
Функция tokio::task::spawn_local, она же actix_rt::spawn, уже делает всё что нужно! Проверять уже лень, но вот так должно сработать:
let http_task = actix_rt::spawn(async move { let client = Client::new(); let url = "http://127.0.0.1:8080/hello"; client.get(url).send().await?.body().await?; }); tokio::spawn(async move { http_task.await });
И даже вот так:
tokio::spawn(async move { actix_rt::spawn(async move { let client = Client::new(); let url = "http://127.0.0.1:8080/hello"; client.get(url).send().await?.body().await?; }).await; });
Хотя, в отличии от send_future из моего комментария выше, тут снова будет проблема с отменой...
igumnov Автор
05.10.2023 12:37Не проканало :)
error[E0277]: `Rc<(dyn actix_web::dev::Service<ConnectRequest, Future = Pin<Box<(dyn std::future::Future<Output = Result<ConnectResponse, SendRequestError>> + 'static)>>, Response = ConnectResponse, Error = SendRequestError> + 'static)>` cannot be sent between threads safely --> src\main.rs:17:26 | 17 | let r = tokio::spawn(async move { | _____________------------_^ | | | | | required by a bound introduced by this call 18 | | actix_rt::spawn(async move { 19 | | let client = Client::new(); 20 | | let url = "http://127.0.0.1:8080/hello"; 21 | | client.get(url).send().await.unwrap().body().await.unwrap() 22 | | }).await 23 | | }).await; | | ^ | | | | |_____`Rc<(dyn actix_web::dev::Service<ConnectRequest, Future = Pin<Box<(dyn std::future::Future<Output = Result<ConnectResponse, SendRequestError>> + 'static)>>, Response = ConnectResponse, Error = SendRequestError> + 'static)>` cannot be sent between threads safely | within this `[async block@src\main.rs:17:26: 23:6]`
nanoqsh
05.10.2023 12:37-1#[actix_rt::main] async fn main() { actix_rt::spawn(async { HttpServer::new(|| App::new().service(web::resource("/hello").route(web::get().to(ok)))) .bind("127.0.0.1:8080")? .run() .await }); let r = actix_rt::spawn(async move { // ^^^^^^^^^^^^^^^ вместо tokio::spawn let client = Client::new(); let url = "http://127.0.0.1:8080/hello"; client.get(url).send().await.unwrap().body().await.unwrap() }) .await .unwrap(); println!("{:?}", r); }
Проверил, отлично работает. tokio тут в зависимости вообще не нужен. У автора кривые руки, лучше бы на Го писал
igumnov Автор
05.10.2023 12:37+2Уважаемый комментатор, благодарю Вас за предложенный вариант кода и замечания.
Вы правы, что в Вашем коде создание задачи происходит в текущей thread. Однако если изменить его, создав задачу в новой thread, то проблема опять проявится - мы получим ту же ошибку "future cannot be sent between threads safely".
Цель моей статьи - показать типичную проблему при взаимодействии библиотек Tokio и Actix, когда из Tokio нужно вызвать код Actix. Ваш код не демонстрирует эту проблему.
Пример кода в статье максимально упрощен, чтобы наглядно воспроизвести ошибку для читателей. Конечно, в реальном приложении код будет сложнее. Но суть проблемы остается той же.
Я ценю, что Вы уделили время, чтобы предложить альтернативный подход. Но в данном случае он не решает рассматриваемую в статье проблему взаимодействия Tokio и Actix. Еще раз спасибо за отзыв, обсуждение таких нюансов всегда полезно для углубления понимания темы.
mayorovp
05.10.2023 12:37А вас не смутило что этот код приведён в статье как пример работающего, но делающего не то что нужно?
igumnov Автор
05.10.2023 12:37-1Уважаемый, благодарю Вас за комментарий. К сожалению, я не совсем понял Вашу мысль о том, что приведенный мною пример кода работает неправильно.
Я представил работающий фрагмент кода для иллюстрации определенной концепции. Затем я показал, как этот же код может сломаться при использовании в многопоточной среде и не скомпилируется, что демонстрирует важность учета многопоточности.
Если я неверно истолковал Ваш комментарий, прошу прощения. Буду благодарен, если Вы уточните свою мысль, чтобы я мог лучше понять Вашу позицию и внести необходимые исправления в статью. Мне важно представить информацию максимально корректно для читателей.
mayorovp
05.10.2023 12:37-1Я к тому, что если бы actix_rt::spawn делал ровно то что вам нужно — статьи бы не было
igumnov Автор
05.10.2023 12:37actix_rt::spawn запускает задачу в текущем потоке (thread). Например:
use actix_rt::{spawn, Arbiter}; fn main() { spawn(|| { println!("Running in the main thread"); }); }
Здесь мы запускаем задачу println в основном потоке приложения.
Если нужен новый поток, используем Arbiter:
let arbiter = Arbiter::new(); arbiter.spawn(|| { println!("Running in a separate thread"); });
Arbiter создает пул потоков и позволяет запускать задачи в отдельных потоках из этого пула.
Таким образом, actix_rt::spawn для текущего потока, а Arbiter - для создания новых потоков.
Другими словами actix_rt::spawn не делает то что не нужно. А если нужно несколько thread то нужно использовать иной механизм.
Похоже у вас проблемы с пониманием что такое thread... В моей статье как раз и разбирается кейс когда не 1 thread, а несколько, начинаются проблемы...
mayorovp
А на кой вам вообще понадобилось создавать новый рантайм tokio, и чем не устроил тот, в котором выполняется main?
Если что, actix_rt::spawn — это синоним для tokio::task::spawn_local...
igumnov Автор
Наверное, потому что:
tokio::task::spawn_local
запускает асинхронную задачу в текущем локальном контексте выполнения, который обычно ограничен одним потоком выполнения (thread). Это означает, что задача будет выполняться в пределах текущего потока выполнения без возможности переключения на другие потоки.mayorovp
Так про tokio::task::spawn_local я написал просто для демонстрации связи между actix и tokio. Запускать задачу можно при помощи обычного tokio::spawn.
Зачем для запуска tokio::spawn создавать новый рантайм?
igumnov Автор
Да, можно. Это упрощает пример на 1 строчку кода. Для этого примера создавать новый runtime - избыточно. Лучшее, враг хорошего...
nanoqsh
Зачем тут вообще два или более потока?
igumnov Автор
Уважаемый комментатор,
Пример кода, приведенный в статье, является упрощенным и синтетическим. Это сделано с целью воспроизведения проблемы и наглядной демонстрации ее решения. Сложности возникают, когда весь этот код работает в разных потоках.
Буду признателен, если Вы сможете переписать данный пример кода таким образом, чтобы продемонстрировать возникающую проблему и предложить оптимальный вариант ее решения. Это позволит читателям лучше понять суть обсуждаемого вопроса. Заранее благодарю за помощь в улучшении качества статьи.