c++ - 让预定义数量的线程计算更大数量的任务

标签 c++ multithreading c++11

我有预定义数量的任务(任务对象)和预定义数量的线程。

假设有 8 个线程和 32 个任务。

我当前的解决方案允许我并行运行 8 个任务,并且在它们完成后,并行运行另外 8 个任务。

使用此解决方案可能会发生 7 个任务将不得不等待最慢的任务。

我想要一个解决方案,一旦一个线程完成它的任务,它会直接重新运行一个新任务,直到没有剩余。

我当前的实现:

vector <Task> tasks; //the 32 tasks
unsigned int maxThreadCount(8);

for ( unsigned int i = 0; i < tasks.size() - maxThreadCount; i+=maxThreadCount )
{
    vector <thread> threads;
    for (unsigned int j = 0; j < maxThreadCount; ++j)
        threads.push_back (thread(std::bind( &Task::some_function, tasks[i+j])));

    for ( auto &i : threads)
        i.join();
}

最佳答案

这有点乱七八糟,显然是针对您的需求。这个想法是让线程不断拉取数据并将其发送到您的成员函数,直到任务列表用完为止,此时每个线程在该发现时终止。

像这样:

static void ThreadProc(
    std::vector<Task>& tasks,
    std::atomic_uint& idx, 
    void (Task::*member)())
{
    unsigned int i = 0;
    while ((i = idx++) < tasks.size())
        (tasks[i].*member)();
}

void RunTasks(
    std::vector<Task>& tasks, 
    unsigned int n_threads = std::max(std::thread::hardware_concurrency(),1U))
{
    // adjust number of threads to be no larger than the task set
    //  there is no reason to spin up new threads that will have
    //   nothing to do but exit.
    if (tasks.size() < n_threads)
        n_threads = tasks.size();

    // shared index into the task container        
    std::atomic_uint idx{0};

    // spin up the work crew
    std::vector<std::thread> threads;
    threads.reserve(n_threads);
    while (n_threads--)
    {
        threads.emplace_back(ThreadProc, 
                             std::ref(tasks), 
                             std::ref(idx), 
                             &Task::some_function);
    }

    for (auto &t : threads)
        t.join();
}

对于一组固定的任务,这与线程过程调用成员模型所能获得的一样简单。它不需要额外的容器“队列”;任务 vector 就足够了。而且它不需要锁定对象,而是使用更轻的原子。

显然,如果您需要增强的容器访问权限,例如将新项目附加到任务 vector 中间任务等,那么这一切都会消失。类似的事情需要不同的方法,但对于单次工作-crew 方法来完成固定的任务列表,这是很难击败的。根据你的描述,这就是你所需要的。

祝你好运。

关于c++ - 让预定义数量的线程计算更大数量的任务,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/25499673/

相关文章:

c++ - 捕获和播放 MJPEG - 使用 OpenCV 和 ffmpeg 通过 UDP 传输网络视频流

c - 多线程消息同步

c++ - 写智能指针和指针的泛型代码时,如何获取简单指针?

c++ - 在scanf中引用int

c# - C# 中的 C++ 非托管 DLL

c++ - 过时函数的标准 COM HRESULT 是什么?

c++ - 与 std::vector<>::size() 并发

java - 无法在尚未调用 Looper.prepare() android 服务的线程内创建处理程序

c++ - 使用 'std::algorithm' 无显式循环迭代自定义对的 vector

c++ - std::future 是否旋转等待?