问题描述
我正在尝试在Rust中实现Rabbitmq发送/监听功能,并且我有以下代码:
struct RabbitMQ {
connection: Connection,}
impl RabbitMQ {
fn connect() -> Self {
RabbitMQ {
connection: the created connection
}
}
}
impl Messagebroker for RabbitMQ {
async fn publish(&self,topic: &Topic) -> Result<PublisherConfirm,Error> {
let channel = self.connection.create_channel().await.unwrap();
RabbitMQ::create_exchange(&channel,&topic.exchange).await;
let payload = topic.message.as_bytes();
let res = channel.basic_publish(
topic.exchange.name.as_str(),topic.queue.routing_key.as_str(),topic.exchange.publish_options,payload.to_vec(),BasicProperties::default(),);
res.await
}
}
到目前为止一切顺利!
现在,我想在for循环中发布许多消息,而不必等待服务器的确认,问题是当我生成tokio异步任务时,我需要移动我的代理值,这对于下一次迭代而言使它无效循环:
let broker = RabbitMQ::connect(&connection_details).await;
for x in 1..10 {
tokio::spawn(async move {
let confirm = broker.publish(&my_topic).await.unwrap();
}).await.unwrap();
}
error[E0382]: use of moved value: `broker`
--> src/main.rs:47:33
|
21 | let broker = RabbitMQ::connect(&connection_details).await;
| ------ move occurs because `broker` has type `message_broker::RabbitMQ`,which >does not implement the `copy` trait
...
47 | tokio::spawn(async move {
| _________________________________^
48 | | let confirm = &broker.publish(&enable_cdn).await.unwrap();
| | ------ use occurs due to use in generator
49 | | }).await.unwrap();
| |_________^ value moved here,in prevIoUs iteration of loop
我不能实现 copy 特性,因为 Connection 不是原始的,并且似乎不能对代理使用引用“&”。
我的问题是,如何在不编写 n 个发布调用的情况下完成此任务?
解决方法
您正在使用async move
块,这意味着该块中使用的任何 name 都将移至将来,而不考虑所执行的操作。所以写
&broker.publish
块内的没什么区别:首先移动broker
,而将来(当用.await
进行轮询时)对其进行内部引用。因此,您需要做的是在块外借钱,然后将其移动到里面:
let broker = RabbitMQ::connect(&connection_details).await;
for x in 1..10 {
let broker = &broker;
tokio::spawn(async move {
let confirm = broker.publish(&enable_cdn).await.unwrap();
}).await.unwrap();
}
但是我认为也不起作用:tokio::spawn
没有范围,所以即使您在等待它,编译器也不知道它不会过期broker
。就其而言,tokio任务可以长期存在。这意味着您现在可能会遇到生命周期错误(编译器将假定借用可以超出封闭函数,并因此失去其起源)。
一个简单的解决方案是将Connection放在Arc
之类的后面。
或者,重组系统以更好地满足Rabbitmq的要求:不知道您使用的是什么,但是amiquip指出连接是线程安全的,而 channels 却不是线程安全的发送到其他线程。
因此,不是发布到隐式连接,而是在循环的每次迭代中创建一个通道,然后将那个移到任务中,以便实际执行发布。
还
现在我想在for循环中发布许多消息,而不必等待服务器的确认
因为您正在等待tokio :: spawn的结果,所以您仍然不这样做吗?