multithreading - 如何从标准库生成的线程向 Tokio 异步任务发送消息?

标签 multithreading rust rust-tokio rust-tonic

我有一个设置,我的程序使用 std::thread::spawn 为 CPU 绑定(bind)计算生成多个线程。

我需要一个 GRPC 服务器来处理传入的命令并流式传输工作线程完成的输出。我正在为 GRPC 服务器使用 tonic,它只在 Tokio future 中提供异步实现。

我需要能够从我的“正常”标准库线程向 Tokio future 发送消息。

我已经将我的代码简化为这里的最低限度:

use std::thread;
use tokio::sync::mpsc; // 1.9.0

fn main() {
    let (tx, mut rx) = mpsc::channel(1);

    let tokio_runtime = tokio::runtime::Runtime::new().unwrap();
    tokio_runtime.spawn(async move {
        // the code below starts the GRPC server in reality, here I'm just demonstrating trying to receive a message
        while let Some(v) = rx.recv().await {}
    });

    let h = thread::spawn(move || {
        // do work
        tx.send(1).await; //<------ error occurs here since I can't await in a non-async block
    });

    h.join().unwrap();
}

我的主工作线程如何与 Tokio 生成的 GRPC 服务器通信?

最佳答案

您可以使用 tokio 的 sync 功能。有两个选项 - UnboundedSenderSender::blocking_send() .

无界发送者的问题在于它没有背压,如果您的生产者比消费者快,您的应用程序可能会因内存不足错误而崩溃,或者耗尽生产者使用的其他有限资源。

作为一般规则,您应该避免使用无界队列,这让我们有了使用 blocking_send() 的更好选择:

Playground :

use std::thread;
use tokio::sync::mpsc; // 1.9.0

fn main() {
    let (tx, mut rx) = mpsc::channel(1);

    let tokio_runtime = tokio::runtime::Runtime::new().unwrap();
    tokio_runtime.spawn(async move {
        // the code below starts the GRPC server in reality, here I'm just demonstrating trying to receive a message
        while let Some(v) = rx.recv().await {
            println!("Received: {:?}", v);
        }
    });

    let h = thread::spawn(move || {
        // do work
        tx.blocking_send(1).unwrap();
    });

    h.join().unwrap();
}

关于multithreading - 如何从标准库生成的线程向 Tokio 异步任务发送消息?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/68561218/

相关文章:

asynchronous - Tokio react 器是否轮询每个组合器之间所有可能的 poll() 函数?

java - 是否有具有队列限制的 ExecutorService 实现以及用新队列成员替换旧队列成员的选项?

c# - 为什么 WhenAll 无限期地等待多个 ping?

c# - 关于 ResetEvents 和 Threading,此 tcp 监听器代码是否正确实现?

rust - 使用 None 访问 Rust 中的嵌套 HashMap

vector - Rust `Vec`-无法在 `Vec`方法内借用 `impl`作为不可变的(错误[E0502])

rust - 默认泛型参数

rust - tokio::spawn(my_future).await 和只是 my_future.await 有什么区别?

java - 按顺序打印线程并为每个线程设置随机 sleep 时间

Rust libp2p 在 crate libp2p 中找不到函数development_transport