问题描述
我正在尝试使用StreamExt中的扫描方法。如果我没有异步块,它会完美地工作。
use futures::{stream,StreamExt};
#[tokio::main]
async fn main() {
stream::iter(1..10)
.scan(0,|s,i| {
*s += i;
futures::future::ready(Some(*s))
})
.for_each(|x| async move {
println!("{:?}",x);
})
.await;
}
但是,如果我确实有async
块,则它不会编译。
use futures::{stream,i| async move {
*s += i;
Some(*s)
})
.for_each(|x| async move {
println!("{:?}",x);
})
.await;
}
错误是:
@H_502_16@error: borrowed data cannot be stored outside of its closure
--> src/main.rs:6:36
|
5 | / stream::iter(1..10)
6 | | .scan(0,i| async move {
| |__________________------____________^
| || |
| || ...because it cannot outlive this closure
7 | || *s += i;
8 | || Some(*s)
9 | || })
| ||_________^ cannot be stored outside of its closure
... |
12 | | })
13 | | .await;
| |______________- borrowed data cannot be stored into here...
如何解决此问题并在async
块内更改状态?
解决方法
您不能跨async
个边界共享参考。一旦代码在async
上下文中执行,就无法再跟踪生存期,因为编译器不知道将来什么时候完成。
一种解决方案是使用引用计数的智能指针和内部可变性:
use futures::{stream,StreamExt};
use std::cell::Cell;
use std::rc::Rc;
#[tokio::main]
async fn main() {
stream::iter(1..10)
.scan(Rc::new(Cell::new(0)),|s,i| {
let s = s.clone();
async move {
s.set(s.get() + i);
Some(s.get())
}
})
.for_each(|x| async move {
println!("{:?}",x);
})
.await;
}
闭包参数中的s
是&mut Rc<Cell<i32>>
,它不能跨async
边界移动。但是克隆的版本是Rc<Cell<i32>>
,由于没有生命周期需要跟踪,因此可以将它们移动到那里。
use futures::{stream,StreamExt};
#[tokio::main]
async fn main() {
stream::iter(1..10)
.scan(0,i| { // s: &mut i32,i: i32
*s += i;
let st = *s; // Dereference `s`,and move the value of `s` to `st`,but the type of
// `*s` is i32 which impl Copy trait,so here Copy `*s` value to
// the owned `st`.
//
// Note: this works only if `s` impl the Copy trait
async move { // move the owned i32 st into the async block (copy again)
// and also move i into the async block which is copyed too
Some(st)
}
})
.for_each(|x| {
println!("{:?}",x);
async {}
})
.await;
}
如果s
不具有Copy
特质,则必须手动复制/克隆:
use futures::{stream,StreamExt};
#[derive(Debug,Clone)]
struct TT {
s: i32,}
#[tokio::main]
async fn main() {
stream::iter(1..10)
.scan(TT{s: 0},i| {
s.s += i;
// let mut st = *s; // move occurs because `*s` has type `TT`,which does not implement the `Copy` trait
let st = s.clone(); // clone/copy manually
async move {
Some(st)
}
})
.for_each(|x| {
println!("{:?}",x);
async {}
})
.await;
}
如果要在异步块中更改s
,则必须通过智能指针(例如Rc
或Arc
)来拥有它。并且还需要一种方法来修改由智能指针包装的值,例如Mutex
,RwLock
或Cell
:
use futures::{stream,StreamExt,lock::Mutex};
use std::sync::Arc; // Arc or Rc
#[tokio::main]
async fn main() {
stream::iter(1..10)
.scan(Arc::new(Mutex::new(0)),i| { // s: &mut Arc<Mutex<i32>>,i: i32
let s = Arc::clone(s); // Clone to owned the Arc
async move { // move the owned `s` into the async block
// and also move i into the async block which is copyed too
let mut guard = s.lock().await;
*guard += i;
Some(*guard)
}
})
.for_each(|x| {
println!("{:?}",x);
async {}
})
.await;
}