当任何任务正在运行时,如何阻止异步程序终止?

问题描述

假设我有这样的事情,

async fn do_update() {
    // here we store it.
    let task = task::spawn(async {
        let duration = Duration::from_millis(10);
        let mut stream = tokio::time::interval(duration);
        stream.tick().await;
        loop {
            println!("Foo");
            stream.tick().await;
        }
    });
    // and here we await it.
    task.await;
}

如果我调用它,就像 do_update().await 循环永远运行。这就是我想要的。但我不希望它成为唯一运行的东西。我只是想让它阻止终止,直到任务解决。

我希望事件每 5 秒运行一次,而不会阻塞程序的其余部分。如果我把它放在程序的末尾,这完全如我所愿。似乎这曾经是用 shutdown_on_idle 完成的,但由于同样的原因,现在提供的解决方案对我无效as this comment

我无法那样实现它,因为我是在另一个任务中生成的。

在任务产生任务的系统中,我如何确保只有在没有更多任务运行时才关闭可执行文件?

解决方法

我认为您正在寻找结构化并发,特别是全局隐式作用域。遗憾的是,nobody could find a good solution,因此暂时放弃了这项工作。

与此同时,这是一种可能的解决方法。对于应该使程序保持活动状态的任何任务,调用 spawn_keep_alive 而不是 tokio::spawn

Playground

use parking_lot::Mutex;
use std::{
    future::Future,sync::atomic::{AtomicU32,Ordering},time::Duration,};
use tokio::{sync::oneshot,task::JoinHandle};

static KEEPALIVE_COUNT: AtomicU32 = AtomicU32::new(0);
static KEEPALIVE_SENDER: Mutex<Option<oneshot::Sender<()>>> = parking_lot::const_mutex(None);

pub fn spawn_keep_alive<T>(task: T) -> JoinHandle<T::Output>
where
    T: Future + Send + 'static,T::Output: Send + 'static,{
    KEEPALIVE_COUNT.fetch_add(1,Ordering::Relaxed);
    tokio::spawn(async {
        let result = task.await;
        if KEEPALIVE_COUNT.fetch_sub(1,Ordering::Relaxed) == 1 {
            let sender = KEEPALIVE_SENDER.try_lock().unwrap().take().unwrap();
            sender.send(()).unwrap();
        }
        result
    })
}

async fn do_update() {
    let mut stream = tokio::time::interval(Duration::from_millis(100));
    stream.tick().await;
    for _ in 0..10 {
        println!("Foo");
        stream.tick().await;
    }
    spawn_keep_alive(async {
        tokio::time::sleep(Duration::from_millis(1000)).await;
        println!("I'm aliveeee!");
    });
    spawn_keep_alive(async {
        tokio::time::sleep(Duration::from_millis(2000)).await;
        println!("Don't forget about me!");
    });
}

#[tokio::main]
async fn main() {
    let (send,recv) = oneshot::channel();
    *KEEPALIVE_SENDER.try_lock().unwrap() = Some(send);
    spawn_keep_alive(do_update());
    // Wait for all keep-alive tasks to finish
    recv.await.unwrap();
}

KEEPALIVE_SENDER 可能更有效,但它只使用了两次)。

相关问答

错误1:Request method ‘DELETE‘ not supported 错误还原:...
错误1:启动docker镜像时报错:Error response from daemon:...
错误1:private field ‘xxx‘ is never assigned 按Alt...
报错如下,通过源不能下载,最后警告pip需升级版本 Requirem...