c++ - 在添加作业期间删除 1 纳秒的延迟时,标准线程作业系统中止

标签 c++ multithreading

我已经包含了该项目的完整源代码,它只有大约 100 行。 这是一个多线程作业系统,我正在为一项学校作业而努力,在为此苦苦挣扎了几天之后,主要是因为这对我来说都是全新的(C++ 11 和标准线程)。

我对线程、互斥锁和条件变量类有基本的了解。但是,应用程序抛出调试错误:R6010 abort() has been called. 当我在将单个作业添加到池期间删除 1 纳秒 sleep 调用时。我希望通过在访问和获取作业期间放置互斥锁来解决错误,但徒劳无功。

#include "stdafx.h"
#include <iostream>
#include <queue>
#include <future>
#include <thread>
#include <atomic>
using namespace std;

float CalcInvSqr(float x){
    return (x);
}

class ThreadPool{
public:
    ThreadPool::ThreadPool(int numThreads){
        for(int i=0;i<numThreads;i++){
            threads.push_back(thread(&ThreadPool::StartThread, this));
        }
    };

    ThreadPool::~ThreadPool(){
        CleanUp();
    }

    template<typename F>
    future<F> AddJob(std::packaged_task<F()>& job) {
        unique_lock<mutex> lock(poolMutex);
        this->jobList.push([&job]() {
            job();
        });

        this->cv.notify_one();
        lock.unlock();
        return job.get_future();
    }

    void StartThread(){
        while(true){

            unique_lock<mutex> uLock(poolMutex);
            cv.wait(uLock);

            if(stopped.load())
                return;

            std::function<void()> f;
            f = jobList.front();
            jobList.pop();

            uLock.unlock();
            f();
        }
    }

    void CleanUp(){
        stopped.store(true);
        {
            unique_lock<mutex> lock(poolMutex);
            cv.notify_all();
            lock.unlock();

            for(int i=0;i<threads.size();i++){
                if(threads[i].joinable())
                    threads[i].join();
            }

            cout<<jobList.size()<<endl;
        }
    }


private:
    mutable std::queue<std::function<void()>> jobList;
    atomic<bool> stopped;
    std::vector<std::thread> threads;
    std::mutex poolMutex;
    std::condition_variable cv;
};

#define JOBS 50

int main(){

    ThreadPool pool(4);

    std::vector<std::future<float>> results;
    for(int i=0;i<JOBS;i++){
        std::this_thread::sleep_for(std::chrono::nanoseconds(1));
        float num = (float)(rand()%RAND_MAX)/(float)RAND_MAX;
        std::packaged_task<float()> task([num](){
            return CalcInvSqr(num);
        });
        results.push_back(pool.AddJob<float>(task));
    }

    for(int i=0;i<JOBS;i++){
        cout<<"VALUE: "<<results[i].get()<<endl;
    }

    cout<<"Jobs Done ! "<<endl;
    system("PAUSE");
    return 0;
}

我在这里遇到了障碍,似乎无法解决错误,我希望在这方面有更多经验的人可以引导我朝着正确的方向前进。

当调用 1 纳秒延迟时,代码成功执行并输出正确。

最佳答案

这是更正后的程序。 请注意,在通知条件变量之前互斥量已解锁。这很重要。

您会注意到我在 ThreadPool::Add<> 中使用了一个 shared_ptr。这是因为代码是为 c++11 编写的。如果它是 c++14,我们可以取消 shared_ptr 并简单地将 packaged_task 移动到 lambda 中,首先获取 future。

#include <iostream>
#include <queue>
#include <future>
#include <thread>
#include <atomic>
using namespace std;

float CalcInvSqr(float x){
    return (x);
}

class ThreadPool{
public:
    ThreadPool(int numThreads){
        for(int i=0;i<numThreads;i++){
            threads.push_back(thread(&ThreadPool::StartThread, this));
        }
    };

    ~ThreadPool(){
        CleanUp();
    }

    template<typename F>
    std::future<F> AddJob(std::packaged_task<F()> job)
    {
        auto job_ptr = make_shared<std::packaged_task<F()>>(move(job));

        unique_lock<mutex> lock(_job_mutex);

        jobList.push_back([job_ptr]() {
            job_ptr->operator()();
        });
        lock.unlock();
        _jobs_available.notify_one();
        return job_ptr->get_future();
    }

    void StartThread(){
        while (true) {
            unique_lock<mutex> job_lock(_job_mutex);
            _jobs_available.wait(job_lock, [this]() { return !stopped && !jobList.empty(); });
            if (stopped)
                return;
            auto job = move(jobList.front());
            jobList.pop_front();
            job_lock.unlock();
            job();
        }
    }

    void CleanUp(){
        unique_lock<mutex> job_lock(_job_mutex);
        stopped = true;
        jobList.clear();
        job_lock.unlock();
        _jobs_available.notify_all();

        for (auto& thread : threads) {
            if (thread.joinable())
                thread.join();
        }
    }


private:
    std::mutex _job_mutex;
    std::condition_variable _jobs_available;

    std::deque<std::function<void()>> jobList;
    atomic<bool> stopped;
    std::vector<std::thread> threads;
};

#define JOBS 5000

int main(){

    ThreadPool pool(40);

    std::vector<std::future<float>> results;
    for(int i=0;i<JOBS;i++){
        float num = (float)(rand()%RAND_MAX)/(float)RAND_MAX;
        std::packaged_task<float()> task([num](){
            return CalcInvSqr(num);
        });
        results.push_back(pool.AddJob<float>(move(task)));
    }

    for(int i=0;i<JOBS;i++){
        cout<<"VALUE: "<<results[i].get()<<endl;
    }

    cout<<"Jobs Done ! "<<endl;
    //    system("PAUSE");
    return 0;
}

关于c++ - 在添加作业期间删除 1 纳秒的延迟时,标准线程作业系统中止,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/26618633/

相关文章:

c++ - 如何在 WTL 中删除指向无模式对话框的指针

c++ - 如何消除在 switch case 中使用 goto

c++ - 隐藏文件时记录 C++ 宏

ios - Facebook IOS SDK调用同步

c++ - std::endl 和\n 之间的区别(关于平台意识)

c++ - 为什么我们不能在类中声明命名空间别名?

java - 创建一个运行在定时器上但可以随时被唤醒的java线程

multithreading - 如何将对堆栈变量的引用传递给线程?

安全 dao 的 Java 指南/提示

java - 带资源锁的并行测试?