我有相当典型的生产者/消费者问题,我已经用 bounded buffer 解决了.单个进程生成项目并将它们交给 N 个工作线程。工作线程处理这些项目并将结果放在有界缓冲区中。最终消费者进程从缓冲区中检索完成的项目。以下数据流图说明:
每个工作人员处理其项目的时间长短不定,因此工作人员以基本上随机的顺序将完成的项目插入到有界缓冲区中。这工作得很好,但有时需要按照它们最初生成的相同顺序检索完成的项目。所以问题是:
如何修改我现有的实现以按顺序检索成品?
一个重要的附加约束是我们必须遵守有界缓冲区的大小。如果缓冲区的大小为 M,那么在任何给定时间我们都不能有超过 M 个成品在等待消费者。
有界缓冲区
有界缓冲区有一个简单的接口(interface):
template <class T> class bounded_buffer
{
public:
// initializes a new buffer
explicit bounded_buffer(size_t capacity);
// pushes an item into the buffer, blocks if full
void push(T item);
// pops an item from the buffer, blocks if empty
T pop();
};
处理项目
工作线程使用以下代码来处理一个项目:
std::unique_lock guard{ source_lock };
auto item = GetNextItem();
guard.unlock();
buffer.push(ProcessItem(std::move(item)));
(实际代码要复杂一些,因为它必须处理输入数据的结尾、取消和处理错误。但这些细节与问题无关。)
检索已完成项目的代码只是弹出有界缓冲区:
auto processed_item = buffer.pop();
最佳答案
我将提出两种解决方案。第一个是快速和简单的。第二个建立在第一个背后的想法之上,以产生更高效的东西。
第一种方法:std::future
基本思想是,当我们第一次检索一个值时,我们将在有界缓冲区中“预留”一个空间,并在我们完成对项目的处理后填充它。 std::future
提供了一个现成的机制来实现这一点。而不是使用 bounded_buffer<T>
,我们将使用 bounded_buffer<std::future<T>>
.我们调整 worker 代码如下:
std::unique_lock guard{ source_lock };
auto item = GetNextItem();
std::promise<T> processed_item;
buffer.push(processed_item.get_future());
guard.unlock();
processed_item.set_value(ProcessItem(std::move(item)));
然后我们调整消费者代码,只需轻轻一点即可从 future 检索值:
auto processed_item = buffer.pop().get();
如果消费者进程在工作进程完成之前检索了一个项目,那么 std::future<T>::get
将确保消费者阻塞,直到元素准备就绪。
优点:
- 相对简单,它解决了问题。我们在持有源锁的同时将 future 放入有界缓冲区,因此这保证了最终结果进入缓冲区的顺序与我们从源中检索它们的顺序相同。
- 不需要对有界缓冲区本身进行任何更改,从而保留了该抽象的纯度。
缺点:
-
std::future
相对重量级,需要额外的内存分配和内部同步。 - 我们现在在插入缓冲区时持有源代码锁(并且插入可能会阻塞);这可能很好,但如果
GetNextItem()
可能会有问题很贵。
第二种方法:构建更好的缓冲区
为了解决第一种方法中的性能问题,我们可以调整有界缓冲区的实现,以构建在其中保留空间的想法。我们将对其界面进行三处更改:
- 更改构造函数以接受谓词。
- 更改推送方法以返回定位器。
- 添加一个新的
replace
接受定位器和值的方法。
修改后的界面如下:
template <class T, class P> class bounded_buffer
{
public:
using locator_type = /* unspecified */;
// initializes a new buffer; an item is "available" if and only if it
// satisfies this predicate
explicit bounded_buffer(size_t capacity, P predicate);
// pushes an item into the buffer, blocks if full; the buffer's count of
// available items will increase by one if and only if all items in the
// buffer (including the new one) are available
locator_type push(T item);
// pops an item from the buffer, blocks if empty
T pop();
// replaces an existing item in the buffer; if the item is the first in the
// buffer, then we set the count of available items as follows: 0 if the
// item is unavailable, or X if it is available where X is the number of
// available items at the front of the buffer
void replace(locator_type location, T item);
};
然后我们将存储在有界缓冲区中的类型从 T
更改为至 std::variant<std::monostate, T>
.如果包含 T,则谓词将认为该项目“可用”。我们更改工作人员代码如下:
std::unique_lock guard{ source_lock };
auto item = GetNextItem();
auto location = buffer.push(std::monostate{});
guard.unlock();
buffer.replace(location, ProcessItem(std::move(item));
消费者中的检索代码也必须更改才能从变体中检索值:
auto processed_item = std::get<1>(buffer.pop());
优点:
- 比
std::future
更轻,因此性能更高方法。 (它只需要比原始版本稍微多一点内存来存储std::variant
索引。) - 解决问题的方式与
future
基本相同版本。
缺点:
- 需要对有界缓冲区实现进行更改,并且其基础操作不再完全符合您对该抽象的期望。
- 未解决上述确定的源代码锁定问题。
错误处理
为简单起见,我省略了错误处理。然而,这两种方法都需要适当的异常处理。如果在使用编写的代码处理项目时发生异常,消费者将挂起,因为它将等待永远不会到达的保留项目。
关于c++ - 如何按照它们最初在 C++ 中生成的顺序从有界缓冲区中检索项目?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/53481543/