如何在Rust中使用StreamExt :: scan方法更改异步块内的状态?

问题描述

我正在尝试使用StreamExt中的扫描方法。如果我没有异步块,它会完美地工作。

use futures::{stream,StreamExt};

#[tokio::main]
async fn main() {
    stream::iter(1..10)
        .scan(0,|s,i| {
            *s += i;
            futures::future::ready(Some(*s))
        })
        .for_each(|x| async move {
            println!("{:?}",x);
        })
        .await;
}

但是,如果我确实有async块,则它不会编译。

use futures::{stream,i| async move {
            *s += i;
            Some(*s)
        })
        .for_each(|x| async move {
            println!("{:?}",x);
        })
        .await;
}

错误是:

@H_502_16@error: borrowed data cannot be stored outside of its closure --> src/main.rs:6:36 | 5 | / stream::iter(1..10) 6 | | .scan(0,i| async move { | |__________________------____________^ | || | | || ...because it cannot outlive this closure 7 | || *s += i; 8 | || Some(*s) 9 | || }) | ||_________^ cannot be stored outside of its closure ... | 12 | | }) 13 | | .await; | |______________- borrowed data cannot be stored into here...

如何解决此问题并在async块内更改状态?

解决方法

您不能跨async个边界共享参考。一旦代码在async上下文中执行,就无法再跟踪生存期,因为编译器不知道将来什么时候完成。

一种解决方案是使用引用计数的智能指针和内部可变性:

use futures::{stream,StreamExt};
use std::cell::Cell;
use std::rc::Rc;

#[tokio::main]
async fn main() {
    stream::iter(1..10)
        .scan(Rc::new(Cell::new(0)),|s,i| {
            let s = s.clone();
            async move {
                s.set(s.get() + i);
                Some(s.get())
            }
        })
        .for_each(|x| async move {
            println!("{:?}",x);
        })
        .await;
}

闭包参数中的s&mut Rc<Cell<i32>>,它不能跨async边界移动。但是克隆的版本是Rc<Cell<i32>>,由于没有生命周期需要跟踪,因此可以将它们移动到那里。

,
use futures::{stream,StreamExt};

#[tokio::main]
async fn main() {
    stream::iter(1..10)
        .scan(0,i| {       // s: &mut i32,i: i32
            *s += i;
            let st = *s;        // Dereference `s`,and move the value of `s` to `st`,but the type of
                                // `*s` is i32 which impl Copy trait,so here Copy `*s` value to
                                // the owned `st`.
                                // 
                                // Note: this works only if `s` impl the Copy trait

            async move {        // move the owned i32 st into the async block (copy again)
                                // and also move i into the async block which is copyed too
                Some(st)
            }
        })
        .for_each(|x| {
            println!("{:?}",x);
            async {}
        })
        .await;
}

如果s不具有Copy特质,则必须手动复制/克隆:

use futures::{stream,StreamExt};

#[derive(Debug,Clone)]
struct TT {
    s: i32,}

#[tokio::main]
async fn main() {
    stream::iter(1..10)
        .scan(TT{s: 0},i|  {
            s.s += i;
            // let mut st = *s;      // move occurs because `*s` has type `TT`,which does not implement the `Copy` trait
            let st = s.clone();      // clone/copy manually
            async move {
                Some(st)
            }
        })
        .for_each(|x| {
            println!("{:?}",x);
            async {}
        })
        .await;
}

如果要在异步块中更改s,则必须通过智能指针(例如RcArc)来拥有它。并且还需要一种方法来修改由智能指针包装的值,例如MutexRwLockCell

use futures::{stream,StreamExt,lock::Mutex};
use std::sync::Arc;  // Arc or Rc

#[tokio::main]
async fn main() {
    stream::iter(1..10)
        .scan(Arc::new(Mutex::new(0)),i| { // s: &mut Arc<Mutex<i32>>,i: i32
            let s = Arc::clone(s);              // Clone to owned the Arc

            async move {        // move the owned `s` into the async block
                                // and also move i into the async block which is copyed too
                let mut guard = s.lock().await;
                *guard += i;
                Some(*guard)
            }
        })
        .for_each(|x| {
            println!("{:?}",x);
            async {}
        })
        .await;
}