c++ - 它是 C++11 中互锁单链表的正确实现吗

标签 c++ c++11 atomic race-condition

我有以下使用 C++11 原子的互锁单链表的实现:

struct notag {};

template<class T, class Tag=notag>
struct s_list_base
{
};

template<class T, class Tag = notag>
struct s_list : s_list_base<T, Tag>
{
    s_list_base<T, Tag> *next_ptr;
};

template<bool auto_destruct, class T, class Tag = notag>
class atomic_s_list
{
    struct s_head : s_list_base<T, Tag>
    {
        std::atomic<s_list_base<T, Tag > *> next_ptr { this };
    };
    using LinkType = s_list<T, Tag> *;

    s_head head;

public:
    atomic_s_list() = default;
    atomic_s_list(const atomic_s_list &) = delete;
    atomic_s_list &operator =(const atomic_s_list &) = delete;

    ~atomic_s_list()
    {
        clear();
    }

    void clear() noexcept
    {
        if (auto_destruct)
        {
            T *item;
            do
            {
                item = pop();
                delete item;
            } while (item);
        }
        else
            head.next_ptr = &head;
    }

    void push(T *pItem) noexcept
    {
        auto p = static_cast<LinkType>(pItem);
        auto phead = head.next_ptr.load(std::memory_order_relaxed);
        do
        {
            p->next_ptr = phead;
        } while (!head.next_ptr.compare_exchange_weak(phead, p));
    }

    T *pop() noexcept
    {
        auto result = head.next_ptr.load(std::memory_order_relaxed);
        while (!head.next_ptr.compare_exchange_weak(result, static_cast<LinkType>(result)->next_ptr))
            ;
        return result == &head ? nullptr : static_cast<T *>(result);
    }
};

问题是在实际程序中我有几个并发运行的线程,它们使用 pop 从这个列表中获取一个对象,使用它然后用 push 把它放回去当有时两个线程最终从列表中获取相同的对象时,我似乎有一场比赛。

我试图用那个程序做一个简单的例子来说明一场比赛。 在这里:

struct item : s_list<item>
{
    std::atomic<int> use{ 0 };
};

atomic_s_list<true, item> items;

item *allocate()
{
    auto *result = items.pop();
    if (!result)
        result = new item;
    return result;
}

void free(item *p)
{
    items.push(p);
}

int main()
{
    using namespace std::chrono_literals;
    static const int N = 20;
    std::vector<std::thread> threads;
    threads.reserve(N);

    for (int i = 0; i < N; ++i)
    {
        threads.push_back(std::thread([&]
        {
            while (true)
            {
                auto item = allocate();
                if (0 != item->use.fetch_add(1, std::memory_order_relaxed))
                    std::terminate();

                item->use.fetch_sub(1, std::memory_order_relaxed);
                free(item);
            }
        }));
    }

    std::this_thread::sleep_for(20min);
}

那么问题来了:这个互锁单链表的实现是否正确?

最佳答案

经过更多研究,我可以确认我面对的是 ABA problem .

似乎没有人应该相信现代硬件(具有大量硬件线程)和高度竞争的互锁列表上的这种简单的互锁单链表实现。

在考虑实现维基百科文章中描述的技巧后,我决定使用 boost 实现(参见 boost::lockfree::stack),因为它似乎在解决 ABA 问题上做出了很好的努力。

目前我的测试代码没有失败,原始程序也没有。

关于c++ - 它是 C++11 中互锁单链表的正确实现吗,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/35185859/

相关文章:

c++ - 将子类转换为基类?

c++ - 仅在守护线程返回后删除它

c++ - 带 std::index_sequence 的三元运算符

locking - 比 pthreads 更快的锁定

c++ - 发生在同一线程中的关系之前

c++ - boost::python 是如何工作的?关于实现细节有什么想法吗?

c++ - 为什么我们不能简单地复制 std::function

c++ - unique_ptr 和 OpenSSL 的 STACK_OF(X509)*

c - 下面的程序集是原子的,如果不是,为什么?

c++ - 如何使用 int 数组 [][] 初始化 vector <vector <int>>?