c++ - C++中的无锁多生产者多消费者

标签 c++ buffer producer-consumer

我必须用 C++ 编写一个多生产者-消费者系统,但我无法尝试将模型的每个部分(具有正确缓冲区的线程)组合在一起。该模型的基本功能是:我有一个执行函数的初始线程。这个返回的结果需要放在数量不确定的缓冲区中,因为函数处理的每个元素都是不同的,需要在单个线程中处理。然后,随着缓冲区中存储的数据,另一个n线程需要获取这个缓冲区的数据来执行另一个功能,而这个的返回需要再次放入一些缓冲区。

目前我已经创建了这个缓冲区结构:

template <typename T>
class buffer {
public:
  atomic_buffer(int n);
  int bufSize() const noexcept;
  bool bufEmpty() const noexcept;
  bool full() const noexcept;
  ~atomic_buffer() = default;


  void put(const T & x, bool last) noexcept;
  std::pair<bool,T> get() noexcept;

private:
  int next_pos(int p) const noexcept;

private:
  struct item {
    bool last;
    T value;
  };
  const int size_;
  std::unique_ptr<item[]> buf_;
  alignas(64) std::atomic<int> nextRd_ {0};
  alignas(64) std::atomic<int> nextWrt_ {0};
};

我还创建了一个 vector 结构,用于存储非缓冲区集合,以满足不确定数量的线程需求。

std::vector<std::unique_ptr<locked_buffer<std::pair<int, std::vector<std::vector<unsigned char>>>>>> v1;

for(int i=0; i<n; i++){     
v1.push_back(std::unique_ptr<locked_buffer<std::pair<int,std::vector<std::vector<unsigned char>>>>> (new locked_buffer<std::pair<int, std::vector<std::vector<unsigned char>>>>(aux)));  
}

编辑:

Flowchart of the model

最佳答案

在不知道更多上下文的情况下,这看起来像是一个标准线程池的应用程序。您有不同的任务排入同步队列(例如 buffer 类)。线程池的每个工作线程轮询此队列并每次处理一个任务(例如通过执行 run() 方法)。他们将结果写回另一个同步队列。

每个工作线程都有一对自己的线程局部输入和输出缓冲区。它们不需要同步,因为它们只能从所有者线程本身访问。

enter image description here

编辑: 实际上,我认为这可以简化很多:只需使用一个线程池和一个同步队列。工作线程可以将新任务直接排入队列。绘图中的每个线程都对应一种类型的任务,并实现一个通用的 Task 接口(interface)。 您不需要多个缓冲区。您可以使用多态性并将所有内容放在一个缓冲区中。

编辑 2 - 线程池的解释:
线程池只是一个概念。忘记池方面,使用固定数量的线程。主要思想是:不再有多个具有特定功能的线程,而是有 N 个线程可以处理任何类型的任务。其中 N 是 CPU 的核心数。

你可以改变这个

enter image description here

进入

enter image description here

工作线程执行如下操作。请注意,这是简化的,但您应该明白这一点。

void Thread::run(buffer<Task*>& queue) {
    while(true) {
        Task* task = queue.get();
        if(task)
            task->execute();
        while(queue.isEmpty())
            waitUntilQueueHasElement();
    }
}

并且您的任务实现了一个通用接口(interface),因此您可以将 Task* 指针放入单个队列中:

struct Task {
    virtual void execute() = 0;
}

struct Task1 : public Task {
    virtual void execute() override {
        A();
        B1();
        C();
    }
}

...

另外,帮自己一个忙,使用 typedefs ;)

`std::vector<std::unique_ptr<locked_buffer<std::pair<int, std::vector<std::vector<unsigned char>>>>>> v1;`  

成为

typedef std::vector<std::vector<unsigned char>> vector2D_uchar;
typedef std::pair<int, vector2D_uchar> int_vec_pair;
typedef std::unique_ptr<locked_buffer<int_vec_pair>> locked_buffer_ptr;
std::vector<locked_buffer_ptr> v1;

关于c++ - C++中的无锁多生产者多消费者,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/41149602/

相关文章:

c++ - QT 5.1.1 - 在 Post 请求中设置 urlQuery 的问题

c++ - 从数组中读取 2x 个字符作为短字符?

c++ - 如何为 C 函数建模 OO 风格的接口(interface)?

c - 如何发送大小大于 64 KB 的 UDP 数据包

python - 如何将数据框信息的输出保存到文件 excel 或文本文件中

python - 生产者多消费者问题,消费者在生产者完成之前完成工作

使用ConcurrentLinkedQueue的Java线程问题

c++ - 在 Win/Linux 上使用 C/C++ 使用大页面分配内存的最简单方法是什么?

linux - 让linux缓冲/dev/random

java - 在这些线程中,这个 boolean 值是做什么用的?