c++ - 将工作项添加到数组或列表的非阻塞方式

标签 c++ c++11 threadpool future lock-free

编辑:

我现在已经完成了我的队列(克服了下面描述的问题,以及更多)。对于那些感兴趣的人,可以找到here .我很乐意听到任何评论:)。请注意队列不仅仅是一个工作项队列,而是一个模板容器,当然可以用工作项实例化它。

原文:

看完Herb Sutter's talk关于 C++11 和 14 中的并发,我对非阻塞并发感到非常兴奋。

但是,我还没有找到我认为是基本问题的解决方案。所以如果这已经在这里了,请对我客气一点。

我的问题很简单。我正在创建一个非常简单的线程池。为了做到这一点,我在 workPool 类中运行了一些工作线程。我保留了一个 workItems 列表。

如何以无锁方式添加工作项。

这样做的非无锁方式当然是创建一个互斥体。如果您添加项目并在当前工作项目完成后读取(当然锁定)列表,请锁定它。

但是我不知道如何以无锁的方式做到这一点。

下面是我正在创建的内容的粗略概念。我为这个问题写的这段代码。而且它既不完整,也没有错误:)

#include <thread>
#include <deque>
#include <vector>

class workPool
{
public:
    workPool(int workerCount) :
        running(1)
    {
        for (int i = workerCount; i > 0; --i)
            workers.push_back(std::thread(&workPool::doWork, this));
    }

    ~workPool()
    {
        running = 0;
    }
private:
    bool running;
    std::vector< std::thread > workers;
    std::deque< std::function<void()> > workItems;

    void doWork()
    {
        while (running)
        {
            (*workItems.begin())();
            workItems.erase(workItems.begin());
            if (!workItems.size())
                //here the thread should be paused till a new item is added
        }


    }

    void addWorkitem()
    {
        //This is my confusion. How should I do this?
    }

};

最佳答案

我最近看了赫伯的演讲,我相信他的lock-free linked list应该做的很好。唯一的问题是 atomic< shared_ptr<T> >尚未实现。我用过 atomic_* Herb 在他的演讲中也解释了函数调用。

在示例中,我已将任务简化为 int,但它可以是您想要的任何内容。

函数atomic_compare_exchange_weak采用三个参数:要比较的项目、预期值和期望值。它返回 true 或 false 以指示成功或失败。失败时,预期值将更改为找到的值。

#include <memory>
#include <atomic>

// Untested code.

struct WorkItem { // Simple linked list implementation.
    int work;
    shared_ptr<WorkItem> next; // remember to use as atomic
};

class WorkList {
    shared_ptr<WorkItem> head; // remember to use as atomic
public:
    // Used by producers to add work to the list. This implementation adds
    // new items to the front (stack), but it can easily be changed to a queue.
    void push_work(int work) {
        shared_ptr<WorkItem> p(new WorkItem()); // The new item we want to add.
        p->work = work;
        p->next = head;

        // Do we get to change head to p?
        while (!atomic_compare_exchange_weak(&head, &p->next, p)) {
            // Nope, someone got there first, try again with the new p->next,
            // and remember: p->next is automatically changed to the new value of head.
        }
        // Yup, great! Everything's done then.
    }

    // Used by consumers to claim items to process.
    int pop_work() {
        auto p = atomic_load(&head); // The item we want to process.
        int work = (p ? p->work : -1);

        // Do we get to change head to p->next?
        while (p && !atomic_compare_exchange_weak(&head, &p, p->next)) {
            // Nope, someone got there first, try again with the new p,
            // and remember: p is automatically changed to the new value of head.
            work = (p ? p->work : -1); // Make sure to update work as well!
        }
        // Yup, great! Everything's done then, return the new task.
        return work; // Returns -1 if list is empty.
    }
};

编辑: 使用 shared_ptr 的原因结合 atomic_*功能在谈话中解释。简而言之:从链表中弹出一个项目可能会从某个遍历该列表的人的下方删除它,或者可能会在同一内存地址(The ABA Problem)上分配一个不同的节点。使用 shared_ptr将确保所有老读者都持有对原始项目的有效引用。

正如 Herb 所解释的,这使得 pop 函数的实现变得微不足道。

关于c++ - 将工作项添加到数组或列表的非阻塞方式,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/30019413/

相关文章:

c++ - 使用迭代器和索引访问 vector 元素有什么区别?

c++ - 如何解决使用旧版 SetClipboard 将我的 COM 对象放置到剪贴板的问题?

java - 有界队列与 ScheduledThreadPoolExecutor 的最佳方式是什么?

java - 有没有办法取消并重用ExecutorService?

c++ - 初始化列表中的抽象类init

java - 如何检查该池 'reuses' 线程

c++ - 提升 lambda 集合大小评估

c++ - C 字符串和字符数组声明

c++ - 标准线程 sleep_for 不适用于某些 chrono::duration

c++ - 如何仅测试字母的 u32string(使用语言环境)