c++ - asio::io_service 多线程优先级队列处理

标签 c++ multithreading boost-asio

我在我的多线程 C++ 代码中经常使用 asio::io_service。最近,由于在处理各种任务时缺乏优先级,我发现我的代码存在瓶颈。我很自然地遇到了这个提升 example以确保某些任务的优先级高于其余任务。但此示例仅适用于单线程应用程序。

通常,我的代码使用这种模式。

boost::asio::io_service ioService;
boost::thread_group threadPool;
boost::asio::io_service::work work(ioService);

int noOfCores = boost::thread::hardware_concurrency();
for (int i = 0 ; i < noOfCores ; i ++)
{
    threadPool.create_thread(boost::bind(&boost::asio::io_service::run, &ioService));
}
threadPool.join_all();

我从其他各种线程执行了很多 ioService.post() 操作,所有这些处理程序都具有相同的优先级。

现在,如果我要使用 boost 示例中的 handler_priority_queue,首先我必须向 add() 和 execute_all() 函数添加一些互斥保护。

boost::mutex _mtx;
void add(int priority, boost::function<void()> function)
{
    boost::lock_guard<boost::mutex> lock(_mtx);
    handlers_.push(queued_handler(priority, function));
}

void execute_all()
{
    while (!handlers_.empty())
    {
        boost::unique_lock<boost::mutex> lock(_mtx);
        queued_handler handler = handlers_.top();
        handlers_.pop();
        lock.unlock();
        handler.execute();
    }
}

但是,我不确定用什么替换我当前代码中的以下行。

    threadPool.create_thread(boost::bind(&boost::asio::io_service::run, &ioService));

我显然需要以某种方式将 io_service::run 替换为 handler_priority_queue::execute_all()。但是怎么办?什么是最好的方法?

我可以做到这一点......

    threadPool.create_thread(boost::bind(&handler_priority_queue::execute_all,
&pri_queue));

但是 execute_all() 马上就出来了。我认为 execute_all() 需要以某种方式重新设计。这个怎么样?它有效,但我不确定其中的陷阱。

void execute_all()
{
    while (ioService.run_one())
    {
        while (ioService.poll_one());
        while (true)
        {
            queued_handler handler;
            {
                boost::lock_guard<boost::mutex> lock(_mtx);
                if (handlers_.empty())
                {
                    break;
                }
                else
                {
                    handler = handlers_.top();
                    handlers_.pop();
                }
            }
            handler.execute();
        }
    }
}

最佳答案

Asio 不提供这种可能性。该示例仅限于单个线程,因为它不修改 Asio 的调度程序。调度程序以 FIFO 方式在线程之间分配任务,我不知道有什么方法可以修改它。只要在启动异步操作时无法指定优先级(例如 io_service::post),调度程序就不知道任务优先级,因此无法使用它。

当然,您可以为每个线程使用 priority_queue,但在这种情况下,您的优先级将产生有限的“线程本地”效果:只有调度到同一线程的任务才会根据它们的优先级执行。考虑示例(伪代码):

io_service.post(task(priority_1));
io_service.post(task(priority_2));
io_service.post(task(priority_3));

thread_1(io_service.run());
thread_2(io_service.run());

假设任务 1 和 3 由 thread_1 执行,任务 2 由 thread_2 执行。因此,如果在链接示例中使用优先级队列,则 thread_1 将执行任务 3,然后执行任务 1。但是 thread_2 不知道这些任务,会立即执行任务 2,可能在任务 3 之前执行。

您的选择要么是实现自己的调度程序(复杂性取决于您的要求,但通常会很棘手),要么是寻找第 3 方解决方案。例如。我会检查英特尔 TBB Priority .

编辑:试图详细说明“自己的调度程序”案例:

对于更简单的版本、线程池和从队列中提取的线程,您需要一个非常好的多生产者/多消费者并发队列。从优先级的角度来看,这将是一个相当公平的解决方案:所有优先级较高的任务将(几乎总是)在优先级较低的任务之前开始执行。如果性能比公平更重要,则可以改进此解决方案。但这需要另一个问题和有关您的特定案例的大量详细信息。

关于c++ - asio::io_service 多线程优先级队列处理,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/51991388/

相关文章:

c++ - 在 C++ 中处理字节顺序

c++ - 使用递归 C++ 将字符串转换为整数

multithreading - 当有一个读者和一个作者线程时,我是否需要使用 std::atomic_

c++ - boost-asio 编译失败(C++ 类中的初始化列表)?

c++ - 前/后增量的左值和右值

c++ - 静态数据类型(结构)的组织

java - 为什么 lockStatic 在 android 中是易变的?

c# - 拥有资源的生产者-消费者

c++ - Boost asio socket : how to get IP,连接端口地址?

c++ - boost::asio::async_read 在换行符上返回文件结尾错误