混搭选择!和 Rust 中的异步调用

问题描述

我正在构建一个小应用程序,它应该以不同的时间间隔安排两个任务(基于 rusoto AWS SDK):每 X 秒运行一个任务,每 Y 秒运行另一个任务。

我发现 crate crossbeam 提供了一个滴答计时器和一个 select! 宏,并将它们放在一起,如下所示:

fn main() -> Result<(),Error> {
  let cloudwatch_client = rusoto_cloudwatch::CloudWatchClient::new();
  let rt = Runtime::new().unwrap();
  let tick_a = tick(Duration::from_secs(60));
  let tick_b = tick(Duration::from_secs(30));

  loop {
    select! {
      recv(tick_a) -> _ => {
        rt.block_on(cloudwatch_client.put_metric_data( /* ... */ ));
      },/* similar for tick_b */
    }
  }
}

这可以编译,但是程序会因 thread 'main' panicked at 'not currently running on the Tokio runtime.' 而出现混乱。通过分析回溯,这似乎来自 Rusoto 调用

在这里错过了什么?有没有办法使这项工作?在 Rust 中是否有更好的方法来处理任务的间隔调度?

请注意,this 问题似乎没有解决我的问题。该问题从使用 futures::executor::block_on 函数开始,并通过使用由 tokio block_on 实现的 Runtime 方法解决。我已经在 block_on 中使用了 Runtime 方法

解决方法

线程“main”因“当前未在 Tokio 运行时上运行”而恐慌。

如果特定库所需的 tokio 运行时版本未主动运行,则会显示此错误 - 因为每个主要版本使用不同的线程局部变量,并且可以包含 1 个以上的库主要版本相同的构建。

在您的情况下,您可能运行的是 Tokio 0.3 运行时,但 rusoto 需要 Tokio 0.2 运行时。当 rusoto 然后尝试通过 Tokio 0.2(也包含在构建中)执行 IO 时,该程序检测到没有运行时处于活动状态并产生错误。

要解决此问题,请确保在您的项目中仅使用一个 tokio 版本。您可能需要通过 0.2 将 tokio 降级到 Cargo.toml,因为可能没有更新的 rusoto 版本可用。

另外一个不相关的建议:

除了将 crossbeam 用于计时器之外,您还可以在 tokio 运行时内运行“整个事情”:您可以使用 tokio::select!和 tokio 计时器,用于在此处使用横梁执行您的操作。

https://docs.rs/tokio/0.2.24/tokio/time/fn.interval.htmlhttps://docs.rs/tokio/0.2.24/tokio/macro.select.html(其中包含与您的用例类似的示例)

,

CloudWatchClient::put_metric_data 返回一个 RusotoFuture ,它有一个 sync 方法可以做你想要的。所以,而不是:

let cloudwatch_client = rusoto_cloudwatch::CloudWatchClient::new();
let rt = Runtime::new().unwrap();
rt.block_on(cloudwatch_client.put_metric_data(/* ... */));

你可以这样做:

let cloudwatch_client = rusoto_cloudwatch::CloudWatchClient::new();
cloudwatch_client.put_metric_data(/* ... */).sync();

Which is also what the official docs recommend

,

请注意:

请注意,从 v0.43.0 开始,Rusoto 使用 Rust 的 std::future::Future 和 Tokio 0.2 生态系统。从 v0.46.0 开始,Rusoto 使用 Tokio 1.0 生态系统。

对于简单的 S3 调用,我建议使用以下依赖项:

[dependencies]  
rusoto_core = "0.46.0"                            
rusoto_s3 = "0.46.0"                                                    
tokio = {version = "1.0",features = ["full"]}  

调用可以这样实现:

use rusoto_core::Region;
use rusoto_s3::{S3,S3Client};

#[tokio::main]
async fn main() {
    let region = Region::UsEast1;
    let client = S3Client::new(region);

    match client.list_buckets().await {
        Ok(output) => match output.buckets {
            Some(bucket_list) => {
                println!("Buckets in S3:");

                for bucket in bucket_list {
                    println!("{:?}",bucket.name);
                }
            }
            None => println!("No buckets!"),},Err(error) => {
            println!("Error: {:?}",error);
        }
    }
}

设置凭据以避免凭据错误。