rust - 为什么使用异步 block 然后使我的流取消固定?

标签 rust

我是 Rust 的新手,如果我使用的术语不正确,我很抱歉。也许我对问题的用词选择不正确。
我正在玩流,我需要在流元素之间有一些延迟。所以我写了这个:

use futures::stream;
use futures::StreamExt;
use tokio::time;

#[tokio::main]
async fn main() {
    let mut stream = stream::iter(0..1000).then(|x| async move {
        time::delay_for(std::time::Duration::from_millis(500)).await;
        x + 1
    });
    while let Some(x) = stream.next().await {
        println!("{:?}", x)
    }
}
我收到很多编译错误,但最重要的错误与固定有关。他们来了:
error[E0277]: `std::future::from_generator::GenFuture<[static generator@src/main.rs:7:64: 10:6 x:_ _]>` cannot be unpinned
  --> src/main.rs:11:32
   |
11 |     while let Some(x) = stream.next().await {
   |                                ^^^^ within `futures_util::stream::stream::then::_::__Then<'_, futures_util::stream::iter::Iter<std::ops::Range<{integer}>>, impl core::future::future::Future, [closure@src/main.rs:7:49: 10:6]>`, the trait `std::marker::Unpin` is not implemented for `std::future::from_generator::GenFuture<[static generator@src/main.rs:7:64: 10:6 x:_ _]>`
   |
   = note: required because it appears within the type `impl core::future::future::Future`
   = note: required because it appears within the type `std::option::Option<impl core::future::future::Future>`
   = note: required because it appears within the type `futures_util::stream::stream::then::_::__Then<'_, futures_util::stream::iter::Iter<std::ops::Range<{integer}>>, impl core::future::future::Future, [closure@src/main.rs:7:49: 10:6]>`
   = note: required because of the requirements on the impl of `std::marker::Unpin` for `futures_util::stream::stream::then::Then<futures_util::stream::iter::Iter<std::ops::Range<{integer}>>, impl core::future::future::Future, [closure@src/main.rs:7:49: 10:6]>`

error[E0277]: `std::future::from_generator::GenFuture<[static generator@src/main.rs:7:64: 10:6 x:_ _]>` cannot be unpinned
  --> src/main.rs:11:25
   |
11 |     while let Some(x) = stream.next().await {
   |                         ^^^^^^^^^^^^^^^^^^^ within `futures_util::stream::stream::then::_::__Then<'_, futures_util::stream::iter::Iter<std::ops::Range<{integer}>>, impl core::future::future::Future, [closure@src/main.rs:7:49: 10:6]>`, the trait `std::marker::Unpin` is not implemented for `std::future::from_generator::GenFuture<[static generator@src/main.rs:7:64: 10:6 x:_ _]>`
   |
   = note: required because it appears within the type `impl core::future::future::Future`
   = note: required because it appears within the type `std::option::Option<impl core::future::future::Future>`
   = note: required because it appears within the type `futures_util::stream::stream::then::_::__Then<'_, futures_util::stream::iter::Iter<std::ops::Range<{integer}>>, impl core::future::future::Future, [closure@src/main.rs:7:49: 10:6]>`
   = note: required because of the requirements on the impl of `std::marker::Unpin` for `futures_util::stream::stream::then::Then<futures_util::stream::iter::Iter<std::ops::Range<{integer}>>, impl core::future::future::Future, [closure@src/main.rs:7:49: 10:6]>`
   = note: required because of the requirements on the impl of `core::future::future::Future` for `futures_util::stream::stream::next::Next<'_, futures_util::stream::stream::then::Then<futures_util::stream::iter::Iter<std::ops::Range<{integer}>>, impl core::future::future::Future, [closure@src/main.rs:7:49: 10:6]>>`
如果我将代码更改为:
use futures::stream;
use futures::StreamExt;
use tokio::time;

#[tokio::main]
async fn main() {
    let mut stream = stream::iter(0..1000).then(|x|  {
        futures::future::ready(x + 1)
    });
    while let Some(x) = stream.next().await {
        println!("{:?}", x)
    }
}
或者到这个:
use futures::stream;
use futures::StreamExt;
use tokio::time;

#[tokio::main]
async fn main() {
    stream::iter(0..1000)
        .then(|x| async move {
            time::delay_for(std::time::Duration::from_millis(500)).await;
            x + 1
        })
        .for_each(|x| async move { println!("{:?}", x) })
        .await;
}
它编译。
我认为这与 then 的固定和同时使用有关组合器和 while ,但我无法绕过它。

最佳答案

我认为问题归结为async块不是 Unpin .可以用这段代码证明:

fn check_unpin<F: Unpin>(_: F) { }

fn main() {
    check_unpin(async {});
}
以相当神秘的消息失败:
error[E0277]: `std::future::from_generator::GenFuture<[static generator@src/main.rs:4:23: 4:25 _]>` cannot be unpinned
  --> src/main.rs:4:5
   |
1  | fn check_unpin<F: Unpin>(_: F) { }
   |                   ----- required by this bound in `check_unpin`
...
4  |     check_unpin(async {});
   |     ^^^^^^^^^^^ within `impl std::future::Future`, the trait `std::marker::Unpin` is not implemented for `std::future::from_generator::GenFuture<[static generator@src/main.rs:4:23: 4:25 _]>`
   |
   = note: required because it appears within the type `impl std::future::Future`
我认为 GenFuture是转换 async 的内部类型块成impl Future .
现在回到您的问题,组合子 then()返回 Unpin值如果流和 Future关闭返回的是 Unpin (在 documentation 中并不完全清楚,但我从源代码中推断出这一点)。 stream::iter is Unpin ,但是当你写 |x| async move { x + 1}您正在返回 async不是 Unpin 的块,因此你的错误。
如果您使用 futures::future::ready(x + 1)它的工作原理很简单,因为Future implements Unpin .
如果您使用 StreamExt::for_each它有效,因为它不需要 Self成为 Unpin .不是Unpin本身,但没关系,因为您将其发送至 tokio::main在轮询之前将所有内容固定在内部。
如果你想让你的原始代码工作,你只需要手动固定你的流( playground ):
use futures::stream;
use futures::StreamExt;
use tokio::time;
use pin_utils::pin_mut;

#[tokio::main]
async fn main() {
    let stream = stream::iter(0..1000).then(|x| async move {
        time::delay_for(std::time::Duration::from_millis(500)).await;
        x + 1
    });
    pin_mut!(stream); //<---- here, pinned!
    while let Some(x) = stream.next().await {
        println!("{:?}", x)
    }
}

关于rust - 为什么使用异步 block 然后使我的流取消固定?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/64005557/

相关文章:

rust - 将结构转换为数组是否合法?

generics - 类型参数: expected 1 but found 0的数量错误

Rust 宏错误 : local ambiguity: multiple parsing options

rust - Kcov 报告 100% 的 Rust 库,即使一些方法没有被覆盖

rust - Rust 是否支持具有运行时确定值的常量泛型类型?

rust - 在Bevy Engine中,如何在for-each系统中使用&mut查询?

algorithm - 有没有更好的方法来使用图像 0.18 crate 实现中点圆算法?

data-structures - 如何在安全的Rust中遍历相互递归的图?

rust - 如何匹配 `&(&usize, &u32)` 之类的模式?

rust - 从 Cranelift 发出 ASM