在编译时等待一些未知的期货

问题描述

我想利用 Tokio 的运行时来处理可变数量的异步期货。由于期货的数量在编译时未知,似乎 FuturesUnordered 是我最好的选择(宏 @@ 需要在编译时指定您的分支;join_all 可能是可能的,但文档当顺序无关紧要时,建议“在很多情况下”使用 FuturesUnordered)。

这个片段的逻辑是一个 recv() 循环被推送到期货桶,它应该总是运行。当新数据到达时,它的解析/处理也会被推送到期货桶(而不是立即处理)。这可确保接收器在响应新事件时保持低延迟,并且数据处理(可能计算成本高的解密)与所有其他数据处理异步块(加上侦听接收器)同时进行。

This thread 顺便解释了为什么期货会得到 select!

问题在于这个神秘的错误:

error[E0277]: `dyn futures::Future<Output = ()> + std::marker::Send` cannot be shared between threads safely
  --> src/main.rs:27:8
   |
27 |     }).boxed());
   |        ^^^^^ `dyn futures::Future<Output = ()> + std::marker::Send` cannot be shared between threads safely
   |
   = help: the trait `Sync` is not implemented for `dyn futures::Future<Output = ()> + std::marker::Send`
   = note: required because of the requirements on the impl of `Sync` for `Unique<dyn futures::Future<Output = ()> + std::marker::Send>`
   = note: required because it appears within the type `Box<dyn futures::Future<Output = ()> + std::marker::Send>`
   = note: required because it appears within the type `Pin<Box<dyn futures::Future<Output = ()> + std::marker::Send>>`
   = note: required because of the requirements on the impl of `Sync` for `FuturesUnordered<Pin<Box<dyn futures::Future<Output = ()> + std::marker::Send>>>`
   = note: required because of the requirements on the impl of `std::marker::Send` for `&FuturesUnordered<Pin<Box<dyn futures::Future<Output = ()> + std::marker::Send>>>`
   = note: required because it appears within the type `[static generator@src/main.rs:16:25: 27:6 _]`
   = note: required because it appears within the type `from_generator::GenFuture<[static generator@src/main.rs:16:25: 27:6 _]>`
   = note: required because it appears within the type `impl futures::Future`

看起来像“递归地”推送到 UnorderedFutures(我猜不是真的,但你还能叫它什么?)不起作用,但我不知道为什么。此错误表明 .boxed() 趋向于 Box'd & Pin'd 异步块不满足某些 Sync 特征要求——我猜这个要求只是因为 {{1} }(在 FuturesUnordered 期间使用,因为该方法借用了 &self)需要它作为其 &FuturesUnordered 特性...或其他什么?

futures.push(...)

解决方法

我将把低级错误留给另一个答案,但我相信解决高级问题的更惯用的方法是将 FuturesUnordered 的使用与 tokio::select! 之类的东西结合使用如下:

use tokio::sync::mpsc;
use futures::stream::FuturesUnordered;
use futures::StreamExt;

#[tokio::main]
pub async fn main() {
    let mut futures = FuturesUnordered::new();
    let (tx,mut rx) = mpsc::channel(32);
    
    //turn foo into something more concrete
    tokio::spawn(async move {
        let _ = tx.send(42i32).await;
    });

    loop {
        tokio::select! {
            Some(data) = rx.recv() => {
                futures.push(async move {
                    data.to_string()
                });
            },Some(result) = futures.next() => {
                println!("{}",result)
            },else => break,}
    }
}

您可以在此处阅读有关 select 宏的更多信息:https://tokio.rs/tokio/tutorial/select

,

当您使用 boxed 方法装箱由异步块创建的未来时,您试图将其强制为 dyn Future + Send

pub fn boxed<'a>(
    self
) -> Pin<Box<dyn Future<Output = Self::Output> + 'a + Send>>

然而,创造的未来不是Send。为什么?因为在它里面,你尝试推送到 FuturesUnordered,它借用它:

pub fn push(&self,future: Fut)

这意味着 async 块捕获一个 &FuturesUnordered。要使类型为 Send,它的所有字段都必须为 Send,因此要生成的未来为 Send&FuturesUnordered 必须为 Send。>

要使引用为 Send,类型也必须为 Sync

impl<'_,T> Send for &'_ T where
    T: Sync

要使 FuturesUnorderedSync,存储的期货也必须为 Sync

impl<Fut: Sync> Sync for FuturesUnordered<Fut> {}

然而,boxed 返回的未来不一定是 Sync

pub fn boxed<'a>(
    self
) -> Pin<Box<dyn Future<Output = Self::Output> + 'a + Send>>

这意味着异步生成器不是 Send,因此您无法将其强制为 dyn Future + Send,并且会收到令人困惑的错误消息。

解决办法是添加一个绑定到未来的Sync,并手动添加Box::pin

type BoxedFuture = Pin<Box<dyn Future<Output = ()> + Send + Sync>>;

let mut futures = FuturesUnordered::<BoxedFuture>::new();

futures.push(Box::pin(async {
    loop {
        match rx.recv().await {
            Some(data) => {
                futures.push(Box::pin(async move {
                    let _ = data;
                }));
            }
            None => {}
        }
    }
}));

但是,您随后会遇到一堆借贷问题。更好的解决方案是使用 tokio::select! 而不是外部 push,正如迈克尔的回答所解释的那样。

相关问答

错误1:Request method ‘DELETE‘ not supported 错误还原:...
错误1:启动docker镜像时报错:Error response from daemon:...
错误1:private field ‘xxx‘ is never assigned 按Alt...
报错如下,通过源不能下载,最后警告pip需升级版本 Requirem...