“ tokio :: mpsc :: channel”上的接收方仅在缓冲区已满时接收消息

问题描述

在我的代码段中,tokio(v0.3)mpsc:channel接收器仅在缓冲区已满时接收到一条消息。缓冲区的大小无关紧要。

use std::io;
use std::net::{SocketAddr,ToSocketAddrs};
use std::sync::Arc;
use std::time::Duration;
use tokio::net::UdpSocket;
use tokio::sync::mpsc;
use tokio::time::sleep;

const MESSAGE_LENGTH: usize = 1024;

pub struct Peer {
    socket: Arc<UdpSocket>,}

impl Peer {
    pub fn new<S: ToSocketAddrs>(addr: S) -> Peer {
        let socket = std::net::UdpSocket::bind(addr).expect("Could not create socket");

        let peer = Peer {
            socket: Arc::new(UdpSocket::from_std(socket).unwrap()),};

        peer.start_inbound_message_handler();
        peer
    }

    pub fn local_addr(&self) -> SocketAddr {
        self.socket.local_addr().unwrap()
    }

    fn start_inbound_message_handler(&self) {
        let socket = self.socket.clone();
        let (tx,rx) = mpsc::channel(1);

        self.start_request_handler(rx);

        tokio::spawn(async move {
            let mut buf = [0u8; MESSAGE_LENGTH];
            loop {
                if let Ok((len,addr)) = socket.recv_from(&mut buf).await {
                    println!("received {} bytes from {}",len,addr);

                    if let Err(_) = tx.send(true).await {
                        println!("error sending msg to request handler");
                    }
                }
            }
        });
    }

    fn start_request_handler(&self,mut receiver: mpsc::Receiver<bool>) {
        tokio::spawn(async move {
            while let Some(msg) = receiver.recv().await {
                println!("got ping request: {:?}",msg);
            }
        });
    }

    pub async fn send_ping(&self,dest: String) -> Result<(),io::Error> {
        let buf = [255u8; MESSAGE_LENGTH];

        self.socket.send_to(&buf[..],&dest).await?;

        Ok(())
    }
}

#[tokio::main]
async fn main() -> Result<(),Box<dyn std::error::Error>> {
    let peer1 = Peer::new("0.0.0.0:0");
    println!("peer1 started on: {}",peer1.local_addr().to_string());

    let peer2 = Peer::new("0.0.0.0:0");
    println!("peer2 started on: {}",peer2.local_addr().to_string());

    peer2.send_ping(peer1.local_addr().to_string()).await?;
    peer2.send_ping(peer1.local_addr().to_string()).await?;

    sleep(Duration::from_secs(100)).await;

    Ok(())
}

链接Playground

start_inbound_message_handler函数中,我开始从套接字读取消息,如果收到一条消息,则通过mpsc::channel发出的消息将被发送到start_request_handler进行处理的地方,在此如果接收方收到任何信息,则会写入简单的日志输出

main函数中,我创建两个对等体,peer1和peer2,在创建两个对等体之后,我向第一个对等体发起ping请求。在start_inbound_message_handler中,我将从udp套接字接收数据,并通过mpsc::channel发送消息。发送返回无错误。问题如前所述,接收方仅在缓冲区已满时才接收消息。在这种情况下,缓冲区为1。因此,如果我发送第二个ping,则收到第一个ping。我不知道为什么会这样。

预期的行为是,如果我通过通道发送消息,则接收方会立即开始接收消息,而不会等到缓冲区已满。

解决方法

根据from_std()的Tokio文档:

从先前绑定的UdpSocket创建新的std::net::UdpSocket

此功能旨在用于包装来自 Tokio中的标准库。 该转换不包含任何内容 关于底层套接字;由用户自行设置 非阻止模式。

这可以与socket2的Socket接口一起使用 在移交之前配置套接字,例如设置选项 reuse_address或绑定到多个地址。

未处于非阻塞模式的套接字将阻止Tokio正常工作。

只需使用tokio函数bind(),它就更简单了。