rust - 有没有办法为 tokio::spawn_blocking 创建多个池,这样一些任务就不会饿死其他任务?

标签 rust async-await rust-tokio

我正在使用 Tokio 编写一些异步 Rust 代码,但遇到了问题。我有一些任务需要访问连接池,连接池的性质意味着一次只能运行固定数量 (NUMCPUS) - 所有其他请求将阻塞,直到有空闲连接为止。

目前,我只是在使用 task::spawn_blocking,这是一种可行的方法。然而,这样做的缺点是,一旦 512 个请求阻塞在连接池上,Tokio 的整个阻塞池 就会耗尽,所有阻塞任务都会排队。这可以防止来自代码中不依赖于连接池的其他地方的任何 spawn_blocking 调用也运行。

有什么方法可以告诉 Tokio 将一组特定的阻塞任务分开并一次只生成 N 个,同时仍然允许不相关的阻塞任务在不排队的情况下运行?

spawn_blocking 文档建议将 Rayon 用于 CPU 密集型任务,但是 a) 不清楚如何将 Rayon 与 Tokio 集成,以及 b) 我的任务无论如何都不是 CPU 密集型的。

最佳答案

您可以使用 Semaphore :用并发允许的任务数初始化它,并让每个任务在处理之前获取信号量,并在完成时释放它。像(未经测试):

use tokio::sync::Semaphore;

struct Pool {
    sem: Semaphore,
}

impl Pool {
    fn new (size: usize) -> Self {
        Pool { sem: Semaphore::new (size), }
    }

    async fn spawn<T> (&self, f: T) -> T::Output
    where
        T: Future + Send + 'static,
        T::Output: Send + 'static,
    {
        let handle = self.sem.acquire().await;
        f.await
    }
}

关于rust - 有没有办法为 tokio::spawn_blocking 创建多个池,这样一些任务就不会饿死其他任务?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/69546598/

相关文章:

arrays - 如何创建超过 32 个泛型类型元素的数组 T : Default?

javascript - 如何将函数变成异步函数

rust - 如何停止超 HTTP Web 服务器并返回错误?

generics - 如何在 Rust 中声明可比较的东西的通用向量

c++ - 等效于 Rust 的 include_str 的 C++ 宏

node.js - 如何通过 async/await 异步读取和汇总所有文件的数据

javascript - Nodejs 一个接一个地运行异步函数

rust - 如何固定RefCell的内容?

asynchronous - RuSTLazy_static 和 tokio::sync::mpsc::tokio::select 中的 channel

Rust Flatbuffers 索引超出范围错误