multithreading - 环形缓冲区,1 个写入器和 N 个读取器

标签 multithreading algorithm c++11 design-patterns concurrency

我需要一种环形缓冲区(或一些更合适的数据结构,以防万一)和一种在以下情况下处理环形缓冲区的算法/模式。

1 连续生成实时数据的写入器必须始终能够写入第一个空闲“槽”(一个不在读取过程中的槽),或者等到一个槽空闲用于写入.每当编写器完成将数据写入一个槽时,它就可以为读者“提交”该槽。

在给定时间,可能有N个并发读者。在发出读取请求时,每个读取器都应该始终从环形缓冲区中最后提交的插槽中获取最新写入的数据,但不得多次读取相同的数据。如果自上次读取以来写入者尚未在一个槽中写入和提交新数据,则读取者应该等待(想想快速读取器)。

请注意,一个阅读器不得为另一个阅读器“消费”数据。换句话说,两个不同的读者可能读取相同的数据。同样,一个读者可能会从同一个插槽中读取数据两次或更多次,但前提是作者在两次读取请求之间向该插槽写入数据。

请注意,干扰器可能不适合我的情况(或者我只是没能按我的意愿让它工作)。 disruptor 的问题在于,写入器可能前进得如此之快(与较慢的读取器相比),以至于它最终可能会在读取过程中覆盖某些插槽。在这种情况下,编写器应该能够跳过这些“繁忙”的槽,直到它找到第一个空闲槽(一旦写入,它还必须发布那个槽),但中断器模式不会似乎在考虑这种情况。序列本身还有另一个问题,在我使用的干扰器实现中,它是一个原子整数,因此它可能会溢出,导致一些未定义的行为。

你有什么想法吗?如果您知道现代 C++ 中的解决方案,我将不胜感激。

最佳答案

我不知道这个问题的开箱即用的解决方案(注意:这并不意味着没有!)但是如果我尝试使用 c++11 工具实现它,我会看到如下内容:

  • 保存数据的数组
  • 一个 std::stack 来保存已写入元素的索引
  • 一个 std::queue 来保存已读取元素的索引
  • 控制堆栈访问的互斥量。因此,读者和作者在弹出/压入堆栈之前获取互斥量。
  • 控制队列访问的互斥体。因此,读者和作者在入队/出队之前获取 Mutex。
  • 当堆栈为空时供读者阻塞的读者条件变量
  • 当队列为空时写入器阻塞的写入器条件变量。

  • 开始将所有索引(环形缓冲区的大小)排入队列。

  • 作者获取索引并写入数组,然后将索引压入堆栈并通知读者。
  • 读取器从堆栈中弹出索引,完成后将它们排入队列并通知写入器。
  • 空堆栈导致读取器等待写入器,空队列导致写入器等待读取器。

A nice article on the C++11 concurrency features.

编辑 进一步思考这个建议试图做的是将数组中的插槽视为资源池。读取器获取资源、槽的索引,并在完成处理后返回。

更新的解决方案: 由于插槽的数量限制为最多 8 个,我建议您使解决方案尽可能简单。让每个插槽负责自己的状态:

#include <atomic>
template <typename T>
class Slot {
   atomic_uint readers ;
   int iteration ;
   T data ;
}

然后创建一个 std:vector of Slots 并让读者和作者迭代这个向量。

编写器需要找到一个槽,其中 readers == 0(并且可能具有 iteration 的最小值)并且当它找到时它需要减少读者数然后检查阅读器计数是否为 -1 以确保阅读器没有在 if 和递减之间开始阅读。如果是,则重新增加 readers 并重新开始。

Readers 遍历数组以查找他们尚未读取的最大iteration 值以及readers >= 0 的位置。然后他们重新检查 readers 的值,如果 Writer 没有将它设置为 -1,他们会增加它并开始阅读。如果 Writer 已经开始写入那个槽,那么 Reader 会重新开始。

假设 Writer 已将一个插槽标识为空闲,而 Reader 已决定它需要读取该插槽,有 2 个可能的排序,并且在 Reader 和 Writer 中都退缩并重试。

Execution       Writer                  Reader
1               readers == 0 
2                                       readers >= 0
3               readers--
4                                       readers++
5               readers == -1 is false
6                                       readers > 0 is false!
7               readers++
8                                       readers--


Execution       Writer                  Reader
1                                       readers >= 0
2               readers == 0 
3                                       readers++
4               readers--
5                                       
6               readers == -1 is false
7                                       readers > 0 is false
8               readers++
9                                       readers--

如果 Reader 和 Writer 确实以这种方式发生冲突,则双方都无法访问并且都需要重试。

如果您对这种方法不满意,那么可以使用 Mutex 和锁/try_lock,在 Slot 类中类似这样的东西(注意:目前无法通过编译器运行它,因此可能有语法错误)

typedef enum LOCK_TYPE {READ, WRITE} ;
std::mutex mtx ;

bool lockSlot(LOCK_TYPE lockType)
{
    bool result = false ;
    if (mtx.try_lock()) {

        if ((lockType == READ) && (readers >= 0)) 
            readers++ ;
        if ((lockType == WRITE) && (readers == 0)) 
            readers-- ;
        result = true ;
    }
    return result ;
}

void unlockSlot (LOCK_TYPE lockType)
{
    if (mtx.lock()) {  // Wait for a lock this time

        if (lockType == READ) 
            readers-- ;
        if (lockType == WRITE)
            readers++ ;
    }
}

当/如果一个 Reader 用完了工作,它可以等待一个条件变量,Writer 可以使用这个条件变量在新数据可用时通知所有()读者。

关于multithreading - 环形缓冲区,1 个写入器和 N 个读取器,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/28212892/

相关文章:

c++ - remove_reference 如何禁用模板参数推导?

c++11 - C++中如何获取一组字符串中最长的字符串

c++ - 有没有办法并行化下三角矩阵求解器?

java - 具有 Java 多线程应用程序的 Singleton

algorithm - 汉密尔顿路径和欧拉路径的区别

c++ - 创建基于堆的2D数组而不使用双指针语法?

c++ - 如何使这段代码中的线程独立运行

c - g_cond_wait() 的这个示例代码会导致未定义的行为吗?

algorithm - 为什么 IDA* 比 A* 快,但为什么 IDA* 访问的节点比 A* 更多?

python - 如何将贝塞尔曲线拟合到一组数据?