我制作的唯一boost::lockfree
是spsc_queue
,这太神奇了。
但是,我想在一个线程与cores - 1
线程来回传递信息的地方实现它。
我当时在想,每个工作线程都有自己的spsc_queues
集合,该集合将存储在vector
s中,在那里主线程将信息传递到一个传出队列,然后移至vector
中的下一个队列,并等等,以及循环进入的队列。
可以安全地推送和弹出两个spsc_queue
中的这些vector
吗?
如果没有,是否有根据我的意图使用spsc_queues的替代方法?
最佳答案
基本上,您建议以预期的方式使用2x(cores-1)spsc_queues。是的,这可以工作。
我看不出您将如何处理主线程上的响应(“传入队列”)。实话实说,传入队列上没有“等待”操作,您也不想要一个(不再是非常无锁的,并且在等待传入消息时,所有其他工作人员都不会得到服务)。
Aside: If you dimension your response queues such that they will never overflow, then you could get a long way with naive-roundrobin reading from it (just don't attempt to read all messages from a single response queue, because this is a sure-fire way to get scheduling starvation for the other response queues).
Code sample at the bottom (CODE SAMPLE)
所有这些使我强烈怀疑您实际上是异步,而不是并发。我有一种让您的应用程序在1个线程上运行的感觉,这是非常高兴的,只是尽快“尽可能地”服务每条可用消息(无论消息的来源或内容如何)。
所有这些使我想到了libuv或Boost Asio之类的库。如果您已经一手掌握了您将需要无锁运行来获得所需吞吐量的信息(这在工业强度服务器解决方案中很少见),则可以使用无锁队列进行模拟。这需要做更多的工作,因为您必须将epoll / select / poll循环集成到生产者中。我建议您保持简单,简单,并仅在实际需要时才采用附加的复杂性。
Mantra: correct, well-factored first; optimize later
(请注意此处的“构造良好”。在这种情况下,这意味着您将/不/允许在高吞吐量队列上执行缓慢的处理任务。)
代码样本
如所 promise 的,一个简单的概念证明显示了使用带有多个工作线程的多个双向SPSC队列消息传递。
完全无锁定的版本: Live On Coliru
这里有很多微妙之处。特别要注意的是,队列的维数不足会如何导致导致静默丢弃的消息。如果消费者能够跟上生产者的步伐,这将不会发生,但是只要您不知道操作系统的 Activity ,您就应该对此进行检查。
更新根据注释中的请求,这是一个检查队列饱和度的版本-不丢弃消息。 也是Live On Coliru还是。
microsleep
调用sleep_for(nanoseconds(1))
,输出为:Received 1048576 responses (97727 100529 103697 116523 110995 115291 103048 102611 102583 95572 )
Total: 1048576 responses/1048576 requests
Main thread congestion: 21.2%
Worker #0 congestion: 1.7%
Worker #1 congestion: 3.1%
Worker #2 congestion: 2.0%
Worker #3 congestion: 2.5%
Worker #4 congestion: 4.5%
Worker #5 congestion: 2.5%
Worker #6 congestion: 3.0%
Worker #7 congestion: 3.2%
Worker #8 congestion: 3.1%
Worker #9 congestion: 3.6%
real 0m0.616s
user 0m3.858s
sys 0m0.025s
如您所见,Coliru的调音必须大不相同。只要您的系统存在以最大负载运行的风险,就需要进行此调整。
此答案中包括第二个“混合”版本(在队列饱和之前是无锁的):
#include <boost/lockfree/spsc_queue.hpp>
#include <boost/scoped_ptr.hpp>
#include <boost/thread.hpp>
#include <memory>
#include <iostream>
#include <iterator>
namespace blf = boost::lockfree;
static boost::atomic_bool shutdown(false);
static void nanosleep()
{
//boost::this_thread::yield();
boost::this_thread::sleep_for(boost::chrono::nanoseconds(1));
}
struct Worker
{
typedef blf::spsc_queue<std::string > queue;
typedef std::unique_ptr<queue> qptr;
qptr incoming, outgoing;
size_t congestion = 0;
Worker() : incoming(new queue(64)), outgoing(new queue(64))
{
}
void operator()()
{
std::string request;
while (!shutdown)
{
while (incoming->pop(request))
while (!outgoing->push("Ack: " + request))
++congestion, nanosleep();
}
}
};
int main()
{
boost::thread_group g;
std::vector<Worker> workers(10);
std::vector<size_t> responses_received(workers.size());
for (auto& w : workers)
g.create_thread(boost::ref(w));
// let's give them something to do
const auto num_requests = (1ul<<20);
std::string response;
size_t congestion = 0;
for (size_t total_sent = 0, total_received = 0; total_sent < num_requests || total_received < num_requests;)
{
if (total_sent < num_requests)
{
// send to a random worker
auto& to = workers[rand() % workers.size()];
if (to.incoming->push("request " + std::to_string(total_sent)))
++total_sent;
else
congestion++;
}
if (total_received < num_requests)
{
static size_t round_robin = 0;
auto from = (++round_robin) % workers.size();
if (workers[from].outgoing->pop(response))
{
++responses_received[from];
++total_received;
}
}
}
auto const sum = std::accumulate(begin(responses_received), end(responses_received), size_t());
std::cout << "\nReceived " << sum << " responses (";
std::copy(begin(responses_received), end(responses_received), std::ostream_iterator<size_t>(std::cout, " "));
std::cout << ")\n";
shutdown = true;
g.join_all();
std::cout << "\nTotal: " << sum << " responses/" << num_requests << " requests\n";
std::cout << "Main thread congestion: " << std::fixed << std::setprecision(1) << (100.0*congestion/num_requests) << "%\n";
for (size_t idx = 0; idx < workers.size(); ++idx)
std::cout << "Worker #" << idx << " congestion: " << std::fixed << std::setprecision(1) << (100.0*workers[idx].congestion/responses_received[idx]) << "%\n";
}
[¹] 与以往一样,“非常少的时间”是一个相对的概念,大致意味着“比新消息之间的平均时间短的时间”。例如。如果您每秒有100个请求,那么对于单线程系统,5ms的处理时间将“很少”。但是,如果每秒有1万个请求,则1毫秒的处理时间大约是16核服务器上的限制。
关于c++ - 动态生成和安全使用spsc_queues,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/23183629/