问题描述
我正在构建一个小应用程序,它应该以不同的时间间隔安排两个任务(基于 rusoto
AWS SDK):每 X 秒运行一个任务,每 Y 秒运行另一个任务。
我发现 crate crossbeam
提供了一个滴答计时器和一个 select!
宏,并将它们放在一起,如下所示:
fn main() -> Result<(),Error> {
let cloudwatch_client = rusoto_cloudwatch::CloudWatchClient::new();
let rt = Runtime::new().unwrap();
let tick_a = tick(Duration::from_secs(60));
let tick_b = tick(Duration::from_secs(30));
loop {
select! {
recv(tick_a) -> _ => {
rt.block_on(cloudwatch_client.put_metric_data( /* ... */ ));
},/* similar for tick_b */
}
}
}
这可以编译,但是程序会因 thread 'main' panicked at 'not currently running on the Tokio runtime.'
而出现混乱。通过分析回溯,这似乎来自 Rusoto 调用。
我在这里错过了什么?有没有办法使这项工作?在 Rust 中是否有更好的方法来处理任务的间隔调度?
请注意,this 问题似乎没有解决我的问题。该问题从使用 futures::executor::block_on
函数开始,并通过使用由 tokio block_on
实现的 Runtime
方法解决。我已经在 block_on
中使用了 Runtime
方法。
解决方法
线程“main”因“当前未在 Tokio 运行时上运行”而恐慌。
如果特定库所需的 tokio 运行时版本未主动运行,则会显示此错误 - 因为每个主要版本使用不同的线程局部变量,并且可以包含 1 个以上的库主要版本相同的构建。
在您的情况下,您可能运行的是 Tokio 0.3 运行时,但 rusoto 需要 Tokio 0.2 运行时。当 rusoto 然后尝试通过 Tokio 0.2(也包含在构建中)执行 IO 时,该程序检测到没有运行时处于活动状态并产生错误。
要解决此问题,请确保在您的项目中仅使用一个 tokio 版本。您可能需要通过 0.2
将 tokio 降级到 Cargo.toml
,因为可能没有更新的 rusoto 版本可用。
另外一个不相关的建议:
除了将 crossbeam 用于计时器之外,您还可以在 tokio 运行时内运行“整个事情”:您可以使用 tokio::select!和 tokio 计时器,用于在此处使用横梁执行您的操作。
见https://docs.rs/tokio/0.2.24/tokio/time/fn.interval.html 和 https://docs.rs/tokio/0.2.24/tokio/macro.select.html(其中包含与您的用例类似的示例)
,CloudWatchClient::put_metric_data
返回一个 RusotoFuture
,它有一个 sync
方法可以做你想要的。所以,而不是:
let cloudwatch_client = rusoto_cloudwatch::CloudWatchClient::new();
let rt = Runtime::new().unwrap();
rt.block_on(cloudwatch_client.put_metric_data(/* ... */));
你可以这样做:
let cloudwatch_client = rusoto_cloudwatch::CloudWatchClient::new();
cloudwatch_client.put_metric_data(/* ... */).sync();
Which is also what the official docs recommend。
,请注意:
请注意,从 v0.43.0 开始,Rusoto 使用 Rust 的 std::future::Future 和 Tokio 0.2 生态系统。从 v0.46.0 开始,Rusoto 使用 Tokio 1.0 生态系统。
对于简单的 S3 调用,我建议使用以下依赖项:
[dependencies]
rusoto_core = "0.46.0"
rusoto_s3 = "0.46.0"
tokio = {version = "1.0",features = ["full"]}
调用可以这样实现:
use rusoto_core::Region;
use rusoto_s3::{S3,S3Client};
#[tokio::main]
async fn main() {
let region = Region::UsEast1;
let client = S3Client::new(region);
match client.list_buckets().await {
Ok(output) => match output.buckets {
Some(bucket_list) => {
println!("Buckets in S3:");
for bucket in bucket_list {
println!("{:?}",bucket.name);
}
}
None => println!("No buckets!"),},Err(error) => {
println!("Error: {:?}",error);
}
}
}
设置凭据以避免凭据错误。