我最近使用三重缓冲区的 std::atomic 做了一个 C++11 的移植,用作并发同步机制。这种线程同步方法背后的想法是,对于生产者 - 消费者的情况,您的生产者运行速度比消费者快,三重缓冲可以带来一些好处,因为生产者线程不会因不得不等待而“变慢”对于消费者。就我而言,我有一个以 ~120fps 更新的物理线程和一个以 ~60fps 运行的渲染线程。显然,我希望渲染线程始终尽可能获得最新状态,但我也知道我将从物理线程中跳过很多帧,因为速率不同。另一方面,我希望我的物理线程保持其恒定的更新速率,而不受锁定数据的较慢渲染线程的限制。
原始 C 代码是由 remis-thoughts 编写的,完整的解释在他的 blog 中。 .我鼓励任何有兴趣阅读它以进一步了解原始实现的人。
可以找到我的实现 here .
基本思想是拥有一个具有 3 个位置(缓冲区)和一个原子标志的数组,该标志是比较和交换的,以定义在任何给定时间哪些数组元素对应于什么状态。这样,只有一个原子变量用于对数组的所有 3 个索引和三重缓冲背后的逻辑进行建模。缓冲区的 3 个位置分别命名为 Dirty、Clean 和 Snap。生产者总是写入 Dirty 索引,并且可以翻转 writer 以将 Dirty 与当前的 Clean 索引交换。消费者可以请求一个新的 Snap,它将当前的 Snap 索引与 Clean 索引交换以获得最新的缓冲区。消费者总是在 Snap 位置读取缓冲区。
该标志由一个 8 位 unsigned int 组成,这些位对应于:
(未使用)(新写入)(2x 脏)(2x 干净)(2x 快照)
newWrite 额外位标志由写入器设置并由读取器清除。读者可以使用它来检查自上次快照以来是否有任何写入,如果没有,则不会再进行一次快照。可以使用简单的按位运算获得标志和索引。
好的现在代码:
template <typename T>
class TripleBuffer
{
public:
TripleBuffer<T>();
TripleBuffer<T>(const T& init);
// non-copyable behavior
TripleBuffer<T>(const TripleBuffer<T>&) = delete;
TripleBuffer<T>& operator=(const TripleBuffer<T>&) = delete;
T snap() const; // get the current snap to read
void write(const T newT); // write a new value
bool newSnap(); // swap to the latest value, if any
void flipWriter(); // flip writer positions dirty / clean
T readLast(); // wrapper to read the last available element (newSnap + snap)
void update(T newT); // wrapper to update with a new element (write + flipWriter)
private:
bool isNewWrite(uint_fast8_t flags); // check if the newWrite bit is 1
uint_fast8_t swapSnapWithClean(uint_fast8_t flags); // swap Snap and Clean indexes
uint_fast8_t newWriteSwapCleanWithDirty(uint_fast8_t flags); // set newWrite to 1 and swap Clean and Dirty indexes
// 8 bit flags are (unused) (new write) (2x dirty) (2x clean) (2x snap)
// newWrite = (flags & 0x40)
// dirtyIndex = (flags & 0x30) >> 4
// cleanIndex = (flags & 0xC) >> 2
// snapIndex = (flags & 0x3)
mutable atomic_uint_fast8_t flags;
T buffer[3];
};
实现:
template <typename T>
TripleBuffer<T>::TripleBuffer(){
T dummy = T();
buffer[0] = dummy;
buffer[1] = dummy;
buffer[2] = dummy;
flags.store(0x6, std::memory_order_relaxed); // initially dirty = 0, clean = 1 and snap = 2
}
template <typename T>
TripleBuffer<T>::TripleBuffer(const T& init){
buffer[0] = init;
buffer[1] = init;
buffer[2] = init;
flags.store(0x6, std::memory_order_relaxed); // initially dirty = 0, clean = 1 and snap = 2
}
template <typename T>
T TripleBuffer<T>::snap() const{
return buffer[flags.load(std::memory_order_consume) & 0x3]; // read snap index
}
template <typename T>
void TripleBuffer<T>::write(const T newT){
buffer[(flags.load(std::memory_order_consume) & 0x30) >> 4] = newT; // write into dirty index
}
template <typename T>
bool TripleBuffer<T>::newSnap(){
uint_fast8_t flagsNow(flags.load(std::memory_order_consume));
do {
if( !isNewWrite(flagsNow) ) // nothing new, no need to swap
return false;
} while(!flags.compare_exchange_weak(flagsNow,
swapSnapWithClean(flagsNow),
memory_order_release,
memory_order_consume));
return true;
}
template <typename T>
void TripleBuffer<T>::flipWriter(){
uint_fast8_t flagsNow(flags.load(std::memory_order_consume));
while(!flags.compare_exchange_weak(flagsNow,
newWriteSwapCleanWithDirty(flagsNow),
memory_order_release,
memory_order_consume));
}
template <typename T>
T TripleBuffer<T>::readLast(){
newSnap(); // get most recent value
return snap(); // return it
}
template <typename T>
void TripleBuffer<T>::update(T newT){
write(newT); // write new value
flipWriter(); // change dirty/clean buffer positions for the next update
}
template <typename T>
bool TripleBuffer<T>::isNewWrite(uint_fast8_t flags){
// check if the newWrite bit is 1
return ((flags & 0x40) != 0);
}
template <typename T>
uint_fast8_t TripleBuffer<T>::swapSnapWithClean(uint_fast8_t flags){
// swap snap with clean
return (flags & 0x30) | ((flags & 0x3) << 2) | ((flags & 0xC) >> 2);
}
template <typename T>
uint_fast8_t TripleBuffer<T>::newWriteSwapCleanWithDirty(uint_fast8_t flags){
// set newWrite bit to 1 and swap clean with dirty
return 0x40 | ((flags & 0xC) << 2) | ((flags & 0x30) >> 2) | (flags & 0x3);
}
如您所见,我决定使用 Release-Consume 模式进行内存排序。
存储的 Release (memory_order_release) 确保当前线程中没有写入可以重新排序 后 商店。另一方面,Consume 确保当前线程中没有读取取决于当前加载的值可以重新排序 之前 这个负载。这确保了对释放相同原子变量的其他线程中的因变量的写入在当前线程中可见。
如果我的理解是正确的,因为我只需要原子地设置标志,对不直接影响标志的其他变量的操作可以由编译器自由重新排序,从而允许更多优化。通过阅读一些关于新内存模型的文档,我也意识到这些宽松的原子只会对ARM和POWER等平台产生明显的影响(它们的引入主要是因为它们)。由于我的目标是 ARM,我相信我可以从这些操作中受益,并能够挤出更多的性能。
现在的问题:
对于这个特定问题,我是否正确使用了 Release-Consume 宽松排序?
谢谢,
安德烈
PS:很抱歉这篇很长的帖子,但我认为需要一些体面的上下文才能更好地了解问题。
编辑:
实现@Yakk 的建议:
flags
继续阅读 newSnap()
和 flipWriter()
使用直接赋值,因此使用默认值 load(std::memory_order_seq_cst)
. bool
返回类型到 newSnap()
, 现在在没有新内容时返回 false ,否则返回 true 。 = delete
将类定义为不可复制习惯用法,因为如果 TripleBuffer
,复制构造函数和赋值构造函数都是不安全的正在被使用。 编辑 2:
修正了不正确的描述(感谢@Useless)。请求新 Snap 并从 Snap 索引读取数据的是消费者(而不是“写入者”)。抱歉让您分心,并感谢 Useless 指出这一点。
编辑 3:
优化
newSnap()
和 flipriter()
根据@Display Name 的建议运行,有效去除 2 个冗余 load()
的每个循环周期。
最佳答案
为什么要在 CAS 循环中两次加载旧标志值?第一次是由flags.load()
,第二个来自 compare_exchange_weak()
,标准在 CAS 失败时指定的将加载先前的值到第一个参数中,在本例中为 flagsNow。
根据 http://en.cppreference.com/w/cpp/atomic/atomic/compare_exchange , "否则,将存储在 *this 中的实际值加载到预期中(执行加载操作)。"所以你的循环正在做的是失败时,compare_exchange_weak()
重新加载 flagsNow
,然后循环重复,第一条语句再次加载它,紧接在加载后 compare_exchange_weak()
.在我看来,您的循环应该将负载拉到循环外。例如,newSnap()
将是:
uint_fast8_t flagsNow(flags.load(std::memory_order_consume));
do
{
if( !isNewWrite(flagsNow)) return false; // nothing new, no need to swap
} while(!flags.compare_exchange_weak(flagsNow, swapSnapWithClean(flagsNow), memory_order_release, memory_order_consume));
和
flipWriter()
:uint_fast8_t flagsNow(flags.load(std::memory_order_consume));
while(!flags.compare_exchange_weak(flagsNow, newWriteSwapCleanWithDirty(flagsNow), memory_order_release, memory_order_consume));
关于C++11 原子内存排序 - 这是宽松(释放-消费)排序的正确用法吗?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/15204578/