循环中的Rust所有权

问题描述

我正在尝试在Rust中实现Rabbitmq发送/监听功能,并且我有以下代码

struct RabbitMQ {
    connection: Connection,}

impl RabbitMQ {
    fn connect() -> Self {
        RabbitMQ {
           connection: the created connection
        }
    }
}

impl Messagebroker for RabbitMQ {
    async fn publish(&self,topic: &Topic) -> Result<PublisherConfirm,Error> {
        let channel = self.connection.create_channel().await.unwrap();

        RabbitMQ::create_exchange(&channel,&topic.exchange).await;

        let payload = topic.message.as_bytes();

        let res = channel.basic_publish(
            topic.exchange.name.as_str(),topic.queue.routing_key.as_str(),topic.exchange.publish_options,payload.to_vec(),BasicProperties::default(),);

        res.await
    }
}

到目前为止一切顺利!

现在,我想在for循环中发布许多消息,而不必等待服务器的确认,问题是当我生成tokio异步任务时,我需要移动我的代理值,这对于下一次迭代而言使它无效循环:

let broker = RabbitMQ::connect(&connection_details).await;

for x in 1..10 {
    tokio::spawn(async move {
        let confirm = broker.publish(&my_topic).await.unwrap();
    }).await.unwrap();
}

上面的代码不会编译并出现以下错误

 error[E0382]: use of moved value: `broker`
  --> src/main.rs:47:33
   |
21 |       let broker = RabbitMQ::connect(&connection_details).await;
   |           ------ move occurs because `broker` has type `message_broker::RabbitMQ`,which >does not implement the `copy` trait
...
47 |           tokio::spawn(async move {
   |  _________________________________^
48 | |             let confirm = &broker.publish(&enable_cdn).await.unwrap();
   | |                            ------ use occurs due to use in generator
49 | |         }).await.unwrap();
   | |_________^ value moved here,in prevIoUs iteration of loop

我不能实现 copy 特性,因为 Connection 不是原始的,并且似乎不能对代理使用引用“&”。

我的问题是,如何在不编写 n 个发布调用的情况下完成此任务?

解决方法

您正在使用async move块,这意味着该块中使用的任何 name 都将移至将来,而不考虑所执行的操作。所以写

&broker.publish
块内的

没什么区别:首先移动broker,而将来(当用.await进行轮询时)对其进行内部引用。因此,您需要做的是在块外借钱,然后将其移动到里面:

let broker = RabbitMQ::connect(&connection_details).await;

for x in 1..10 {
    let broker = &broker;
    tokio::spawn(async move {
        let confirm = broker.publish(&enable_cdn).await.unwrap();
    }).await.unwrap();
}

但是我认为也不起作用:tokio::spawn没有范围,所以即使您在等待它,编译器也不知道它不会过期broker。就其而言,tokio任务可以长期存在。这意味着您现在可能会遇到生命周期错误(编译器将假定借用可以超出封闭函数,并因此失去其起源)。

一个简单的解决方案是将Connection放在Arc之类的后面。

或者,重组系统以更好地满足Rabbitmq的要求:不知道您使用的是什么,但是amiquip指出连接是线程安全的,而 channels 却不是线程安全的发送到其他线程。

因此,不是发布到隐式连接,而是在循环的每次迭代中创建一个通道,然后将那个移到任务中,以便实际执行发布。

现在我想在for循环中发布许多消息,而不必等待服务器的确认

因为您正在等待tokio :: spawn的结果,所以您仍然不这样做吗?