rust - Tokio mpsc 接收器上的非阻塞接收

标签 rust rust-tokio

我正在使用 Rust 和 Tokio 1.6 构建一个应用程序,该应用程序可以通过 hidapi = "1.2" 与 Elgato StreamDeck 交互。我想轮询 HID 设备的事件(按键按下/按键按下)并将这些事件发送到 mpsc channel ,同时观看单独的 mpsc channel 以获取传入命令以更新设备状态(重置、更改亮度、更新图像等) 。由于设备句柄不是线程安全的,因此我需要从单个线程中完成这两件事。

以下主要修改

这是我原来问题的重写。我在下面留下了临时答案,但为了提供一个更独立的示例,这里是使用 device_query = "0.2" 的基本过程:

use device_query::{DeviceState, Keycode};
use std::time::Duration;
use tokio;
use tokio::sync::mpsc::{Receiver, Sender};
use tokio::time::timeout;

#[tokio::main]
async fn main() {
    // channel for key press events coming from device loop
    let (key_tx, mut key_rx) = tokio::sync::mpsc::channel(32);
    // channel for commands sent to device loop
    let (dev_tx, mut dev_rx) = tokio::sync::mpsc::channel(32);

    start_device_loop(60, key_tx, dev_rx);

    println!("Waiting for key presses");
    while let Some(k) = key_rx.recv().await {
        match k {
            Some(ch) => match ch {
                Keycode::Q => dev_tx.clone().try_send(String::from("Quit!")).expect("Could not send command"),
                ch => println!("{}", ch),
            },
            _ => (),
        }
    }
    println!("Done.")
}

/// Starts a tokio task, polling the supplied device and sending key events
/// on the supplied mpsc sender
pub fn start_device_loop(hz: u32, tx: Sender<Option<Keycode>>, mut rx: Receiver<String>) {
    let poll_wait = 1000 / hz;
    let poll_wait = Duration::from_millis(poll_wait as u64);

    tokio::task::spawn(async move {
        let dev = DeviceState::new();

        loop {
            let mut keys = dev.query_keymap();
            match keys.len() {
                0 => (),
                1 => tx.clone().try_send(Some(keys.remove(0))).unwrap(),
                _ => println!("So many keys..."),
            }
            
            match timeout(poll_wait, rx.recv()).await {
                Ok(cmd) => println!("Command '{}' received.", cmd.unwrap()),
                _ => (),
            };
            // std::thread::sleep(poll_wait);
        }
    });
}

请注意,这不会编译 - 我收到错误 由异步 block 创建的 future 不是“发送”在“impl Future”内,未为“实现特征“Send” *mut x11::xlib::_XDisplay'。我对该错误的理解是,由于 device_query 不是线程安全的,并且 await 引入了范围跨线程移动的可能性,因此在非线程时可能不会等待任何内容-安全对象在范围内。事实上,如果我注释掉 match timeout... 周围的 block 并取消注释 std::thread::sleep 一切都会编译并运行。

这让我回到了最初的问题;如何在不使用 awaitpoll_recv() 明显禁果的情况下在单个线程中发送和接收消息?

最佳答案

经过多次搜寻,我在 futures 箱中找到了 noop_waker,它似乎可以与 poll_recv 结合执行我需要的操作:

pub fn start_device_loop(hz: u32, tx: Sender<Option<Keycode>>, mut rx: Receiver<String>) {
    let poll_wait = 1000 / hz;
    let poll_wait = Duration::from_millis(poll_wait as u64);

    tokio::task::spawn_blocking(move || {
        let dev = DeviceState::new();

        let waker = futures::task::noop_waker();
        let mut cx = std::task::Context::from_waker(&waker);

        loop {
            let mut keys = dev.query_keymap();
            match keys.len() {
                0 => (),
                1 => tx.clone().try_send(Some(keys.remove(0))).unwrap(),
                _ => println!("So many keys..."),
            }
            
            match rx.poll_recv(&mut cx) {
                Poll::Ready(cmd) => println!("Command '{}' received.", cmd.unwrap()),
                _ => ()
            };
            std::thread::sleep(poll_wait);
        }
    });
}

在深入研究文档和 tokio 源代码后,我找不到任何表明 poll_recv 应该是仅限内部函数的内容,或者在此处使用它会产生任何明显的副作用。让进程以 125hz 运行,我也没有看到任何多余的资源使用。


我将上面的代码留给后人,但自从提出这个问题后,try_recv 方法已添加到接收器中,使这一切变得更加清晰。

关于rust - Tokio mpsc 接收器上的非阻塞接收,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/68961504/

相关文章:

rust - 为什么我不能在 par_iter 上调用 collect_into?

rust - 为什么 Rust 需要在 "impl"关键字之后进行泛型类型声明?

rust - 从 Arc Mutex 获取向量

rust - 迭代 Rust 中的命名正则表达式组

rust - 如何阅读基于 Tokio 的 Hyper 请求的整个主体?

rust - (tokio::spawn)借用的值生命周期不长-参数要求 `sleepy`是借用的 `' static`

logging - 是否可以在编译时更改应用程序的日志级别?

rust - 使用 tokio-tungstenite 时如何获取 header ?

asynchronous - 为什么在使用带有 std::sync::Mutex 的 Tokio 时会出现死锁?

asynchronous - rust 东京 : calling async function from sync closure