asynchronous - 在未来的实现中手动轮询流

标签 asynchronous rust future rust-tokio

我正在迁移到 futures 0.3 和 tokio 0.2,有一种重复出现的模式我无法重新使用。我不确定这种模式是否已经过时,或者我是否对 Pin 做错了什么。

通常我有一种类型,它包含一个套接字和几个 channel 接收器。此类结构的 Future 实现包括重复轮询流,直到它们返回 Pending(在 0.1 生态系统中为 NotReady)。

但是,在 futures 0.3 中,Future::pollStream::poll_next 采用 self 而不是 &mut self,并且此模式不再有效:

use futures::{
    stream::Stream,
    task::{Context, Poll},
    Future,
};
use std::pin::Pin;
use tokio::sync::mpsc::{Receiver, Sender};

/// Dummy structure that represent some state we update when we
/// receive data or events.
struct State;

impl State {
    fn update(&mut self, _data: Vec<u8>) {
        println!("updated state");
    }
    fn handle_event(&mut self, _event: u32) {
        println!("handled event");
    }
}

/// The future I want to implement.
struct MyFuture {
    state: State,
    data: Receiver<Vec<u8>>,
    events: Receiver<Vec<u8>>,
}

impl MyFuture {
    fn poll_data(self: Pin<&mut Self>, cx: &mut Context) -> Poll<()> {
        use Poll::*;

        let MyFuture {
            ref mut data,
            ref mut state,
            ..
        } = self.get_mut();

        loop {
            // this breaks, because Pin::new consume the mutable
            // reference on the first iteration of the loop.
            match Pin::new(data).poll_next(cx) {
                Ready(Some(vec)) => state.update(vec),
                Ready(None) => return Ready(()),
                Pending => return Pending,
            }
        }
    }

    // unimplemented, but we basically have the same problem than with
    // `poll_data()`
    fn poll_events(self: Pin<&mut Self>, cx: &mut Context) -> Poll<()> {
        unimplemented!()
    }
}

impl Future for MyFuture {
    type Output = ();

    fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
        use Poll::*;
        if let Ready(_) = self.poll_data(cx) {
            return Ready(());
        }

        // This does not work because self was consumed when
        // self.poll_data() was called.
        if let Ready(_) = self.poll_events(cx) {
            return Ready(());
        }
        return Pending;
    }
}

有没有办法修复该代码?如果不是,我可以使用什么模式来实现相同的逻辑?

最佳答案

您可以使用 Pin::as_mut 来避免消耗 Pin

impl MyFuture {
    fn poll_data(self: Pin<&mut Self>, cx: &mut Context) -> Poll<()> {
        use Poll::*;

        let MyFuture {
            ref mut data,
            ref mut state,
            ..
        } = self.get_mut();

        let mut data = Pin::new(data); // Move pin here
        loop {
            match data.as_mut().poll_next(cx) {   // Use in loop by calling `as_mut()`
                Ready(Some(vec)) => state.update(vec),
                Ready(None) => return Ready(()),
                Pending => return Pending,
            }
        }
    }
}

并且在 Future 中实现:

impl Future for MyFuture {
    type Output = ();

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
        use Poll::*;
        // `as_mut()` here to avoid consuming
        if let Ready(_) = self.as_mut().poll_data(cx) { 
            return Ready(());
        }

        // can consume here as this is the last invocation
        if let Ready(_) = self.poll_events(cx) {
            return Ready(());
        }
        return Pending;
    }
}

编辑:

提示:仅在必要时尝试使用Pin。在您的情况下,您实际上并不需要 poll_data 函数中的固定指针。 &mut self 就好了,它减少了一点 Pin 的使用:

impl MyFuture {
    fn poll_data(&mut self, cx: &mut Context) -> Poll<()> {
        use Poll::*;

        loop {
            match Pin::new(&mut self.data).poll_next(cx) {
                Ready(Some(vec)) => self.state.update(vec),
                Ready(None) => return Ready(()),
                Pending => return Pending,
            }
        }
    }
}

和 future 实现:

impl Future for MyFuture {
    type Output = ();

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
        use Poll::*;
        if let Ready(_) = self.poll_data(cx) {
            return Ready(());
        }

        if let Ready(_) = self.poll_events(cx) {
            return Ready(());
        }
        return Pending;
    }
}

关于asynchronous - 在未来的实现中手动轮询流,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/57536217/

相关文章:

asynchronous - 在发生繁重的I/O操作时,如何转换线程代码以使用 future 连续检查计算值?

debugging - 通过写入 std::io::stdout() 输出不可见

unit-testing - 我如何在Dart中的then语句中进行测试

java - 奇怪的 Java 线程池行为 - 除非使用 Future,否则会丢失任务

javascript - XMLHttpRequest onprogress 不触发异步 = false

javascript - 当变量更改时如何更新异步等待函数?

rust - 如何修剪少于n次的空间?

list - 如果 Flutter 中的网格列表为空,则插入一个 Text()

javascript - 在异步函数中运行并发 HTTP 请求

generics - 难以理解 Rust 中的特征和泛型