我们发现,在我们的代码中有几个地方,对受互斥体保护的数据的并发读取相当普遍,而写入却很少见。我们的测量似乎表明,使用简单的互斥锁会严重影响读取该数据的代码的性能。所以我们需要的是一个多读/单写互斥锁。我知道这可以建立在更简单的原语之上,但在我尝试这样做之前,我宁愿询问现有的知识:
用更简单的同步原语构建多读/单写锁的认可方法是什么?
我确实知道如何制作它,但我更希望得到不受我(可能是错误地)提出的答案的偏见。 (注意:我期望的是如何做到这一点的解释,可能是伪代码,而不是完整的实现。我当然可以自己编写代码。)
注意事项:
这需要有合理的性能。 (我的想法是每次访问都需要两次锁定/解锁操作。现在这可能还不够好,但需要其中很多似乎是不合理的。)
通常,读取次数更多,但写入比读取更重要且对性能更敏感。读者不能饿死作家。
我们被困在一个相当旧的嵌入式平台(VxWorks 5.5 的专有变体)上,有一个相当旧的编译器(GCC 4.1.2)和 boost 1.52——除了 boost 的大部分部分依赖 POSIX,因为POSIX 并未在该平台上完全实现。可用的锁定原语基本上是几种信号量(二进制、计数等),在它们之上我们已经创建了互斥体、条件变量和监视器。
这是 IA32,单核。
最佳答案
乍一看,我以为我认出了this answer与 Alexander Terekhov 介绍的算法相同。但是在研究它之后,我认为它是有缺陷的。两个作者可能同时等待 m_exclusive_cond
.当其中一个写入器唤醒并获得排他锁时,它将设置 exclusive_waiting_blocked = false
在 unlock
,从而将互斥锁设置为不一致的状态。在那之后,互斥体可能会被冲洗掉。
N2406 , 最早提出std::shared_mutex
包含一个部分实现,在下面用更新的语法重复。
class shared_mutex
{
mutex mut_;
condition_variable gate1_;
condition_variable gate2_;
unsigned state_;
static const unsigned write_entered_ = 1U << (sizeof(unsigned)*CHAR_BIT - 1);
static const unsigned n_readers_ = ~write_entered_;
public:
shared_mutex() : state_(0) {}
// Exclusive ownership
void lock();
bool try_lock();
void unlock();
// Shared ownership
void lock_shared();
bool try_lock_shared();
void unlock_shared();
};
// Exclusive ownership
void
shared_mutex::lock()
{
unique_lock<mutex> lk(mut_);
while (state_ & write_entered_)
gate1_.wait(lk);
state_ |= write_entered_;
while (state_ & n_readers_)
gate2_.wait(lk);
}
bool
shared_mutex::try_lock()
{
unique_lock<mutex> lk(mut_, try_to_lock);
if (lk.owns_lock() && state_ == 0)
{
state_ = write_entered_;
return true;
}
return false;
}
void
shared_mutex::unlock()
{
{
lock_guard<mutex> _(mut_);
state_ = 0;
}
gate1_.notify_all();
}
// Shared ownership
void
shared_mutex::lock_shared()
{
unique_lock<mutex> lk(mut_);
while ((state_ & write_entered_) || (state_ & n_readers_) == n_readers_)
gate1_.wait(lk);
unsigned num_readers = (state_ & n_readers_) + 1;
state_ &= ~n_readers_;
state_ |= num_readers;
}
bool
shared_mutex::try_lock_shared()
{
unique_lock<mutex> lk(mut_, try_to_lock);
unsigned num_readers = state_ & n_readers_;
if (lk.owns_lock() && !(state_ & write_entered_) && num_readers != n_readers_)
{
++num_readers;
state_ &= ~n_readers_;
state_ |= num_readers;
return true;
}
return false;
}
void
shared_mutex::unlock_shared()
{
lock_guard<mutex> _(mut_);
unsigned num_readers = (state_ & n_readers_) - 1;
state_ &= ~n_readers_;
state_ |= num_readers;
if (state_ & write_entered_)
{
if (num_readers == 0)
gate2_.notify_one();
}
else
{
if (num_readers == n_readers_ - 1)
gate1_.notify_one();
}
}
该算法源自 Alexander Terekhov 的旧新闻组帖子。它既不会饿死读者也不会饿死作者。
有两个“门”,gate1_
和 gate2_
.读者和作家必须通过gate1_
,并且在尝试这样做时可能会被阻止。一旦读者通过 gate1_
,它已经读锁定了互斥锁。读者可以通过gate1_
只要没有拥有所有权的读者的最大数量,并且只要作者没有超过gate1_
.
一次只有一位作者可以通过 gate1_
.作家可以通过gate1_
即使读者拥有所有权。但是一旦过去 gate1_
,作家仍然没有所有权。它必须先通过gate2_
.作家过不去gate2_
直到所有拥有所有权的读者都放弃它。回想一下,新读者无法通过 gate1_
当作家在 gate2_
等待时.新作家也无法超越gate1_
当作家在 gate2_
等待时.
在gate1_
处读取器和写入器都被阻止的特性为通过它而施加的(几乎)相同的要求,是使该算法对读者和作者都公平的原因,两者都不挨饿。
互斥体“状态”被有意保留在一个单词中,以表明对某些状态更改部分使用原子(作为优化)是可能的(即,对于无争议的“快速路径”)。然而,这里没有展示这种优化。一个例子是如果编写器线程可以自动更改 state_
从 0 到 write_entered
然后他无需阻塞甚至锁定/解锁 mut_
即可获得锁定.和unlock()
可以用原子存储来实现。等等。这些优化在这里没有展示,因为它们比这个简单的描述听起来更难正确实现。
关于c++ - 如何从更基本的同步原语制作多读/单写锁?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/27860685/