我正在使用 Rust 和 Tokio 1.6 构建一个应用程序,该应用程序可以通过 hidapi = "1.2"
与 Elgato StreamDeck 交互。我想轮询 HID 设备的事件(按键按下/按键按下)并将这些事件发送到 mpsc channel ,同时观看单独的 mpsc channel 以获取传入命令以更新设备状态(重置、更改亮度、更新图像等) 。由于设备句柄不是线程安全的,因此我需要从单个线程中完成这两件事。
以下主要修改
这是我原来问题的重写。我在下面留下了临时答案,但为了提供一个更独立的示例,这里是使用 device_query = "0.2"
的基本过程:
use device_query::{DeviceState, Keycode};
use std::time::Duration;
use tokio;
use tokio::sync::mpsc::{Receiver, Sender};
use tokio::time::timeout;
#[tokio::main]
async fn main() {
// channel for key press events coming from device loop
let (key_tx, mut key_rx) = tokio::sync::mpsc::channel(32);
// channel for commands sent to device loop
let (dev_tx, mut dev_rx) = tokio::sync::mpsc::channel(32);
start_device_loop(60, key_tx, dev_rx);
println!("Waiting for key presses");
while let Some(k) = key_rx.recv().await {
match k {
Some(ch) => match ch {
Keycode::Q => dev_tx.clone().try_send(String::from("Quit!")).expect("Could not send command"),
ch => println!("{}", ch),
},
_ => (),
}
}
println!("Done.")
}
/// Starts a tokio task, polling the supplied device and sending key events
/// on the supplied mpsc sender
pub fn start_device_loop(hz: u32, tx: Sender<Option<Keycode>>, mut rx: Receiver<String>) {
let poll_wait = 1000 / hz;
let poll_wait = Duration::from_millis(poll_wait as u64);
tokio::task::spawn(async move {
let dev = DeviceState::new();
loop {
let mut keys = dev.query_keymap();
match keys.len() {
0 => (),
1 => tx.clone().try_send(Some(keys.remove(0))).unwrap(),
_ => println!("So many keys..."),
}
match timeout(poll_wait, rx.recv()).await {
Ok(cmd) => println!("Command '{}' received.", cmd.unwrap()),
_ => (),
};
// std::thread::sleep(poll_wait);
}
});
}
请注意,这不会编译 - 我收到错误 由异步 block 创建的 future 不是“发送”
且 在“impl Future”内,未为“实现特征“Send” *mut x11::xlib::_XDisplay'
。我对该错误的理解是,由于 device_query
不是线程安全的,并且 await
引入了范围跨线程移动的可能性,因此在非线程时可能不会等待任何内容-安全对象在范围内。事实上,如果我注释掉 match timeout...
周围的 block 并取消注释 std::thread::sleep
一切都会编译并运行。
这让我回到了最初的问题;如何在不使用 await
或 poll_recv()
明显禁果的情况下在单个线程中发送和接收消息?
最佳答案
经过多次搜寻,我在 futures
箱中找到了 noop_waker
,它似乎可以与 poll_recv
结合执行我需要的操作:
pub fn start_device_loop(hz: u32, tx: Sender<Option<Keycode>>, mut rx: Receiver<String>) {
let poll_wait = 1000 / hz;
let poll_wait = Duration::from_millis(poll_wait as u64);
tokio::task::spawn_blocking(move || {
let dev = DeviceState::new();
let waker = futures::task::noop_waker();
let mut cx = std::task::Context::from_waker(&waker);
loop {
let mut keys = dev.query_keymap();
match keys.len() {
0 => (),
1 => tx.clone().try_send(Some(keys.remove(0))).unwrap(),
_ => println!("So many keys..."),
}
match rx.poll_recv(&mut cx) {
Poll::Ready(cmd) => println!("Command '{}' received.", cmd.unwrap()),
_ => ()
};
std::thread::sleep(poll_wait);
}
});
}
在深入研究文档和 tokio 源代码后,我找不到任何表明 poll_recv 应该是仅限内部函数的内容,或者在此处使用它会产生任何明显的副作用。让进程以 125hz 运行,我也没有看到任何多余的资源使用。
我将上面的代码留给后人,但自从提出这个问题后,try_recv
方法已添加到接收器中,使这一切变得更加清晰。
关于rust - Tokio mpsc 接收器上的非阻塞接收,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/68961504/