c++ - 在 unique_lock 上导致 std::system_error 的生产者-消费者场景

标签 c++ multithreading locking producer-consumer

以下程序应创建一个 JobScheduler,将 Jobwaiting_queue 放入 running_queue。当 running_queue 不为空时,如果 Job 完成,则有 2 个或更多线程执行 Job 并在指定的时间内休眠(duration == execution_time) 然后放入completed_queue 否则再次放入running_queue,直到running_queuewaiting_queue 为空。

runningLock.unlock() 调用会在某个时候导致

在抛出“std::system_error”实例后调用终止

runningLock 不是一次只能从一个线程获取吗?看起来 .unlock() 已被调用,即使线程没有获得锁也是如此。

main.cpp

#include <string>
#include <iostream>
#include <sstream>
#include <vector>
#include <queue>
#include <thread>
#include <future>
// #include "JobScheduler.h"

class Job {

private:
    int id;
    int duration;
    int execution_time;
    int start_time;
    int wait_time;
    float completion_time;

public:
    Job();

    Job(int id, int startTime, int duration);

    Job(const Job &);

    Job& operator=(const Job &);

    ~Job();

    int getId() const;

    void setId(int id);

    int getDuration() const;

    void setDuration(int duration);

    int getExecutionTime() const;

    void setExecutionTime(int executionTime);

    int getStartTime() const;

    void setStartTime(int startTime);

    int getWaitTime() const;

    void setWaitTime(int waitTime);

    int getCompletionTime() const;

    void setCompletionTime(int completionTime);

    bool operator< (const Job&) const;
};

int Job::getId() const {
    return id;
}

void Job::setId(int id) {
    Job::id = id;
}

int Job::getDuration() const {
    return duration;
}

void Job::setDuration(int duration) {
    Job::duration = duration;
}

int Job::getExecutionTime() const {
    return execution_time;
}

void Job::setExecutionTime(int executionTime) {
    execution_time = executionTime;
}

int Job::getStartTime() const {
    return start_time;
}

void Job::setStartTime(int startTime) {
    start_time = startTime;
}

int Job::getWaitTime() const {
    return wait_time;
}

void Job::setWaitTime(int waitTime) {
    wait_time = waitTime;
}

int Job::getCompletionTime() const {
    return completion_time;
}

void Job::setCompletionTime(int completionTime) {
    completion_time = completionTime;
}

Job::Job() : id(0), start_time(0), duration(0) {}

Job::Job(int id, int startTime, int duration) : id(id), start_time(startTime), duration(duration), execution_time(0), wait_time(0), completion_time(0) {}

Job::Job(const Job &job) {
    id = job.id;
    duration = job.duration;
    execution_time = job.execution_time;
    start_time = job.start_time;
    wait_time = job.wait_time;
    completion_time = job.completion_time;
}

Job &Job::operator=(const Job &job) {
    if (this != &job) {
        id = job.id;
        duration = job.duration;
        execution_time = job.execution_time;
        start_time = job.start_time;
        wait_time = job.wait_time;
        completion_time = job.completion_time;
    }

    return *this;
}

bool Job::operator<(const Job &job) const {
    return start_time > job.start_time;
}

Job::~Job() {}

class JobScheduler {
private:
    const int quantum; // ms

    std::vector<std::thread> threadPool;

    std::priority_queue<Job> waitingJobsQueue;

    std::queue<Job> runningJobsQueue;
    std::mutex r_mutex;

    std::vector<Job> completedJobsQueue;
    std::mutex c_mutex;

    std::condition_variable is_empty;
    std::condition_variable is_completed;

    bool done;
public:
    JobScheduler();

    void submit(Job j);
    void start();

    ~JobScheduler();
};

JobScheduler::JobScheduler() : quantum(3000), waitingJobsQueue(), runningJobsQueue(), completedJobsQueue(), done(false) {}

void JobScheduler::submit(Job j) {
    waitingJobsQueue.push(j);
}

void JobScheduler::start() {

    auto startTime = std::chrono::time_point_cast<std::chrono::milliseconds>(std::chrono::system_clock::now());

    for (int i = 0; i < std::thread::hardware_concurrency(); i++) {
        threadPool.emplace_back([this, startTime](){
            std::stringstream msg;
            msg << "thread " << std::this_thread::get_id() << " created" << std::endl;
            std::cout << msg.str();

            std::unique_lock runningLock(r_mutex);
            while (!done) {
                auto woke_up_time = std::chrono::time_point_cast<std::chrono::milliseconds>(std::chrono::system_clock::now());
                is_empty.wait(runningLock, [this]{return (!runningJobsQueue.empty() || waitingJobsQueue.empty());});
                auto elapsed_time = std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::system_clock::now() - woke_up_time);
                int waiting_time = elapsed_time.count();

                if (!runningJobsQueue.empty()) {
                    Job job = runningJobsQueue.front();
                    runningJobsQueue.pop();
                    runningLock.unlock();

                    job.setWaitTime(job.getWaitTime() + waiting_time);

                    float sleep_time = std::min(job.getDuration() - job.getExecutionTime(), quantum);
                    job.setExecutionTime(job.getExecutionTime() + sleep_time);
                    std::stringstream msg1;
                    msg1 << "[" << std::this_thread::get_id() << "] running job " << job.getId() << " at t = " << std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::system_clock::now() - startTime).count() << std::endl;
                    std::cout << msg1.str();
                    std::this_thread::sleep_for(std::chrono::milliseconds(static_cast<long>(sleep_time)));

                    if (job.getExecutionTime() >= job.getDuration()) {
                        auto completion_time = std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::system_clock::now() - startTime);
                        job.setCompletionTime(completion_time.count());
                        std::lock_guard completedLock(c_mutex);
                        completedJobsQueue.push_back(job);
                        std::stringstream msg2;
                        msg2 << "[" << std::this_thread::get_id() << "] completed " << job.getId() << " at t = " << std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::system_clock::now() - startTime).count() << std::endl;
                        std::cout << msg2.str();
                    } else {
                        runningLock.lock();
                        runningJobsQueue.push(job);
                        std::stringstream msg2;
                        msg2 << "[" << std::this_thread::get_id() << "] pushed " << job.getId() << " at t = " << std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::system_clock::now() - startTime).count() << std::endl;
                        std::cout << msg2.str();
                        is_empty.notify_one();
                    }

                } else {
                    runningLock.lock();
                    done = true;
                    is_empty.notify_all();
                }


            }

            std::stringstream msg3;
            msg3 << "[" << std::this_thread::get_id() << "] finished at t = " << std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::system_clock::now() - startTime).count() << std::endl;
            std::cout << msg3.str();


        });
    }

    while (!waitingJobsQueue.empty()) {

        Job job = waitingJobsQueue.top();
        float start_time = std::chrono::milliseconds(job.getStartTime()).count();
        std::stringstream msg;
        msg << "[JobScheduler] sleeping " << start_time << " ms to start job " << job.getId() << std::endl;
        std::cout << msg.str();


        auto elapsed_time = std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::system_clock::now() - startTime);


        if (elapsed_time.count() < job.getStartTime()) {
            std::this_thread::sleep_for(std::chrono::milliseconds(job.getStartTime()-elapsed_time.count()));
        }


        {
            std::lock_guard lockGuard(r_mutex);
            waitingJobsQueue.pop();
            runningJobsQueue.push(job);
            std::stringstream msg;
            msg << "[JobScheduler] job " << job.getId() << " pushed into the running queue" << std::endl;
            std::cout << msg.str();
            is_empty.notify_all();
        }

    }
}

JobScheduler::~JobScheduler() {
    for (int i = 0; i < std::thread::hardware_concurrency(); i++) {
        if (threadPool[i].joinable()) {
            threadPool[i].join();
        }
    }

    float completed_queue_size = completedJobsQueue.size();
    float max_compl_time = 0;
    float tot_exec_time = 0;
    float tot_turnaround_time = 0;
    float tot_waiting_time = 0;
    for (auto &j: completedJobsQueue) {
        j = completedJobsQueue.back();
        completedJobsQueue.pop_back();
        max_compl_time = std::max(static_cast<float>(j.getCompletionTime()), max_compl_time);
        tot_exec_time += j.getExecutionTime();
        tot_waiting_time += j.getWaitTime();
        tot_turnaround_time += j.getCompletionTime() - j.getStartTime();
    }

    float avg_turnaround_time = (tot_turnaround_time/completed_queue_size)/1000;
    float avg_waiting_time = (tot_waiting_time/completed_queue_size)/1000;
    float exec_time = tot_exec_time/(1000*completed_queue_size);
    float compl_time = max_compl_time/1000;
    float cpu_usage = (exec_time/compl_time)*100;

    std::cout.precision(3);
    std::cout << "avg turnaround time " << avg_turnaround_time << "s" << std::endl;
    std::cout << "avg waiting time " << avg_waiting_time << "s" << std::endl;
    std::cout << "exec time " << exec_time << "s" << std::endl;
    std::cout << "compl time " << compl_time << "s" << std::endl;
    std::cout << "cpu usage " << cpu_usage << "%" << std::endl;
}

int main() {
    JobScheduler p{};
    p.submit(Job(1, 0, 15000));
    p.submit(Job(2, 0, 6000));
    p.submit(Job(3, 1000, 9000));
    p.submit(Job(4, 2000, 12000));
    p.submit(Job(5, 3000, 16000));
    p.submit(Job(6, 3000, 5000));
    p.submit(Job(7, 4000, 7000));
    p.submit(Job(8, 4000, 6000));
    p.submit(Job(9, 5000, 9000));

    p.start();
}

更新:我无法减少上面的代码。这是我的完整输出

thread 4 created
thread 3 created
thread 2 created
[JobScheduler] sleeping 0 ms to start job 1
[JobScheduler] job 1 pushed into the running queue
thread 5 created
[JobScheduler] sleeping 0 ms to start job 2
[JobScheduler] job 2 pushed into the running queue
[2] running job 1 at t = 1
[JobScheduler] sleeping 1000 ms to start job 3
[5] running job 2 at t = 2
[JobScheduler] job 3 pushed into the running queue
[JobScheduler] sleeping 2000 ms to start job 4
[4] running job 3 at t = 1000
[JobScheduler] job 4 pushed into the running queue
[3] running job 4 at t = 2001
[JobScheduler] sleeping 3000 ms to start job 6
[JobScheduler] job 6 pushed into the running queue
[JobScheduler] sleeping 3000 ms to start job 5
[JobScheduler] job 5 pushed into the running queue
[JobScheduler] sleeping 4000 ms to start job 8
[2] pushed 1 at t = 3003
[2] running job 6 at t = 3003
[5] pushed 2 at t = 3004
[5] running job 5 at t = 3004
[JobScheduler] job 8 pushed into the running queue
[JobScheduler] sleeping 4000 ms to start job 7
[JobScheduler] job 7 pushed into the running queue
[JobScheduler] sleeping 5000 ms to start job 9
[4] pushed 3 at t = 4001
[4] running job 1 at t = 4001
[JobScheduler] job 9 pushed into the running queue
[3] pushed 4 at t = 5002
[3] running job 2 at t = 5002
[2] pushed 6 at t = 6004
[2] running job 8 at t = 6004
[5] pushed 5 at t = 6005
[5] running job 7 at t = 6005
[4] pushed 1 at t = 7002
[4] running job 3 at t = 7002
[3] completed 2 at t = 8002
terminate called after throwing an instance of 'std::system_error'
  what():  Operation not permitted
[5] pushed 7 at t = 9366
[5] running job 4 at t = 9366
[2] pushed 8 at t = 9366
[2] running job 6 at t = 9366

Process finished with exit code 3

最佳答案

通过查看代码,而不是测试它,我可以指出逻辑中的以下错误:

您在循环 while (!done) 内调用 runningLock.unlock() 并在条件 if (!runningJobsQueue.empty()),但 done 只会在后一种情况的 else 情况下设置为 true。所以我认为你试图多次解锁同一个互斥锁,这会导致错误:

If there is no associated mutex or the mutex is not locked, std::system_error with an error code of std::errc::operation_not_permitted

https://en.cppreference.com/w/cpp/thread/unique_lock/unlock

关于c++ - 在 unique_lock 上导致 std::system_error 的生产者-消费者场景,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/57146540/

相关文章:

c++ - 使用 http 调用连接 C++ 应用程序?

c++ - 如何在不同流程/系统中的 Rhapsody 模型之间进行通信?

c++ - boost 的 dijkstra_shortest_paths 中的负边权重检查

c - MPI RMA 操作 : Ordering between MPI_Win_free and local load

locking - 实现写优先 R/W 锁

c++ - 在迭代同一 vector 时删除 vector 中的元素

java - 用于线程安全的 volatile 关键字

java - Spring 线程中断

c# - 在异步方法 .net 4.5 中管理同步调用

java - 当第一个线程锁定类时,第二个线程会发生什么