rust - 为什么 poll() 内的延迟 future 在我的自定义流类型中不起作用?

标签 rust rust-tokio

我想每秒打印一次“Hello”。

引用文档:

Futures use a poll based model. The consumer of a future repeatedly calls the poll function. The future then attempts to complete. If the future is able to complete, it returns Async::Ready(value). If the future is unable to complete due to being blocked on an internal resource (such as a TCP socket), it returns Async::NotReady.

如果 Delay 返回的是 NotReady,我的 poll 函数将返回 NotReady,但不会将任何内容打印到标准输出。

use futures::{Async, Future, Stream}; // 0.1.25
use std::time::{Duration, Instant};
use tokio::timer::Delay; // 0.1.15

struct SomeStream;

impl Stream for SomeStream {
    type Item = String;
    type Error = ();

    fn poll(&mut self) -> Result<Async<Option<Self::Item>>, Self::Error> {
        let when = Instant::now() + Duration::from_millis(1000);
        let mut task = Delay::new(when).map_err(|e| eprintln!("{:?}", e));
        match task.poll() {
            Ok(Async::Ready(value)) => {}
            Ok(Async::NotReady) => return Ok(Async::NotReady),
            Err(err) => return Err(()),
        }
        Ok(Async::Ready(Some("Hello".to_string())))
    }
}

fn main() {
    let s = SomeStream;
    let future = s
        .for_each(|item| {
            println!("{:?}", item);
            Ok(())
        })
        .map_err(|e| {});
    tokio::run(future);
}

最佳答案

这里的主要问题是缺少状态管理。每次轮询流时,您都会创建一个新的 Delay future,而不是保留它直到它得到解决。 这将导致永远看不到任何项目从流中出来,因为这些 future 仅被轮询一次,每次都可能产生 NotReady

您需要跟踪类型 SomeStream 中的延迟 future 。在这种情况下,可以使用一个选项,从而也确定我们是否需要创建一个新的延迟。

#[derive(Debug, Default)]
struct SomeStream {
    delay: Option<Delay>,
}

SomeStream::poll 的后续代码具有更好的错误处理和更惯用的结构,将变成这样:

impl Stream for SomeStream {
    type Item = String;
    type Error = Box<dyn std::error::Error + Send + Sync>; // generic error

    fn poll(&mut self) -> Result<Async<Option<Self::Item>>, Self::Error> {
        let delay = self.delay.get_or_insert_with(|| {
            let when = Instant::now() + Duration::from_millis(1000);
            Delay::new(when)
        });

        match delay.poll() {
            Ok(Async::Ready(value)) => {
                self.delay = None;
                Ok(Async::Ready(Some("Hello".to_string())))
            },
            Ok(Async::NotReady) => Ok(Async::NotReady),
            Err(err) => Err(err.into()),
        }
    }
}

或者,更好的是,使用 try_ready! 宏,这样可以用更少的样板代码返回错误和 NotReady 信号。

fn poll(&mut self) -> Result<Async<Option<Self::Item>>, Self::Error> {
    let delay = self.delay.get_or_insert_with(|| {
        let when = Instant::now() + Duration::from_millis(1000);
        Delay::new(when)
    });

    try_ready!(delay.poll());

    // tick!
    self.delay = None;
    Ok(Async::Ready(Some("Hello".to_string())))
}

( Playground )

关于rust - 为什么 poll() 内的延迟 future 在我的自定义流类型中不起作用?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/55609225/

相关文章:

rust - 在 Vec 的中间或开头高效地插入或替换多个元素?

rust - 除了在每次关闭之前克隆它之外,还有其他选择可以在多个关闭中共享 Arc 吗?

rust - 循环中的Rust所有权

docker - Actix Web 在空闲时消耗 %5 的 Cpu

rust - 如何从 tokio-proto 连接握手中检索信息?

windows - 如何编译文档中的示例?

rust - 如何处理 gtk 应用程序中的命令行参数?

rust - 从AsyncRead转发到 future 0.3 mpsc::UnboundedSender <T>时的错误处理

rust - 如何停止超 HTTP Web 服务器并返回错误?

rust - 为什么我得到 FromIterator<&T>` is not Implemented for Vec<T>?