我在尝试调试基于 Linux-futex 和原子操作的锁定原语中导致死锁的竞争条件时遇到了可怕的时间。这是我正在使用的代码(与真实代码完全相同的逻辑,只是去掉了对与问题无关的数据结构的依赖):
int readers, writer, waiting;
void wrlock()
{
int cur;
while (atomic_swap(&writer, 1)) spin();
while ((cur=readers)) futex_wait(&readers, cur);
}
void wrunlock()
{
atomic_store(&writer, 0);
if (waiting) futex_wake(&writer, ALL);
}
void rdlock()
{
atomic_inc(&waiting);
for (;;) {
atomic_inc(&readers);
if (!writer) return;
atomic_dec(&readers);
futex_wait(&writer, 1);
}
}
void rdunlock()
{
atomic_dec(&waiting);
atomic_dec(&readers);
if (writer) futex_wake(&readers, ALL);
}
atomic_*
和 spin
函数非常明显。 Linux futex 操作为 futex_wait(int *mem, int val)
和 futex_wake(int *mem, int how_many_to_wake)
。
我遇到的死锁情况是 3 个线程,readers==0
、writer==1
、waiting==2
,以及等待 futex_wait
的所有线程。我不明白这是怎么发生的。
对于那些想因为我没有使用 pthread 原语而对我大喊大叫的人,请把它留到另一个问题。这是低级代码,其运行不依赖于 glibc/libpthread。无论如何,我认为这个问题可能对其他人了解低级并发黑魔法有用,或者可能吓唬其他人不要尝试编写这样的代码......;-)
顺便说一句,硬件是 x86,所以即使代码存在内存排序问题,我也不认为它们会表现为错误。我猜想我错过了对 futexes 的微妙误用,特别是因为当所有等待都被虚拟为旋转时代码工作正常。
这是为wrlock
生成的asm(基本上与我发布的版本相同,除了它为第一个自旋锁调用单独的函数lock
)。开头的附加条件返回是“如果我们没有运行多个线程,则退出”。 0x338
和0x33c
对应于readers
和writer
。 call 1af
实际上是调用外部 futex_wait
的重定位。
00000184 <wrlock>:
184: a1 18 00 00 00 mov 0x18,%eax
189: 55 push %ebp
18a: 85 c0 test %eax,%eax
18c: 89 e5 mov %esp,%ebp
18e: 75 02 jne 192 <wrlock+0xe>
190: c9 leave
191: c3 ret
192: 68 3c 03 00 00 push $0x33c
197: e8 7e fe ff ff call 1a <lock>
19c: 58 pop %eax
19d: a1 38 03 00 00 mov 0x338,%eax
1a2: 85 c0 test %eax,%eax
1a4: 74 ea je 190 <wrlock+0xc>
1a6: 6a 01 push $0x1
1a8: 50 push %eax
1a9: 68 38 03 00 00 push $0x338
1ae: e8 fc ff ff ff call 1af <wrlock+0x2b>
1b3: a1 38 03 00 00 mov 0x338,%eax
1b8: 83 c4 0c add $0xc,%esp
1bb: 85 c0 test %eax,%eax
1bd: 75 e7 jne 1a6 <wrlock+0x22>
1bf: eb cf jmp 190 <wrlock+0xc>
最佳答案
我认为这说明了你潜在的僵局。假设单个处理器按以下顺序执行 3 个线程:
// to start,
// readers == 0, writer == 0, waiting == 0
Reader1
===================================
// in rdlock()
atomic_inc(&waiting);
for (;;) {
atomic_inc(&readers);
// if (!writer) has not been executed yet
// readers == 1, writer == 0, waiting == 1
writer
===================================
// in wrlock()
while (atomic_swap(&writer, 1)) spin();
while ((cur=readers)) futex_wait(&readers, cur)
// writer thread is waiting
// readers == 1, writer == 1, waiting == 1
// cur == 1
Reader1
===================================
// back to where it was in rdlock()
if (!writer) return;
atomic_dec(&readers);
futex_wait(&writer, 1);
// Reader1 is waiting
// readers == 0, writer == 1, waiting == 1
Reader2
===================================
// in rdlock()
atomic_inc(&waiting);
for (;;) {
atomic_inc(&readers);
if (!writer) return;
atomic_dec(&readers);
futex_wait(&writer, 1);
// Reader2 is waiting
// readers == 0, writer == 1, waiting == 2
现在你陷入了僵局。
当然,我可能不明白 futex API 是如何工作的(我从未使用过它们),所以如果我在这里偏离基础,请告诉我。我假设 futex_wait()
该阻塞(因为预期值是正确的)在出现 futex_wake()
之前不会解除阻塞。调用该地址。
如果atomic_xxx()
操作可以解锁futex_wait()
,这个分析是错误的。
最后,如果这就是你遇到的情况,我还没有机会考虑可能的解决方案......
关于c - 使用原子和 futexes 锁定代码时无法找到竞争条件,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/4357183/