rust - AsyncRead中Timeout不会超时

标签 rust rust-async-std

我正在尝试实现一个异步读取包装器,它将添加读取超时功能。目标是 API 简单 AsyncRead 。换句话说,我不想在代码中到处添加 io.read(buf).timeout(t) 。相反,读取实例本身应该返回适​​当的 io::ErrorKind::TimedOut在给定的超时到期后。

我无法轮询delay不过准备好了。它始终处于待定状态。我尝试过 async-std , futures , smol-timeout - 相同的结果。虽然等待时会触发超时,但轮询时不会触发。我知道超时并不容易。需要有什么东西来唤醒它。我究竟做错了什么?如何解决这个问题?

use async_std::{
    future::Future,
    io,
    pin::Pin,
    task::{sleep, Context, Poll},
};
use std::time::Duration;

pub struct PrudentIo<IO> {
    expired: Option<Pin<Box<dyn Future<Output = ()> + Sync + Send>>>,
    timeout: Duration,
    io: IO,
}

impl<IO> PrudentIo<IO> {
    pub fn new(timeout: Duration, io: IO) -> Self {
        PrudentIo {
            expired: None,
            timeout,
            io,
        }
    }
}

fn delay(t: Duration) -> Option<Pin<Box<dyn Future<Output = ()> + Sync + Send + 'static>>> {
    if t.is_zero() {
        return None;
    }
    Some(Box::pin(sleep(t)))
}

impl<IO: io::Read + Unpin> io::Read for PrudentIo<IO> {
    fn poll_read(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &mut [u8],
    ) -> Poll<io::Result<usize>> {
        if let Some(ref mut expired) = self.expired {
            match expired.as_mut().poll(cx) {
                Poll::Ready(_) => {
                    println!("expired ready");
                    // too much time passed since last read/write
                    return Poll::Ready(Err(io::ErrorKind::TimedOut.into()));
                }
                Poll::Pending => {
                    println!("expired pending");
                    // in good time
                }
            }
        }

        let res = Pin::new(&mut self.io).poll_read(cx, buf);
        println!("read {:?}", res);

        match res {
            Poll::Pending => {
                if self.expired.is_none() {
                    // No data, start checking for a timeout
                    self.expired = delay(self.timeout);
                }
            }
            Poll::Ready(_) => self.expired = None,
        }

        res
    }
}
impl<IO: io::Write + Unpin> io::Write for PrudentIo<IO> {
    fn poll_write(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &[u8],
    ) -> Poll<io::Result<usize>> {
        Pin::new(&mut self.io).poll_write(cx, buf)
    }

    fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
        Pin::new(&mut self.io).poll_flush(cx)
    }

    fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
        Pin::new(&mut self.io).poll_close(cx)
    }
}

#[cfg(test)]
mod io_tests {
    use super::*;
    use async_std::io::ReadExt;
    use async_std::prelude::FutureExt;
    use async_std::{
        io::{copy, Cursor},
        net::TcpStream,
    };
    use std::time::Duration;

    #[async_std::test]
    async fn fail_read_after_timeout() -> io::Result<()> {
        let mut output = b"______".to_vec();
        let io = PendIo;
        let mut io = PrudentIo::new(Duration::from_millis(5), io);
        let mut io = Pin::new(&mut io);
        insta::assert_debug_snapshot!(io.read(&mut output[..]).timeout(Duration::from_secs(1)).await,@"Ok(io::Err(timeou))");
        Ok(())
    }
    #[async_std::test]
    async fn timeout_expires() {
        let later = delay(Duration::from_millis(1)).expect("some").await;
        insta::assert_debug_snapshot!(later,@r"()");
    }
    /// Mock IO always pending
    struct PendIo;
    impl io::Read for PendIo {
        fn poll_read(
            self: Pin<&mut Self>,
            _cx: &mut Context<'_>,
            _buf: &mut [u8],
        ) -> Poll<futures_io::Result<usize>> {
            Poll::Pending
        }
    }
    impl io::Write for PendIo {
        fn poll_write(
            self: Pin<&mut Self>,
            _cx: &mut Context<'_>,
            _buf: &[u8],
        ) -> Poll<futures_io::Result<usize>> {
            Poll::Pending
        }

        fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<futures_io::Result<()>> {
            Poll::Pending
        }

        fn poll_close(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<futures_io::Result<()>> {
            Poll::Pending
        }
    }
}

最佳答案

异步超时的工作原理如下:

  1. 您创建了超时 future 。
  2. 运行时调用poll进入超时,检查超时是否已过期。
  3. 如果已过期,则返回 Ready 并完成。
  4. 如果它没有过期,它会以某种方式注册一个回调,以便在正确的时间过去时调用 cx.waker().wake() 或类似的回调。
  5. 时间过去后,将调用 #4 中的回调,该回调会在适当的唤醒器中调用 wake(),指示运行时再次调用 poll
  6. 这次poll返回Ready。完成!

您的代码的问题在于您从 poll() 实现内部创建了延迟:self.expired = delay(self.timeout);。但随后您返回Pending,甚至没有轮询一次超时。这样,就不会在任何地方注册调用 Waker 的回调。没有唤醒器,没有超时。

我看到了几种解决方案:

A。不要将 PrudentIo::expired 初始化为 None,而是直接在构造函数中创建 timeout。这样超时总是会在io之前被轮询至少一次,并且会被唤醒。但你总是会创建一个超时,即使实际上并不需要它。

B。创建超时时进行递归轮询:

Poll::Pending => {
    if self.expired.is_none() {
        // No data, start checking for a timeout
        self.expired = delay(self.timeout);
        return self.poll_read(cx, buf);
    }

这将不必要地调用io两次,因此它可能不是最佳的。

C。创建超时后添加对 poll 的调用:

Poll::Pending => {
    if self.expired.is_none() {
        // No data, start checking for a timeout
        self.expired = delay(self.timeout);
        self.expired.as_mut().unwrap().as_mut().poll(cx);
    }

也许您应该匹配 poll 的输出,以防它返回 Ready,但是,嘿,这是一个新的超时,它可能还处于待处理状态,而且似乎工作得很好。

关于rust - AsyncRead中Timeout不会超时,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/71039085/

相关文章:

string - 如果 NUL 终止符不在切片的末尾,如何从以 NUL 终止的字节切片中获取 '&str'?

asynchronous - 如何使用async_std::task::sleep来模拟阻塞操作?

rust - 用async-std大块读取

rust - 在 Rust 中返回可变引用

pointers - 为什么有必要对一个可变原始指针连续执行两次转换?

asynchronous - 如何等待所有衍生的异步任务

generics - 我如何要求泛型类型支持数字运算?

asynchronous - 如何等待rust中的异步函数调用列表?

rust - 如果我有嵌套的异步/等待调用,我应该关心累积的开销吗?