我为需要跨线程同步的非常简单的数据编写了一个容器。我想要顶级性能。我不想使用锁。
我想使用“宽松”原子。部分是为了那一点额外的魅力,部分是为了真正理解它们。
我在这方面做了很多工作,我正处于这段代码通过我对其进行的所有测试的地步。但这还不是完全“证据”,所以我想知道我是否遗漏了什么,或者我可以通过其他任何方式对此进行测试吗?
这是我的前提:
- 唯一重要的是节点被正确地压入和弹出,并且堆栈永远不会失效。
- 我相信内存中的操作顺序只在一个地方很重要:
- 在 compare_exchange 操作本身之间。 这是有保证的,即使是宽松的原子。
- “ABA”问题通过为指针添加标识号来解决。在 32 位系统上,这需要一个双字 compare_exchange,而在 64 位系统上,指针未使用的 16 位由 ID 号填充。
- 因此:堆栈将始终处于有效状态。 (对吗?)
这是我的想法。 “通常”,我们对正在阅读的代码进行推理的方式是查看代码的编写顺序。可以“乱序”读取或写入内存,但不能以破坏程序正确性的方式进行。
这在多线程环境中发生了变化。这就是内存栅栏的作用 - 这样我们仍然可以查看代码并能够推断它是如何工作的。
因此,如果这里的一切都变得乱七八糟,我要用松散的原子做什么?是不是有点太过分了?
我不这么认为,但这就是我在这里寻求帮助的原因。
compare_exchange 操作本身可以保证彼此之间的顺序恒定。
唯一一次读取或写入原子是在 compare_exchange 之前获取 head 的初始值。它被设置为变量初始化的一部分。据我所知,此操作是否带回“正确”值无关紧要。
当前代码:
struct node
{
node *n_;
#if PROCESSOR_BITS == 64
inline constexpr node() : n_{ nullptr } { }
inline constexpr node(node* n) : n_{ n } { }
inline void tag(const stack_tag_t t) { reinterpret_cast<stack_tag_t*>(this)[3] = t; }
inline stack_tag_t read_tag() { return reinterpret_cast<stack_tag_t*>(this)[3]; }
inline void clear_pointer() { tag(0); }
#elif PROCESSOR_BITS == 32
stack_tag_t t_;
inline constexpr node() : n_{ nullptr }, t_{ 0 } { }
inline constexpr node(node* n) : n_{ n }, t_{ 0 } { }
inline void tag(const stack_tag_t t) { t_ = t; }
inline stack_tag_t read_tag() { return t_; }
inline void clear_pointer() { }
#endif
inline void set(node* n, const stack_tag_t t) { n_ = n; tag(t); }
};
using std::memory_order_relaxed;
class stack
{
public:
constexpr stack() : head_{}{}
void push(node* n)
{
node next{n}, head{head_.load(memory_order_relaxed)};
do
{
n->n_ = head.n_;
next.tag(head.read_tag() + 1);
} while (!head_.compare_exchange_weak(head, next, memory_order_relaxed, memory_order_relaxed));
}
bool pop(node*& n)
{
node clean, next, head{head_.load(memory_order_relaxed)};
do
{
clean.set(head.n_, 0);
if (!clean.n_)
return false;
next.set(clean.n_->n_, head.read_tag() + 1);
} while (!head_.compare_exchange_weak(head, next, memory_order_relaxed, memory_order_relaxed));
n = clean.n_;
return true;
}
protected:
std::atomic<node> head_;
};
这个问题与其他问题相比有何不同?宽松的原子。它们对问题有很大影响。
那么,你怎么看?有什么我想念的吗?
最佳答案
push
已损坏,因为您在 compareAndSwap
失败后未更新 node->_next
。当下一次 compareAndSwap
尝试成功时,您最初使用 node->setNext
存储的节点可能已被另一个线程从堆栈顶部弹出。结果,某个线程认为它已从堆栈中弹出一个节点,但该线程已将其放回 堆栈中。应该是:
void push(Node* node) noexcept
{
Node* n = _head.next();
do {
node->setNext(n);
} while (!_head.compareAndSwap(n, node));
}
此外,由于 next
和 setNext
使用 memory_order_relaxed
,因此不能保证 _head_.next()
这里正在返回最近推送的节点。有可能从堆栈顶部泄漏节点。 pop
显然也存在同样的问题:_head.next()
可能会返回一个之前在栈顶但已经不在栈顶的节点。如果返回值为nullptr
,当栈实际不为空时可能弹出失败。
pop
也可能有未定义的行为。他们都看到相同的 _head.next()
值,一个线程成功完成 pop。另一个线程进入 while 循环 - 因为观察到的节点指针不是 nullptr
- 但是 compareAndSwap
循环很快将它更新为 nullptr
因为堆栈现在是空的。在循环的下一次迭代中,该 nullptr 被取消引用以获取其 _next
指针,随之而来的是欢闹。
pop
显然也患有 ABA。两个线程可以看到栈顶的同一个节点。假设一个线程到达计算 _next
指针的位置然后阻塞。另一个线程成功弹出节点,推送 5 个新节点,然后在另一个线程唤醒之前再次推送原始节点。其他线程的 compareAndSwap
将成功 - 栈顶节点是相同的 - 但将旧的 _next
值存储到 _head
而不是新的一个。另一个线程推送的五个节点全部泄露。 memory_order_seq_cst
也是这种情况。
关于c++ - 无锁堆栈 - 这是 c++11 宽松原子的正确用法吗?可以证明吗?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/24831200/