c++ - Boost 并发库在使用 GNU C++/LLVM C++ 编译时表现不同

标签 c++ xcode boost g++ llvm-c++-api

下面是一个使用线程安全无界队列的非常简单的生产者/消费者问题示例。任何人都可以阐明为什么这段代码在使用 GNU C++ 编译时表现正确,而使用 LLVM C++ 编译时消费者线程却随机放弃?

#include <iostream>
#include <queue>
#include <math.h>
#include <time.h>
#include "boost/thread/condition_variable.hpp"
#include "boost/thread.hpp"

//
// THREAD SAFE QUEUE
//
template<typename Data>
class Concurrent_Queue
{
private:
    std::queue<Data> the_queue;
    mutable boost::mutex the_mutex;
    boost::condition_variable the_condition_variable;
public:
    void push(Data const& data)
    {
        boost::mutex::scoped_lock lock(the_mutex);
        the_queue.push(data);
        lock.unlock();
        printf("\n...just pushed, waking a thread...\n\n");
        the_condition_variable.notify_one();
    }

    bool empty() const
    {
        boost::mutex::scoped_lock lock(the_mutex);
        return the_queue.empty();
    }

    bool try_pop(Data& popped_value)
    {
        boost::mutex::scoped_lock lock(the_mutex);
        if(the_queue.empty())
        {
            return false;
        }

        popped_value=the_queue.front();
        the_queue.pop();
        return true;
    }

    void wait_and_pop(Data& popped_value)
    {
        boost::mutex::scoped_lock lock(the_mutex);
        while(the_queue.empty())
        {
            printf("\n...buffer empty, waiting to pop...\n\n");
            the_condition_variable.wait(lock);
        }

        popped_value=the_queue.front();
        the_queue.pop();
    }

    int len() {
        boost::mutex::scoped_lock lock(the_mutex);
        return (int)the_queue.size();
    }

};

//
// PRODUCER
//    
class Producer {
private:
    Concurrent_Queue<int> *buff;
    int next;
public:
    Producer(Concurrent_Queue<int> *q): buff(q) { printf("Prod up!\n"); }
    ~Producer() {}
    void run() {
        int wait_time = 0;
        while(1) {
            wait_time = (rand()%5)+1;
            sleep(wait_time);
            printf("wait_time: %d\n", wait_time);
            buff->push(wait_time);
            printf("buffer_len: %d\n", buff->len());
        }
    }
};

//
// CONSUMER 
//    
class Consumer {
private:
    Concurrent_Queue<int> * buff;
public:
    Consumer(Concurrent_Queue<int> *q): buff(q) { printf("Con up!\n"); }
    ~Consumer() {}
        void run() {
        unsigned wait_time = 0;
        int latest = 0;
        while(1) {
            wait_time = (rand()%7)+1;
            sleep(wait_time);
            buff->wait_and_pop(latest);
            printf("latest consumed int: %d\n", latest);
            printf("cons buff_len: %d\n", buff->len());
        }
    }
};

//
//  MAIN
//
int main(int argc, const char * argv[])
{
    srand((unsigned)time(NULL));
    Concurrent_Queue<int> Con_Q;
    Consumer taker(&Con_Q);
//  sleep(3);
    Producer giver(&Con_Q);
    boost::thread* prod_thread = new boost::thread(boost::bind(&Producer::run, &giver));
    boost::thread* cons_thread = new boost::thread(boost::bind(&Consumer::run, &taker));

    prod_thread->join();
    cons_thread->join();
}

最佳答案

您应该将通知调用移至互斥量下。

这在 pthreads(7) 联机帮助页的某处有记录。我会设法找到它。

更新我目前能找到的最相关的报价是:

The pthread_cond_broadcast() or pthread_cond_signal() functions may be called by a thread whether or not it currently owns the mutex that threads calling pthread_cond_wait() or pthread_cond_timedwait() have associated with the condition variable during their waits; however, if predictable scheduling behavior is required, then that mutex shall be locked by the thread calling pthread_cond_broadcast() or pthread_cond_signal().

The pthread_cond_broadcast() and pthread_cond_signal() functions shall have no effect if there are no threads currently blocked on cond.

我知道如果在锁外发出条件信号,像 Helgrind 这样的线程检查工具会报错。

旁注:

  • 正好前几天写了一个带任务队列的thread_pool,也支持shutdown。您可以尝试在您的 Mac 上是否出现此症状:

  • bool empty() const并不是很有用,因为它是一个 racey 调用。只有将锁转移给调用者,它才是线程安全的

  • int len() const有同样的问题
  • 您可以使用谓词版本cv::wait()获得更清晰的代码:

    void wait_and_pop(Data& popped_value)
    {
        namespace phx = boost::phoenix;
    
        boost::unique_lock<boost::mutex> lock(the_mutex);
    
        //if (the_queue.empty()) printf("\n...buffer empty, waiting to pop...\n\n");
    
        the_condition_variable.wait(lock, !phx::bind(&queue_t::empty, phx::ref(the_queue)));
    
        popped_value = the_queue.front();
        the_queue.pop();
    }
    
  • 我更喜欢使用与 c++11 类似的接口(interface)(unique_lock<> 而不是 mutex::scoped_lock),这样切换起来更容易。

  • 生产者有一个未使用的字段 next - 我删除了它

这是我稍作修改的版本,因此您可以复制/粘贴以检查 MacOS(我没有 Mac):

#include <iostream>
#include <queue>
#include "boost/thread.hpp"
#include "boost/phoenix.hpp"

//
// THREAD SAFE QUEUE
//
template<typename Data>
class Concurrent_Queue
{
private:
    typedef std::queue<Data> queue_t;
    queue_t the_queue;

    mutable boost::mutex the_mutex;
    boost::condition_variable the_condition_variable;
public:
    void push(Data const& data)
    {
        boost::lock_guard<boost::mutex> lock(the_mutex);
        the_queue.push(data);

        printf("\n...just pushed, waking a thread...\n\n");
        the_condition_variable.notify_one();
    }

#ifdef UNUSED_CODE
    bool empty() const
    {
        boost::lock_guard<boost::mutex> lock(the_mutex);
        return the_queue.empty();
    }

    bool try_pop(Data& popped_value)
    {
        boost::lock_guard<boost::mutex> lock(the_mutex);
        if(the_queue.empty())
        {
            return false;
        }

        popped_value=the_queue.front();
        the_queue.pop();
        return true;
    }
#endif

    void wait_and_pop(Data& popped_value)
    {
        namespace phx = boost::phoenix;

        boost::unique_lock<boost::mutex> lock(the_mutex);

        //if (the_queue.empty()) printf("\n...buffer empty, waiting to pop...\n\n");

        the_condition_variable.wait(lock, !phx::bind(&queue_t::empty, phx::ref(the_queue)));

        popped_value = the_queue.front();
        the_queue.pop();
    }

    std::size_t len() {
        boost::lock_guard<boost::mutex> lock(the_mutex);
        return the_queue.size();
    }

};

//
// PRODUCER
//    
class Producer {
private:
    Concurrent_Queue<int> &buff;
public:
    Producer(Concurrent_Queue<int> &q): buff(q) { printf("Prod up!\n"); }
    ~Producer() {}
    void run() {
        int wait_time = 0;
        while(1) {
            wait_time = (rand()%5)+1;
            boost::this_thread::sleep_for(boost::chrono::seconds(wait_time));
            printf("wait_time: %d\n", wait_time);
            buff.push(wait_time);
            printf("buffer_len: %lu\n", buff.len());
        }
    }
};

//
// CONSUMER 
//    
class Consumer {
private:
    Concurrent_Queue<int> & buff;
public:
    Consumer(Concurrent_Queue<int> &q): buff(q) { printf("Con up!\n"); }
    ~Consumer() {}
        void run() {
        unsigned wait_time = 0;
        int latest = 0;
        while(1) {
            wait_time = (rand()%7)+1;
            boost::this_thread::sleep_for(boost::chrono::seconds(wait_time));
            buff.wait_and_pop(latest);
            printf("latest consumed int: %d\n", latest);
            printf("cons buff_len: %lu\n", buff.len());
        }
    }
};

//
//  MAIN
//
int main()
{
    srand((unsigned)time(NULL));
    Concurrent_Queue<int> Con_Q;

    Consumer taker(Con_Q);
    //boost::this_thread::sleep_for(boost::chrono::seconds(3));
    Producer giver(Con_Q);

    boost::thread_group group;
    group.create_thread(boost::bind(&Producer::run, &giver));
    group.create_thread(boost::bind(&Consumer::run, &taker));

    group.join_all();
}

关于c++ - Boost 并发库在使用 GNU C++/LLVM C++ 编译时表现不同,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/18797501/

相关文章:

c++ - 如何在 C++ 中使用类模板启动类构造函数?

ios - 找不到 PDF 标题 : `%PDF' not found

c++ - 如何手动刷新 boost 日志?

ios - 如何使用 Xcode 11 从命令行上传到 App Store?

ios - 将 GameKit key 添加到您的信息 plist 文件(错误)

c++ - 如何将 boost::serialize 成 sqlite::blob?

c++ - 不同容器的包装器 contains() 函数

c++ - 在 C++ 中重载二元关系运算符的正确方法

c++ - 策略数组类设计包装器

c++ - 显示 std::thread 函数传递的错误数据。