我如何创建一个 Tokio 计时器来消除网络数据包的接收? 问题我试图解决去抖动问题

问题描述

问题

我已经实现了一个函数,该函数从发送的数据包中返回一组缺失的索引。如果发送了 100 个数据包,那么服务器将有一个索引向量,其中包含 0(缺失)和 1(不缺失)。我不想每次都触发这个,只有在没有收到数据包的情况下有轻微的延迟。我想将我的同步函数更改为 asynchronous debouncing function

我试图解决去抖动问题

我正在寻找一种解决方案来实现一个计时器(如 300 毫秒),该计时器的值将不断被不同的线程覆盖。一旦它的值不再被覆盖,它应该触发一个代码块或函数块。我正在使用 Tokio。

这是我想要实现的伪代码:

// thanks https://stackoverflow.com/questions/26593387/how-can-i-get-the-current-time-in-milliseconds
fn get_epoch() -> u128 {
    SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .unwrap()
        .as_millis()
}

impl Server {
    async fn run(self) -> Result<(),io::Error> {
        let Server {
            socket,mut buf,mut to_send,} = self;

        let mut timer_delay = get_epoch();

        loop {
            if let Some((size,peer)) = to_send {
                timer_delay = get_epoch(); // "reset" the to a closer value
            }

            futures::join!(
                /* execute a block of code if true*/
                if get_epoch() - timer_delay > 300,/* else (default case):*/
                to_send = Some(socket.recv_from(&mut buf)
            );
        }
    }
}

我的项目基于来自 Tokio 的以下示例:

impl Server {
    async fn run(self) -> Result<(),} = self;

        loop {
            // First we check to see if there's a message we need to echo back.
            // If so then we try to send it back to the original source,waiting
            // until it's writable and we're able to do so.
            if let Some((size,peer)) = to_send {
                let amt = socket.send_to(&buf[..size],&peer).await?;

                println!("Echoed {}/{} bytes to {}",amt,size,peer);
            }

            // If we're here then `to_send` is `None`,so we take a look for the
            // next message we're going to echo back.
            to_send = Some(socket.recv_from(&mut buf).await?);
        }
    }
}

解决方法

生成另一个 Tokio 任务用于去抖,它将收听频道。您可以通过使用超时来判断通道何时没有收到任何内容。当超时发生时,这是您应该执行不频繁操作的信号。不要忘记在通道关闭时执行该操作:

use std::time::Duration;
use tokio::{sync::mpsc,task,time}; // 1.3.0

#[tokio::main]
async fn main() {
    let (debounce_tx,mut debounce_rx) = mpsc::channel(10);
    let (network_tx,mut network_rx) = mpsc::channel(10);

    // Listen for events
    let debouncer = task::spawn(async move {
        let duration = Duration::from_millis(10);

        loop {
            match time::timeout(duration,debounce_rx.recv()).await {
                Ok(Some(())) => {
                    eprintln!("Network activity")
                }
                Ok(None) => {
                    eprintln!("Debounce finished");
                    break;
                }
                Err(_) => {
                    eprintln!("{:?} since network activity",duration)
                }
            }
        }
    });

    // Listen for network activity
    let server = task::spawn({
        let debounce_tx = debounce_tx.clone();
        async move {
            while let Some(packet) = network_rx.recv().await {
                // Received a packet
                debounce_tx
                    .send(())
                    .await
                    .expect("Unable to talk to debounce");
                eprintln!("Received a packet: {:?}",packet);
            }
        }
    });

    // Prevent deadlocks
    drop(debounce_tx);

    // Drive the network input
    network_tx.send(1).await.expect("Unable to talk to network");
    network_tx.send(2).await.expect("Unable to talk to network");
    network_tx.send(3).await.expect("Unable to talk to network");

    time::sleep(Duration::from_millis(20)).await;

    network_tx.send(4).await.expect("Unable to talk to network");
    network_tx.send(5).await.expect("Unable to talk to network");
    network_tx.send(6).await.expect("Unable to talk to network");

    time::sleep(Duration::from_millis(20)).await;

    // Close the network
    drop(network_tx);

    // Wait for everything to finish
    server.await.expect("Server panicked");
    debouncer.await.expect("Debouncer panicked");
}
Received a packet: 1
Received a packet: 2
Received a packet: 3
Network activity
Network activity
Network activity
10ms since network activity
10ms since network activity
Received a packet: 4
Received a packet: 5
Received a packet: 6
Network activity
Network activity
Network activity
10ms since network activity
Debounce finished

相关问答

错误1:Request method ‘DELETE‘ not supported 错误还原:...
错误1:启动docker镜像时报错:Error response from daemon:...
错误1:private field ‘xxx‘ is never assigned 按Alt...
报错如下,通过源不能下载,最后警告pip需升级版本 Requirem...