c++ - 如何判断我的线程池何时完成其任务?

标签 c++ multithreading c++11 threadpool

在 c++11 中,我有一个 ThreadPool 对象,它管理许多通过单个 lambda 函数排队的线程。我知道我必须处理多少行数据,所以我提前知道我需要排队 N 个作业。我不确定的是如何判断所有这些工作何时完成,以便我可以继续下一步。

这是管理线程池的代码:

#include <cstdlib>
#include <vector>
#include <deque>
#include <iostream>
#include <atomic>
#include <thread>
#include <mutex>
#include <condition_variable>

class ThreadPool;

class Worker {
public:
    Worker(ThreadPool &s) : pool(s) { }
    void operator()();
private:
    ThreadPool &pool;
};

class ThreadPool {
public:
    ThreadPool(size_t);
    template<class F>
    void enqueue(F f);
    ~ThreadPool();
    void joinAll();
    int taskSize();

private:
    friend class Worker;

    // the task queue
    std::deque< std::function<void()> > tasks;

    // keep track of threads
    std::vector< std::thread > workers;

    // sync
    std::mutex queue_mutex;
    std::condition_variable condition;
    bool stop;
};

void Worker::operator()()
{
    std::function<void()> task;
    while(true)
    {
        {   // acquire lock
            std::unique_lock<std::mutex> 
                lock(pool.queue_mutex);

            // look for a work item
            while ( !pool.stop && pool.tasks.empty() ) {
                // if there are none wait for notification
                pool.condition.wait(lock);
            }

            if ( pool.stop )  {// exit if the pool is stopped
                return;
            }

            // get the task from the queue
            task = pool.tasks.front();
            pool.tasks.pop_front();

        }   // release lock

        // execute the task
        task();
    }
}


// the constructor just launches some amount of workers
ThreadPool::ThreadPool(size_t threads)
    :   stop(false)
{
    for (size_t i = 0;i<threads;++i) {
        workers.push_back(std::thread(Worker(*this)));
    }

    //workers.
    //tasks.
}

// the destructor joins all threads
ThreadPool::~ThreadPool()
{
    // stop all threads
    stop = true;
    condition.notify_all();

    // join them
    for ( size_t i = 0;i<workers.size();++i) {
        workers[i].join();
    }
}

void ThreadPool::joinAll() {
    // join them
    for ( size_t i = 0;i<workers.size();++i) {
        workers[i].join();
    }
}

int ThreadPool::taskSize() {
    return tasks.size();
}

// add new work item to the pool
template<class F>
void ThreadPool::enqueue(F f)
{
    { // acquire lock
        std::unique_lock<std::mutex> lock(queue_mutex);

        // add the task
        tasks.push_back(std::function<void()>(f));
    } // release lock

    // wake up one thread
    condition.notify_one();
}

然后我将我的工作分配给这样的线程:

ThreadPool pool(4);
/* ... */
for (int y=0;y<N;y++) {
    pool->enqueue([this,y] {
        this->ProcessRow(y);
    });
}

// wait until all threads are finished
std::this_thread::sleep_for( std::chrono::milliseconds(100) );

等待 100 毫秒之所以可行,是因为我知道这些作业可以在不到 100 毫秒的时间内完成,但显然这不是最佳方法。一旦它完成了 N 行处理,它就需要再经历 1000 代左右的相同事情。显然,我想尽快开始下一代。

我知道必须有一些方法可以将代码添加到我的线程池中,这样我就可以做这样的事情:

while ( pool->isBusy() ) {
    std::this_thread::sleep_for( std::chrono::milliseconds(1) );
}

我已经为此工作了好几个晚上,但我发现很难找到如何做到这一点的好例子。 那么,我的 isBusy() 方法的正确实现方式是什么?

最佳答案

我明白了!

首先,我给ThreadPool类引入了几个额外的成员:

class ThreadPool {
    /* ... exisitng code ... */
    /* plus the following */
    std::atomic<int> njobs_pending;
    std::mutex main_mutex;
    std::condition_variable main_condition;
}

现在,我可以做得比每隔 X 时间检查一些状态更好。现在,我可以阻塞主循环,直到没有更多的作业挂起:

void ThreadPool::waitUntilCompleted(unsigned n) {
    std::unique_lock<std::mutex> lock(main_mutex);
    main_condition.wait(lock);
}

只要我在 ThreadPool.enqueue() 函数的头部使用以下簿记代码管理未决的内容:

njobs_pending++;

在我运行 Worker::operator()() 函数中的任务之后:

if ( --pool.njobs_pending == 0 ) {
    pool.main_condition.notify_one();
}

然后主线程可以将任何必要的任务加入队列,然后等待所有计算完成:

for (int y=0;y<N;y++) {
    pool->enqueue([this,y] {
        this->ProcessRow(y);
    });
}
pool->waitUntilCompleted();

关于c++ - 如何判断我的线程池何时完成其任务?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/33661395/

相关文章:

c++ - 显式引用限定转换运算符模板的实际应用

c++ - int64_t 的整数类型歧义

c++ - 不需要大量库的体面的 shared_ptr 实现?

c++ - 关于c++内存管理的问题

java - 由 : java. lang.OutOfMemoryError: Java heap space 引起

c++ - 强制C++类具有对齐的属性

c++ - 在C++中获取数字的所有组合

c# - 跨线程异常 - 仅限环境

multithreading - Puma 中的 Workers 和 Threads 有什么区别

使用 decltype 和 constness 的 C++11 尾随返回成员函数