为什么在crossbeam_channel :: select旁边调用tokio :: spawn会有延迟?

问题描述

我正在创建一个任务,它将产生其他任务。其中一些会花费一些时间,因此无法等待,但可以并行运行:

src / main.rs

<a href="@System.Uri.UnescapeDataString(Url.RouteUrl("Product",new {SeName = Model.SeName}))">@Model.Name</a>

我注意到了奇怪的行为。输出

use crossbeam::crossbeam_channel::{bounded,select};

#[tokio::main]
async fn main() {
    let (s,r) = bounded::<usize>(1);

    tokio::spawn(async move {
        let mut counter = 0;
        loop {
            let loop_id = counter.clone();
            tokio::spawn(async move { // why this one was not fired?
                println!("inner task {}",loop_id);
            }); // .await.unwrap(); - solves issue,but this is long task which cannot be awaited
            println!("loop {}",loop_id);
            select! {
                recv(r) -> rr => {
                    // match rr {
                    //     Ok(ee) => {
                    //         println!("received from channel {}",loop_id);
                    //         tokio::spawn(async move {
                    //             println!("received from channel task {}",loop_id);
                    //         });
                    //     },//     Err(e) => println!("{}",e),// };
                },// more recv(some_channel) -> 
            }
            counter = counter + 1;
        }
    });

    // let s_clone = s.clone();
    // tokio::spawn(async move {
    //     s_clone.send(2).unwrap();
    // });

    loop {
        // rest of the program
    }
}

我希望它也能输出loop 0

如果我向channel发送一个值,输出将是:

inner task 0

缺少loop 0 inner task 0 loop 1

为什么inner task 1产生一个延迟循环?

我第一次注意到“从通道任务接收到”这种行为会延迟一个循环,但是当我减少代码以准备示例时,这种情况就开始发生在“内部任务”中。值得一提的是,如果我将第二个inner task写给另一个,则只有最后一个会出现此问题。调用tokio::spawntokio::spawn时应该注意些什么?是什么原因导致这一循环延迟?

Cargo.toml依赖项

select!

Rust 1.46,Windows 10

解决方法

select!被禁止,并且tokio::spawn say的文档:

生成的任务可能在当前线程上执行,或者可能被发送到其他线程以执行。

在这种情况下,select!“未来”实际上是一个阻塞函数,并且spawn不使用新线程(无论是在第一次调用还是在循环内的线程)。 因为您没有告诉tokio您将要阻塞,所以tokio认为不需要另一个线程(从tokio的角度来看,您只有3个永远不会阻塞的期货,那么为什么您仍然需要另一个线程?)。 / p>

解决方案是将tokio::task::spawn_blocking用于select!闭包(它将不再是将来,因此async move {}现在是move || {})。 现在,tokio将知道此函数实际上已阻塞,并将其移至另一个线程(同时将所有实际的期货保留在其他执行线程中)。

use crossbeam::crossbeam_channel::{bounded,select};

#[tokio::main]
async fn main() {
    let (s,r) = bounded::<usize>(1);

    tokio::task::spawn_blocking(move || {
        // ...
    });

    loop {
        // rest of the program
    }
}

Link to playground

另一种可能的解决方案是使用非阻塞通道,例如tokio::sync::mpsc,您可以在其上使用await并获得预期的行为,例如直接使用{{1}的playground example }或recv().await,例如:

tokio::select!

Link to playground