rust - 如何在 Rust 中使用 Windows IOCP 实现异步函数?

标签 rust async-await

在 C# 中,我们可以通过实现 INotifyCompletion 来构建一个等待对象。界面。

public class MyAwaiter<T> : INotifyCompletion
{
    public bool IsCompleted { get; private set; }
    public T GetResult()
    {
        throw new NotImplementedException();
    }
    public void OnCompleted(Action continuation)
    {
        throw new NotImplementedException();
    }
}

但是在 Rust 中,我不知道如何构建一个异步函数来支持现有异步库中当前不支持的操作,例如与低级设备通信。

你能给我一个在 rust 中自我实现异步函数的例子吗?

最佳答案

您需要实现 Future结构上的 trait,让我们看看 std 对 Future 的定义,具体来说是 .poll 方法:

When a future is not ready yet, poll returns Poll::Pending and stores a clone of the Waker copied from the current Context. This Waker is then woken once the future can make progress. For example, a future waiting for a socket to become readable would call .clone() on the Waker and store it.



将其与操作系统提供的一些异步机制一起使用的一种方法是发送克隆的 Waker到一个新生成的线程(或者理想情况下,一个线程池,您可以将事件排队以唤醒),该线程阻塞您设置的事件并调用 wake() 完成后。

在这个示例中,我使用了在线程上休眠,但是通过使用评论者建议的 Mio 或直接使用 IOCP,您可以获得非常相似的代码,重要的方面只是唤醒 Waker并通知Future它发生了。

struct MyEvent {
    is_ready: Arc<AtomicBool>, // Could use a channel to transfer when the task is ready instead
    is_polled: bool,           // Prevents multiple events to get enqueued on the same future
}

impl MyEvent {
    fn new() -> Self {
        MyEvent {
            is_ready: Arc::new(AtomicBool::new(false)),
            is_polled: false,
        }
    }
}

impl Future for MyEvent {
    type Output = ();

    fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
        match self.is_ready.load(atomic::Ordering::SeqCst) {
            true => Poll::Ready(()),
            false => {
                if self.is_polled {
                    Poll::Pending
                } else {
                    let waker = cx.waker().clone();
                    let channel = Arc::clone(&self.is_ready);
                    self.get_mut().is_polled = true;
                    thread::spawn(move || {
                        // Here you block based on whatever event
                        thread::sleep(Duration::from_secs(5));
                        channel.store(true, atomic::Ordering::SeqCst);
                        waker.wake();
                    });
                    Poll::Pending
                }
            }
        }
    }
}

编辑:我刚刚注意到,每当进行新的轮询时,您都需要更新唤醒器(尽管大多数执行器不应该发生这种情况,因为它们应该仅在 Waker 被唤醒时重新轮询)。该解决方案并非易事,我建议读者在其源代码和提供的 channel ( oneshot )和 AtomicWaker 中检查 Futures crate ,这应该使这更简单。如果需要解决此问题的实际实现,我将尝试使用简单的 POC。

关于rust - 如何在 Rust 中使用 Windows IOCP 实现异步函数?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/61037621/

相关文章:

javascript - 我如何在 Node.js (JavaScript) 中等待? l 需要暂停一段时间

module - 当有 main.rs 和 lib.rs 时 Rust 模块混淆

c++ - Rust Cargo CMake 与 C++ 库依赖项集成

c# - 为什么带有 await 关键字的异步方法仍然阻塞主线程?

javascript - 与对象进行 Angular 异步调用?

c# - 任务排序和重新进入

generics - Rust API设计: Type parameters for optional values

gtk - 如何在 gtk-rs 中的 cairo::Context 上渲染图像

arrays - 比较定长数组

c# - Visual Studio 扩展包是否支持异步操作