c++ - 动态生成和安全使用spsc_queues

标签 c++ boost vector producer-consumer lock-free

我制作的唯一boost::lockfreespsc_queue,这太神奇了。

但是,我想在一个线程与cores - 1线程来回传递信息的地方实现它。

我当时在想,每个工作线程都有自己的spsc_queues集合,该集合将存储在vector s中,在那里主线程将信息传递到一个传出队列,然后移至vector中的下一个队列,并等等,以及循环进入的队列。

可以安全地推送和弹出两个spsc_queue中的这些vector吗?

如果没有,是否有根据我的意图使用spsc_queues的替代方法?

最佳答案

基本上,您建议以预期的方式使用2x(cores-1)spsc_queues。是的,这可以工作。

我看不出您将如何处理主线程上的响应(“传入队列”)。实话实说,传入队列上没有“等待”操作,您也不想要一个(不再是非常无锁的,并且在等待传入消息时,所有其他工作人员都不会得到服务)。

Aside: If you dimension your response queues such that they will never overflow, then you could get a long way with naive-roundrobin reading from it (just don't attempt to read all messages from a single response queue, because this is a sure-fire way to get scheduling starvation for the other response queues).

Code sample at the bottom (CODE SAMPLE)



所有这些使我强烈怀疑您实际上是异​​步,而不是并发。我有一种让您的应用程序在1个线程上运行的感觉,这是非常高兴的,只是尽快“尽可能地”服务每条可用消息(无论消息的来源或内容如何)。
  • 对于可以在非常短的时间内处理的大量小消息** [¹] **,此模型将很好地扩展。
  • 当1个线程饱和时,可以通过添加worker进行扩展。
  • 在具有需要大量处理的消息的服务中,您可以以异步的方式在仅处理低频请求的专用线程上分担这些任务:它们只需将小的“完成”消息推回主工作队列即可他们完成了。

  • 所有这些使我想到了libuv或Boost Asio之类的库。如果您已经一手掌握了您将需要无锁运行来获得所需吞吐量的信息(这在工业强度服务器解决方案中很少见),则可以使用无锁队列进行模拟。这需要做更多的工作,因为您必须将epoll / select / poll循环集成到生产者中。我建议您保持简单,简单,并仅在实际需要时才采用附加的复杂性。

    Mantra: correct, well-factored first; optimize later



    (请注意此处的“构造良好”。在这种情况下,这意味着您将/不/允许在高吞吐量队列上执行缓慢的处理任务。)

    代码样本

    如所 promise 的,一个简单的概念证明显示了使用带有多个工作线程的多个双向SPSC队列消息传递。

    完全无锁定的版本: Live On Coliru

    这里有很多微妙之处。特别要注意的是,队列的维数不足会如何导致导致静默丢弃的消息。如果消费者能够跟上生产者的步伐,这将不会发生,但是只要您不知道操作系统的 Activity ,您就应该对此进行检查。

    更新根据注释中的请求,这是一个检查队列饱和度的版本-不丢弃消息。 也是Live On Coliru还是
  • 不能删除任何消息
  • 没有更多的延迟到达(因为直到收到所有响应才退出主循环)
  • 循环不再绑定(bind)到循环变量,因为发送可能会停顿,这将导致始终读取同一响应队列。这是死锁或其他最坏情况下性能的秘诀。
  • 在队列饱和的情况下,我们必须考虑一种平衡负载的适当方法。我选择了小睡。从技术上讲,这意味着当队列饱和时,我们的无锁免等待解决方案将降级为常规协作多线程。如果检测到这种情况,也许您希望增加队列。这一切都取决于您的系统。
  • 您将想知道什么时候发生。我包括了所有线程的简单拥塞统计信息。在我的系统上,通过microsleep调用sleep_for(nanoseconds(1)),输出为:
    Received 1048576 responses (97727 100529 103697 116523 110995 115291 103048 102611 102583 95572 )
    
    Total: 1048576 responses/1048576 requests
    Main thread congestion: 21.2%
    Worker #0 congestion: 1.7%
    Worker #1 congestion: 3.1%
    Worker #2 congestion: 2.0%
    Worker #3 congestion: 2.5%
    Worker #4 congestion: 4.5%
    Worker #5 congestion: 2.5%
    Worker #6 congestion: 3.0%
    Worker #7 congestion: 3.2%
    Worker #8 congestion: 3.1%
    Worker #9 congestion: 3.6%
    
    real    0m0.616s
    user    0m3.858s
    sys 0m0.025s
    

    如您所见,Coliru的调音必须大不相同。只要您的系统存在以最大负载运行的风险,就需要进行此调整。
  • 相反,您必须考虑在队列为空时如何限制负载:此刻,工作人员将仅在队列上忙循环,等待消息出现。在真实的服务器环境中,当负载突然爆发时,您将需要检测“空闲”时间段并降低轮询频率,以节省CPU功耗(同时允许CPU最大化其他线程的吞吐量)。
    此答案中包括第二个“混合”版本(在队列饱和之前是无锁的):
    #include <boost/lockfree/spsc_queue.hpp>
    #include <boost/scoped_ptr.hpp>
    #include <boost/thread.hpp>
    #include <memory>
    #include <iostream>
    #include <iterator>
    
    namespace blf = boost::lockfree;
    
    static boost::atomic_bool shutdown(false);
    
    static void nanosleep()
    {
        //boost::this_thread::yield();
        boost::this_thread::sleep_for(boost::chrono::nanoseconds(1));
    }
    
    struct Worker
    {
        typedef blf::spsc_queue<std::string > queue;
        typedef std::unique_ptr<queue> qptr;
        qptr incoming, outgoing;
        size_t congestion = 0;
    
        Worker() : incoming(new queue(64)), outgoing(new queue(64)) 
        {
        }
    
        void operator()()
        {
            std::string request;
            while (!shutdown)
            {
                while (incoming->pop(request)) 
                    while (!outgoing->push("Ack: " + request))
                        ++congestion, nanosleep();
            }
        }
    };
    
    int main()
    {
        boost::thread_group g;
    
        std::vector<Worker> workers(10);
        std::vector<size_t> responses_received(workers.size());
    
        for (auto& w : workers)
            g.create_thread(boost::ref(w));
    
        // let's give them something to do
        const auto num_requests = (1ul<<20);
        std::string response;
        size_t congestion = 0;
    
        for (size_t total_sent = 0, total_received = 0; total_sent < num_requests || total_received < num_requests;)
        {
            if (total_sent < num_requests)
            {
                // send to a random worker
                auto& to = workers[rand() % workers.size()];
                if (to.incoming->push("request " + std::to_string(total_sent)))
                    ++total_sent;
                else
                    congestion++;
            }
    
            if (total_received < num_requests)
            {
                static size_t round_robin = 0;
                auto from = (++round_robin) % workers.size();
                if (workers[from].outgoing->pop(response))
                {
                    ++responses_received[from];
                    ++total_received;
                }
            }
        }
    
        auto const sum = std::accumulate(begin(responses_received), end(responses_received), size_t());
        std::cout << "\nReceived " << sum << " responses (";
        std::copy(begin(responses_received), end(responses_received), std::ostream_iterator<size_t>(std::cout, " "));
        std::cout << ")\n";
    
        shutdown = true;
        g.join_all();
    
        std::cout << "\nTotal: " << sum << " responses/" << num_requests << " requests\n";
    
        std::cout << "Main thread congestion: " << std::fixed << std::setprecision(1) << (100.0*congestion/num_requests) << "%\n";
    
        for (size_t idx = 0; idx < workers.size(); ++idx)
            std::cout << "Worker #" << idx << " congestion: " << std::fixed << std::setprecision(1) << (100.0*workers[idx].congestion/responses_received[idx]) << "%\n";
    }
    

    [¹] 与以往一样,“非常少的时间”是一个相对的概念,大致意味着“比新消息之间的平均时间短的时间”。例如。如果您每秒有100个请求,那么对于单线程系统,5ms的处理时间将“很少”。但是,如果每秒有1万个请求,则1毫秒的处理时间大约是16核服务器上的限制。

    关于c++ - 动态生成和安全使用spsc_queues,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/23183629/

    相关文章:

    c++ - 用于类初始化的 SFML vector

    允许任意类输入的C++函数

    c++ - BOOST_BINARY 宏如何解析空格?

    c++ - 'operator='不匹配(操作数类型为'__gnu_cxx::__ alloc_traits <std::allocator <std::vector <int>>>

    c++ - 这个常量不正确吗?

    c++ - 如何在不使用任何图形库的情况下在 C++ 中创建文本编辑器?

    c++ - 在 ‘)’ token C++ 之前预期为 ‘&’

    c++ - 使用代码块在 Windows 上安装 boost

    java - 两个 3D vector 之间的角度

    MATLAB:一个高效的函数,可以对 Vector 中的唯一数字进行分组