c++ - boost:asio 线程池实现,用于偶尔同步的任务

标签 c++ multithreading gcc boost boost-asio

我有一个“主要”函数,它在每个时间步执行一次许多小的、独立的任务。但是,在每个时间步之后,我必须等待所有任务完成才能继续。

我想让程序多线程化。我尝试过使用 boost-offshoot 线程池实现,我尝试过使用线程的(共享指针) vector ,我尝试过 asio 线程池的想法(使用 io_service,建立一些工作,然后将运行分发到线程并将处理程序发布到 io_service)。

所有这些似乎都有很多开销为我的“许多小任务”创建和销毁线程,我想要一种方法,最好使用 asio 工具,来实例化一个 io_service,一个 thread_group,将处理程序发布到 io_service ,并在发布更多任务之前等待单个时间步的工作完成。有没有好的方法来做到这一点?这是我现在工作的(精简的)代码:

boost::asio::io_service io_service;
for(int theTime = 0; theTime != totalTime; ++theTime)
{
    io_service.reset();
    boost::thread_group threads;
    // scoping to destroy the work object after work is finished being assigned
    {
        boost::asio::io_service::work work(io_service);
        for (int i = 0; i < maxNumThreads; ++i)
        {
            threads.create_thread(boost::bind(&boost::asio::io_service::run,
                &io_service));
        }

        for(int i = 0; i < numSmallTasks; ++i)
        {
            io_service.post(boost::bind(&process_data, i, theTime));
        }
    }
    threads.join_all(); 
}

这是我宁愿拥有的(但不知道如何实现):

boost::asio::io_service io_service;
boost::thread_group threads;
boost::asio::io_service::work work(io_service);
for (int i = 0; i < maxNumThreads; ++i)
{
    threads.create_thread(boost::bind(&boost::asio::io_service::run,
         &io_service));
}

for(int theTime = 0; theTime != totalTime; ++theTime)
{
    for(int i = 0; i < numSmallTasks; ++i)
    {
        io_service.post(boost::bind(&process_data, i, theTime));
    }
    // wait here until all of these tasks are finished before looping 
    // **** how do I do this? *****
}
// destroy work later and join all threads later...

最佳答案

您可以使用 futures用于数据处理并使用 boost::wait_for_all() 与它们同步.这将允许您根据已完成的部分工作而不是线程进行操作。

int process_data() {...}

// Pending futures
std::vector<boost::unique_future<int>> pending_data;

for(int i = 0; i < numSmallTasks; ++i)
{
   // Create task and corresponding future
   // Using shared ptr and binding operator() trick because
   // packaged_task is non-copyable, but asio::io_service::post requires argument to be copyable

   // Boost 1.51 syntax
   // For Boost 1.53+ or C++11 std::packaged_task shall be boost::packaged_task<int()>
   typedef boost::packaged_task<int> task_t;

   boost::shared_ptr<task_t> task = boost::make_shared<task_t>(
      boost::bind(&process_data, i, theTime));

   boost::unique_future<int> fut = task->get_future();

   pending_data.push_back(std::move(fut));
   io_service.post(boost::bind(&task_t::operator(), task));    
}

// After loop - wait until all futures are evaluated
boost::wait_for_all(pending_data.begin(), pending_data.end()); 

关于c++ - boost:asio 线程池实现,用于偶尔同步的任务,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/13146433/

相关文章:

c++ - 有没有办法在 map 中流式传输?

c++ - 我可以注释多行宏吗?

java - 在netty库中,我想知道如何根据cpu号创建多个线程?

c - 有没有办法在单个翻译单元中理智地使用 GCC __attribute__((noreturn)) 和 <stdnoreturn.h> ?

c++ - 可以为多个构造函数消除 0 (NULL) 的歧义吗?和赋值运算符?

c++ - C++ 中的数组排序问题

c++ - 如何使用 C++ 在 OpenGL_POINTS 函数中绘制正弦波

java - 线程同步和单例问题

C++0x 错误 : variable 'std::packaged_task<int> pt1' has initializer but incomplete type

c++ - 我在哪里可以找到列出 SSE 内在函数操作的官方引用资料?