c++ - std::vector 的并行写入器和读取器

标签 c++ multithreading concurrency vector c++11

我有一个类同时被 2 个线程使用:一个线程将结果(一个接一个)添加到任务的 results 中,第二个线程处理那些 已经存在的结果

// all members are copy-able
struct task {
    command cmd;
    vector<result> results;
};

class generator {
    public:
        generator(executor* e); // store the ptr
        void run();
        ...
};

class executor {
    public:
        void run();
        void add_result(int command_id, result r);
        task& find_task(int command_id);
            ...
    private:
        vector<task> tasks_;
        condition_variable_any update_condition_;
};

启动

// In main, we have instances of generator and executor,
// we launch 2 threads and wait for them.
std::thread gen_th( std::bind( &generator::run, gen_instance_) );
std::thread exe_th( std::bind( &executor::run,  exe_instance_) );

生成器线程

void generator::run() {
    while(is_running) {
        sleep_for_random_seconds(); 
        executor_->add_result( SOME_ID, new_result() );
    }
}

执行线程

void executor::add_result( int command_id, result r ) {
    std::unique_lock<std::recursive_mutex> l(mutex_);
    task& t = this->find_task(command_id);
    t.results.push_back(r);
    update_condition_.notify_all();
}

void executor::run() { 
  while(is_running) {
     update_condition_.wait(...);
     task& t = this->find_task(SOME_ID);        
     for(result r: t.results) {
        // no live updates are visible here
     }
   }
}
  1. 生成器线程每隔几秒添加一个结果。
  2. 执行器线程本身就是一个执行器。它通过 run 方法运行,该方法等待更新,当更新发生时,它会处理结果。

注意事项:

  1. 任务 vector 可能很大;结果永远不会被丢弃;
  2. 执行器中的for-each 循环获取它正在处理的任务,然后遍历结果,检查其中哪些是新的并处理它们。一旦处理,它们将被标记并且不会被再次处理。此处理可能需要一些时间。

Executor Thread 在添加另一个结果之前没有完成 for 循环时会出现问题 - 结果对象在 for 循环中不可见。由于 Executor Thread 正在工作,它不会注意到更新条件更新,不会刷新 vector 等。当它完成时(在alread-not-actual view 上工作 tasks_) 它再次卡在 update_condition_.. 刚刚被触发。

我需要让代码知道,它应该在完成循环后再次运行循环for-each循环中可见的任务进行更改。这个问题的最佳解决方案是什么?

最佳答案

您只需要检查您的 vector 是否为空,阻塞 CV 之前。类似的东西:

while (running) {
    std::unique_lock<std::mutex> lock(mutex);
    while (tasks_.empty()) // <-- this is important
        update_condition_.wait(lock);
    // handle tasks_
}

如果您的体系结构允许(即,如果您在处理任务时不需要持有锁),您可能还希望在处理任务之前尽快解锁互斥量,以便生产者可以推送更多任务而无需阻塞。也许用一个临时 vector 交换你的 tasks_ vector ,然后解锁互斥量,然后才开始处理临时 vector 中的任务:

while (running) {
    std::unique_lock<std::mutex> lock(mutex);
    while (tasks_.empty())
        update_condition_.wait(lock);
    std::vector<task> localTasks;
    localTasks.swap(tasks_);
    lock.unlock(); // <-- release the lock early
    // handle localTasks
}

编辑:啊,现在我意识到这并不适合你的情况,因为你的消息不直接在 tasks_ 中,而是在 tasks_.results。虽然您了解我的总体思路,但使用它需要更改代码的结构(例如,扁平化您的任务/结果并始终有一个与单个结果关联的 cmd)。

关于c++ - std::vector 的并行写入器和读取器,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/15460403/

相关文章:

Java FX 如何 'Stop'线程中途并重新启动它或销毁它

multithreading - 子进程和线程的区别

java - 无法创建具有大小限制的缓存线程池?

java - 从文件系统读取文件时的并发性

C++编译错误: request for member ‘c_cflag' in something not a structure or union

c++ - Rcpp - sourceCpp - undefined symbol

c++ - 如何使用子字符串找到数字行的正确部分?

c++ - 关于我的第一个使用 Boost 库的程序的问题(异常(exception),长路径)

python - SQLAlchemy ORM : safely passing objects between threads without manually reattaching?

wpf - 如何进入VS中的线程 View