我有一个 futures::sync::mpsc::unbounded
channel 。我可以发送消息到 UnboundedSender<T>
但从 UnboundedReciever<T>
接收它们时遇到问题.
我使用 channel 向 UI 线程发送消息,并且我有一个函数在每一帧都被调用,我想在每一帧上从 channel 读取所有可用消息,而不会在没有可用消息时阻塞线程消息。
从我读到的 Future::poll
方法是我需要的,我只是轮询,如果我得到 Async::Ready,我对消息做一些事情,如果没有,我就从函数返回。
问题是 poll
当没有任务上下文时会出现 panic (我不确定这意味着什么或如何处理)。
我尝试过的:
let (sender, receiver) = unbounded(); // somewhere in the code, doesn't matter
// ...
let fut = match receiver.by_ref().collect().poll() {
Async::Ready(items_vec) => // do something on UI with items,
_ => return None
}
这会引起 panic ,因为我没有任务上下文。
还试过:
let (sender, receiver) = unbounded(); // somewhere in the code, doesn't matter
// ...
let fut = receiver.by_ref().collect(); // how do I run the future?
tokio::runtime::current_thread::Runtime::new().unwrap().block_on(fut); // this blocks the thread when there are no items in the receiver
我想帮助阅读 UnboundedReceiver<T>
当流中没有项目时不阻塞线程(然后什么也不做)。
谢谢!
最佳答案
你错误地使用了 futures——你需要一个 Runtime
和更多的样板来让它工作:
extern crate tokio;
extern crate futures;
use tokio::prelude::*;
use futures::future::{lazy, ok};
use futures::sync::mpsc::unbounded;
use tokio::runtime::Runtime;
fn main() {
let (sender, receiver) = unbounded::<i64>();
let receiver = receiver.for_each(|result| {
println!("Got: {}", result);
Ok(())
});
let rt = Runtime::new().unwrap();
rt.executor().spawn(receiver);
let lazy_future = lazy(move || {
sender.unbounded_send(1).unwrap();
sender.unbounded_send(2).unwrap();
sender.unbounded_send(3).unwrap();
ok::<(), ()>(())
});
rt.block_on_all(lazy_future).unwrap();
}
进一步阅读,来自 Tokio's runtime model :
[...]in order to use Tokio and successfully execute tasks, an application must start an executor and the necessary drivers for the resources that the application’s tasks depend on. This requires significant boilerplate. To manage the boilerplate, Tokio offers a couple of runtime options. A runtime is an executor bundled with all necessary drivers to power Tokio’s resources. Instead of managing all the various Tokio components individually, a runtime is created and started in a single call.
Tokio offers a concurrent runtime and a single-threaded runtime. The concurrent runtime is backed by a multi-threaded, work-stealing executor. The single-threaded runtime executes all tasks and drivers on thee current thread. The user may pick the runtime with characteristics best suited for the application.
关于concurrency - 从 mpsc UnboundedReceiver 读取所有可用消息,而不会不必要地阻塞,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/53666943/