concurrency - 从 mpsc UnboundedReceiver 读取所有可用消息,而不会不必要地阻塞

标签 concurrency rust future

我有一个 futures::sync::mpsc::unbounded channel 。我可以发送消息到 UnboundedSender<T>但从 UnboundedReciever<T> 接收它们时遇到问题.
我使用 channel 向 UI 线程发送消息,并且我有一个函数在每一帧都被调用,我想在每一帧上从 channel 读取所有可用消息,而不会在没有可用消息时阻塞线程消息。

从我读到的 Future::poll方法是我需要的,我只是轮询,如果我得到 Async::Ready,我对消息做一些事情,如果没有,我就从函数返回。
问题是 poll当没有任务上下文时会出现 panic (我不确定这意味着什么或如何处理)。

我尝试过的:

let (sender, receiver) = unbounded(); // somewhere in the code, doesn't matter
// ...
let fut = match receiver.by_ref().collect().poll() {
    Async::Ready(items_vec) => // do something on UI with items,
    _ => return None
}

这会引起 panic ,因为我没有任务上下文。

还试过:

let (sender, receiver) = unbounded(); // somewhere in the code, doesn't matter
// ...
let fut = receiver.by_ref().collect(); // how do I run the future?
tokio::runtime::current_thread::Runtime::new().unwrap().block_on(fut); // this blocks the thread when there are no items in the receiver

我想帮助阅读 UnboundedReceiver<T>当流中没有项目时不阻塞线程(然后什么也不做)。

谢谢!

最佳答案

你错误地使用了 futures——你需要一个 Runtime 和更多的样板来让它工作:


    extern crate tokio;
    extern crate futures;
    
    use tokio::prelude::*;
    use futures::future::{lazy, ok};
    use futures::sync::mpsc::unbounded;
    use tokio::runtime::Runtime;
    
    fn main() {
        let (sender, receiver) = unbounded::<i64>();
        let receiver = receiver.for_each(|result| {
            println!("Got: {}", result);
            Ok(())
        });
    
        let rt = Runtime::new().unwrap();
        rt.executor().spawn(receiver);
    
        let lazy_future = lazy(move || {
            sender.unbounded_send(1).unwrap();
            sender.unbounded_send(2).unwrap();
            sender.unbounded_send(3).unwrap();
            ok::<(), ()>(())
        });
    
        rt.block_on_all(lazy_future).unwrap();
    }

进一步阅读,来自 Tokio's runtime model :

[...]in order to use Tokio and successfully execute tasks, an application must start an executor and the necessary drivers for the resources that the application’s tasks depend on. This requires significant boilerplate. To manage the boilerplate, Tokio offers a couple of runtime options. A runtime is an executor bundled with all necessary drivers to power Tokio’s resources. Instead of managing all the various Tokio components individually, a runtime is created and started in a single call.

Tokio offers a concurrent runtime and a single-threaded runtime. The concurrent runtime is backed by a multi-threaded, work-stealing executor. The single-threaded runtime executes all tasks and drivers on thee current thread. The user may pick the runtime with characteristics best suited for the application.

关于concurrency - 从 mpsc UnboundedReceiver 读取所有可用消息,而不会不必要地阻塞,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/53666943/

相关文章:

rust - 当存在嵌套的源目录时, 'use' 和 'mod' 如何工作?

c++ - 关于 std::future 的编译错误

rust - 什么是 futures::future::lazy?

java - 如何在未来取消时终止 Callable 中的 CXF Web 服务调用

java - 如何使用 ScheduledExecutorService 处理重复任务中的错误?

java - Akka - ActorRef.tell() 需要几分钟来传递消息

python - 如何同时处理在线程之间传递数据(3 个 while True 循环)?

sockets - Rust 中 TcpListener 和 TcpAcceptor 的职责

rust - 如何静音 `rustc` 关于要链接到哪些 native 工件的注释?

concurrency - 分析 OpenJDK 或任何 Java VM 中的锁