如何在像 Vec 这样的容器中管理 tokio oneshot::channel?

问题描述

我想使用容器来管理 tokio::oneshot::Sender。我正在使用 Vec,但似乎 Vec 中保存的值是引用,我需要使用 self 而不是引用来调用它:

use bytes::BytesMut;
use tokio::sync::oneshot;

#[derive(Clone)]
pub enum ChannelData {
    Video { timestamp: u32,data: BytesMut },Audio { timestamp: u32,MetaData {},}

pub type PlayerPublisher = oneshot::Sender<ChannelData>;

pub struct Channel {
    player_producers: Vec<PlayerPublisher>,// consumers who subscribe this channel.
}

impl Channel {
    fn new() -> Self {
        Self {
            player_producers: Vec::new(),}
    }

    async fn transmit(&mut self) {
        let b = BytesMut::new();
        let data = ChannelData::Video {
            timestamp: 234,data: b,};

        for i in self.player_producers {
            i.send(data);
        }
    }
}

错误:

error[E0507]: cannot move out of `self.player_producers` which is behind a mutable reference
  --> src/lib.rs:31:18
   |
31 |         for i in self.player_producers {
   |                  ^^^^^^^^^^^^^^^^^^^^^
   |                  |
   |                  move occurs because `self.player_producers` has type `Vec<tokio::sync::oneshot::Sender<ChannelData>>`,which does not implement the `Copy` trait
   |                  help: consider iterating over a slice of the `Vec<_>`'s content: `&self.player_producers`

error[E0382]: use of moved value: `data`
  --> src/lib.rs:32:20
   |
26 |         let data = ChannelData::Video {
   |             ---- move occurs because `data` has type `ChannelData`,which does not implement the `Copy` trait
...
32 |             i.send(data);
   |                    ^^^^ value moved here,in previous iteration of loop

我怎样才能实现我的目标?

pub fn send(mut self,t: T) -> Result<(),T> {
    let inner = self.inner.take().unwrap();

    inner.value.with_mut(|ptr| unsafe {
        *ptr = Some(t);
    });

    if !inner.complete() {
        unsafe {
            return Err(inner.consume_value().unwrap());
        }
    }

    Ok(())
}

解决方法

调用 send 需要拥有 oneshot 频道。要获得该所有权,您可以取得容器的所有权。在这种情况下,最简单的方法是取得 Channel 的所有权:

async fn transmit(self) { // Changed to `self`
    for i in self.player_producers {
        let data = ChannelData::Video {
            timestamp: 234,data: BytesMut::new(),};
        if i.send(data).is_err() {
            panic!("Unable to send data");
        }
    }
}

其他选项是drain集合:

for i in self.player_producers.drain(..) {

swap the collection with an empty one

use std::mem;
for i in mem::take(&mut self.player_producers) {

在每种情况下,每次发送时都必须构造(或克隆)data 负载。

另见:

,

Tokio oneshot 发送者只能发送一条消息,因此 send 将消耗 Sender。要从 send 中调用 Vec,您首先必须删除它。在迭代它们时从 &mut Vec 中删除所有元素的方法是 drain 它:

for i in self.player_producers.drain(..) {
    i.send(data);
}

您的另一个错误来自 datasend 的调用所消耗。由于您想将相同的数据发送给多个发件人,因此您必须clone

i.send(data.clone());

请注意 send 返回 Result 的警告。

相关问答

错误1:Request method ‘DELETE‘ not supported 错误还原:...
错误1:启动docker镜像时报错:Error response from daemon:...
错误1:private field ‘xxx‘ is never assigned 按Alt...
报错如下,通过源不能下载,最后警告pip需升级版本 Requirem...