我正在尝试使用 StreamExt 中的扫描方法。如果我没有异步 block ,它会完美地工作。
use futures::{stream, StreamExt};
#[tokio::main]
async fn main() {
stream::iter(1..10)
.scan(0, |s, i| {
*s += i;
futures::future::ready(Some(*s))
})
.for_each(|x| async move {
println!("{:?}", x);
})
.await;
}
但是如果我确实有 async
block ,它就无法编译。
use futures::{stream, StreamExt};
#[tokio::main]
async fn main() {
stream::iter(1..10)
.scan(0, |s, i| async move {
*s += i;
Some(*s)
})
.for_each(|x| async move {
println!("{:?}", x);
})
.await;
}
错误是:
error: borrowed data cannot be stored outside of its closure
--> src/main.rs:6:36
|
5 | / stream::iter(1..10)
6 | | .scan(0, |s, i| async move {
| |__________________------____________^
| || |
| || ...because it cannot outlive this closure
7 | || *s += i;
8 | || Some(*s)
9 | || })
| ||_________^ cannot be stored outside of its closure
... |
12 | | })
13 | | .await;
| |______________- borrowed data cannot be stored into here...
如何解决此问题并改变 async
block 内的状态?
最佳答案
您无法跨 async
共享引用边界。一旦代码在 async
中执行,在上下文中,无法再跟踪生命周期,因为编译器不知道 future 何时完成。
一种解决方案是使用引用计数智能指针和内部可变性:
use futures::{stream, StreamExt};
use std::cell::Cell;
use std::rc::Rc;
#[tokio::main]
async fn main() {
stream::iter(1..10)
.scan(Rc::new(Cell::new(0)), |s, i| {
let s = s.clone();
async move {
s.set(s.get() + i);
Some(s.get())
}
})
.for_each(|x| async move {
println!("{:?}", x);
})
.await;
}
s
闭包参数中是 &mut Rc<Cell<i32>>
不能跨 async
移动边界。但克隆版本是Rc<Cell<i32>>
可以移动到那里,因为没有生命周期可以跟踪。
关于rust - 如何使用 Rust 中的 StreamExt::scan 方法改变异步 block 内的状态?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/64044531/