asynchronous - RuSTLazy_static 和 tokio::sync::mpsc::tokio::select 中的 channel

标签 asynchronous rust channel rust-tokio lazy-static

我最近开始使用 Rust 进行编码,我很喜欢它。我正在一个项目上编码,我想“包装”C-API。在一种情况下,我必须在 Rust 中定义 C 可以调用的回调。我让bindgen 创建了回调。由于代码需要异步运行,因此我使用 tokio。

我想要实现的目标

我将 main 函数创建为 tokio::main。在主函数中,我创建了 2 个异步任务,一个监听 channel ,另一个触发 C-API 中的消息队列。如果消息可用,我想通过回调函数上的 channel 发送它们,这样我就可以接收有关我正在监听事件的任务的消息。稍后我想通过 SSE 或 GraphQL 订阅将这些消息发送给多个客户端。

我无法更改 C-Callbacks,因为它们需要可传递给 C-API,并且我必须使用回调,否则我不会收到消息。

我最新的方法看起来像这样简化了:

use lazy_static::lazy_static;
use tokio::sync::{
    mpsc::{channel, Receiver, Sender},
    Mutex,
};
use bindgen::{notify_connect, notify_connectionstate};

lazy_static! {
    static ref BROADCAST_CONNECT: Mutex<(Sender<bool>, Receiver<bool>)> = Mutex::new(channel(128));
    static ref BROADCAST_CONNECTIONSTATE: Mutex<(Sender<u32>, Receiver<u32>)> = Mutex::new(channel(128));
}

#[tokio::main]
async fn main() {    
    unsafe { notify_connect(Some(_notify_connect)) } // pass the callback function to the C-API
    unsafe { notify_connectionstate(Some(_notify_connectionstate)) } // pass the callback function to the C-API

    tokio::spawn(async move { // wait for a channel to have a message
        loop {
            tokio::select! {
                // wating for a channel to receive a message
                Some(msg) = BROADCAST_CONNECT.lock().await.1.recv() => println!("{}", msg),
                Some(msg) = BROADCAST_CONNECTIONSTATE.lock().await.1.recv() => println!("{}", msg),
            }
        }
    });

    let handle2 = tokio::spawn(async move {
        loop {
            unsafe {
                message_queue_in_c(
                    some_handle,
                    true,
                    Duration::milliseconds(100).num_microseconds().unwrap(),
                )
            }
        }
    });

    handle.await.unwrap();
    habdle2.await.unwrap();
}

// the callback function that gets called from the C-API
unsafe extern "C" fn _notify_connect(is_connected: bool) {
    // C-API is not async, so use synchronous lock
    match BROADCAST_CONNECT.try_lock() {
        Ok(value) => match value.0.blocking_send(is_connected) {
            Ok(_) => {}
            Err(e) => {
                eprintln!("{}", e)
            }
        },
        Err(e) => {
            eprintln!("{}", e)
        }
    }
}

unsafe extern "C" fn _notify_connectionstate(connectionstate: u32) {
    match BROADCAST_CONNECTIONSTATE.try_lock() {
        Ok(value) => match value.0.blocking_send(connectionstate) {
            Ok(_) => {}
            Err(e) => {
                eprintln!("{}", e)
            }
        },
        Err(e) => {
            eprintln!("{}", e)
        }
    }
}

问题:

error[E0716]: temporary value dropped while borrowed
  --> src/main.rs:37:29
   |
35 | /             tokio::select! {
36 | |                 Some(msg) = BROADCAST_CONNECT.lock().await.1.recv() => println!("{}", msg),
37 | |                 Some(msg) = BROADCAST_CONNECTIONSTATE.lock().await.1.recv() => println!("{}", msg),
   | |                             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ creates a temporary which is freed while still in use
38 | |             }
   | |             -
   | |             |
   | |_____________temporary value is freed at the end of this statement
   |               borrow later captured here by closure
   |
   = note: consider using a `let` binding to create a longer lived value

我理解该消息以及为什么会发生这种情况,但我想不出解决方案。

我有一个使用 crossbeam channel 的工作示例,但我宁愿使用 tokio 的异步 channel ,所以我没有那么多依赖项,并且一切都是异步的。

工作示例:

use lazy_static::lazy_static;
use crossbeam::{
    channel::{bounded, Receiver, Sender},
    select,
};
use bindgen::{notify_connect, notify_connectionstate};

lazy_static! {
    static ref BROADCAST_CONNECT: (Sender<bool>, Receiver<bool>) = bounded(128);
    static ref BROADCAST_CONNECTIONSTATE: (Sender<u32>, Receiver<u32>) = bounded(128);
}

#[tokio::main]
async fn main() {    
    unsafe { notify_connect(Some(_notify_connect)) } // pass the callback function to the C-API
    unsafe { notify_connectionstate(Some(_notify_connectionstate)) } // pass the callback function to the C-API

    let handle1 = tokio::spawn(async move {
        loop {
            select! {
                recv(&BROADCAST_CONNECT.1) -> msg => println!("is_connected: {:?}", msg.unwrap()),
                recv(&BROADCAST_CONNECTIONSTATE.1) -> msg => println!("connectionstate: {:?}", msg.unwrap()),
            }
        }
    });

    let handle2 = tokio::spawn(async move {
        loop {
            unsafe {
                message_queue_in_c(
                    some_handle,
                    true,
                    Duration::milliseconds(100).num_microseconds().unwrap(),
                )
            }
        }
    });

    handle.await.unwrap();
    handle2.await.unwrap();
}

// the callback function thats gets called from the C-API
unsafe extern "C" fn _notify_connect(is_connected: bool) {
    match &BROADCAST_CONNECT.0.send(is_connected) {
        Ok(_) => {}
        Err(e) => eprintln!("{}", e),
    };
}

unsafe extern "C" fn _notify_connectionstate(connectionstate: u32) {
    match BROADCAST_CONNECTIONSTATE.0.send(connectionstate) {
        Ok(_) => {}
        Err(e) => eprintln!("{}", e),
    }
}

替代方案

另一种选择,我也没有开始工作,是使用某种本地函数或使用闭包。但我不确定这是否有效,即使是否有效。也许有人有想法。如果这样的东西可以工作,那就太好了,这样我就不必使用lazy_static(我不想在我的代码中使用全局/静态变量)

use tokio::sync::{
    mpsc::{channel, Receiver, Sender},
    Mutex,
};
use bindgen::{notify_connect, notify_connectionstate};

#[tokio::main]
async fn main() {
    let app = app::App::new();

    let mut broadcast_connect = channel::<bool>(128);
    let mut broadcast_connectionstate = channel::<bool>(128);

    let notify_connect = {
        unsafe extern "C" fn _notify_connect(is_connected: bool) {
            match broadcast_connect.0.blocking_send(is_connected) {
                Ok(_) => {}
                Err(e) => {
                    eprintln!("{}", e)
                }
            }
        }
    };

    let notify_connectionstate = {
        unsafe extern "C" fn _notify_connectionstate(connectionstate: u32) {
            match broadcast_connectionstate.0.blocking_send(connectionstate) {
                Ok(_) => {}
                Err(e) => {
                    eprintln!("{}", e)
                }
            }
        }
    };

    unsafe { notify_connect(Some(notify_connect)) } // pass the callback function to the C-API
    unsafe { notify_connectionstate(Some(notify_connectionstate)) } // pass the callback function to the C-API

    let handle = tokio::spawn(async move {
        loop {
            tokio::select! {
                Some(msg) = broadcast_connect.1.recv() => println!("{}", msg),
                Some(msg) = broadcast_connectionstate.1.recv() => println!("{}", msg),
            }
        }
    });

    let handle2 = tokio::spawn(async move {
        loop {
            unsafe {
                message_queue_in_c(
                    some_handle,
                    true,
                    Duration::milliseconds(100).num_microseconds().unwrap(),
                )
            }
        }
    });

    handle.await.unwrap();
    handle2.await.unwrap();
}

这种方法的问题

can't capture dynamic environment in a fn item
  --> src/main.rs:47:19
   |
47 |             match broadcast_connectionstate.0.blocking_send(connectionstate) {
   |                   ^^^^^^^^^^^^^^^^^^^^^^^^^
   |
   = help: use the `|| { ... }` closure form instead

如果有人能解决我的问题,那就太好了。如果这是一种全新的方法,那也很好。如果 Channels 或 tokio 或其他任何东西都不是正确的选择,那也没关系。主要是我用了tokio,因为一个crate我也在用tokio,所以我不需要有更多的依赖。

非常感谢您阅读到这里。

最佳答案

如果您对第一个示例进行以下更改,它应该可以工作:

  1. tokio::sync::Mutex 替换为 std::sync::Mutex,这样您就不必在中使用 try_lock回调。
  2. 不要将接收者存储在互斥体中,仅将发送者存储在互斥体中。
  3. 在回调中,要么使用无界 channel ,要么确保在发送之前释放锁定。
  4. 使用 std::thread::spawn 在专用线程上运行阻塞 C 代码,而不是在 tokio::spawn 中。 (why?)

为了不将接收器存储在互斥锁中,您可以这样做:

static ref BROADCAST_CONNECT: Mutex<Option<Sender<bool>>> = Mutex::new(None);

// in main
let (send, recv) = channel(128);
*BROADCAST_CONNECT.lock().unwrap() = Some(send);

如果您想要有界 channel ,可以先克隆 channel ,然后对锁调用 drop,然后使用 blocking_send 发送来释放锁。对于无限制的 channel ,这并不重要,因为发送是即时的。

// in C callback
let lock = BROADCAST_CONNECT.lock().unwrap();
let send = lock.as_ref().clone();
drop(lock);
send.blocking_send(...);

关于asynchronous - RuSTLazy_static 和 tokio::sync::mpsc::tokio::select 中的 channel ,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/65452692/

相关文章:

Rust E0597 - 借来的值(value)活得不够长

rust - 为什么 HashMap 有 iter_mut() 而 HashSet 没有?

Golang 缓冲 channel : Ignore/suppress sending repeated values

multithreading - 如果一个 goroutine 已完成,控制 goroutine 关闭的规范方法是什么?

c# - 可以为抽象类的子类创建静态构造函数吗?

c# - 避免不受支持的即发即忘异步方法或委托(delegate) AsyncFixer03

ios - 如何同时执行多个异步方法并在它们全部完成时获得回调?

C# 异步 Ping : not work from a Threading. 定时器

rust - 如何测试特征对象之间的相等性?

sftp - Jsch错误-无法发送 channel 请求