下面是一个使用线程安全无界队列的非常简单的生产者/消费者问题示例。任何人都可以阐明为什么这段代码在使用 GNU C++ 编译时表现正确,而使用 LLVM C++ 编译时消费者线程却随机放弃?
#include <iostream>
#include <queue>
#include <math.h>
#include <time.h>
#include "boost/thread/condition_variable.hpp"
#include "boost/thread.hpp"
//
// THREAD SAFE QUEUE
//
template<typename Data>
class Concurrent_Queue
{
private:
std::queue<Data> the_queue;
mutable boost::mutex the_mutex;
boost::condition_variable the_condition_variable;
public:
void push(Data const& data)
{
boost::mutex::scoped_lock lock(the_mutex);
the_queue.push(data);
lock.unlock();
printf("\n...just pushed, waking a thread...\n\n");
the_condition_variable.notify_one();
}
bool empty() const
{
boost::mutex::scoped_lock lock(the_mutex);
return the_queue.empty();
}
bool try_pop(Data& popped_value)
{
boost::mutex::scoped_lock lock(the_mutex);
if(the_queue.empty())
{
return false;
}
popped_value=the_queue.front();
the_queue.pop();
return true;
}
void wait_and_pop(Data& popped_value)
{
boost::mutex::scoped_lock lock(the_mutex);
while(the_queue.empty())
{
printf("\n...buffer empty, waiting to pop...\n\n");
the_condition_variable.wait(lock);
}
popped_value=the_queue.front();
the_queue.pop();
}
int len() {
boost::mutex::scoped_lock lock(the_mutex);
return (int)the_queue.size();
}
};
//
// PRODUCER
//
class Producer {
private:
Concurrent_Queue<int> *buff;
int next;
public:
Producer(Concurrent_Queue<int> *q): buff(q) { printf("Prod up!\n"); }
~Producer() {}
void run() {
int wait_time = 0;
while(1) {
wait_time = (rand()%5)+1;
sleep(wait_time);
printf("wait_time: %d\n", wait_time);
buff->push(wait_time);
printf("buffer_len: %d\n", buff->len());
}
}
};
//
// CONSUMER
//
class Consumer {
private:
Concurrent_Queue<int> * buff;
public:
Consumer(Concurrent_Queue<int> *q): buff(q) { printf("Con up!\n"); }
~Consumer() {}
void run() {
unsigned wait_time = 0;
int latest = 0;
while(1) {
wait_time = (rand()%7)+1;
sleep(wait_time);
buff->wait_and_pop(latest);
printf("latest consumed int: %d\n", latest);
printf("cons buff_len: %d\n", buff->len());
}
}
};
//
// MAIN
//
int main(int argc, const char * argv[])
{
srand((unsigned)time(NULL));
Concurrent_Queue<int> Con_Q;
Consumer taker(&Con_Q);
// sleep(3);
Producer giver(&Con_Q);
boost::thread* prod_thread = new boost::thread(boost::bind(&Producer::run, &giver));
boost::thread* cons_thread = new boost::thread(boost::bind(&Consumer::run, &taker));
prod_thread->join();
cons_thread->join();
}
最佳答案
您应该将通知调用移至互斥量下。
这在 pthreads(7) 联机帮助页的某处有记录。我会设法找到它。
更新我目前能找到的最相关的报价是:
The
pthread_cond_broadcast()
orpthread_cond_signal()
functions may be called by a thread whether or not it currently owns the mutex that threads callingpthread_cond_wait()
orpthread_cond_timedwait()
have associated with the condition variable during their waits; however, if predictable scheduling behavior is required, then that mutex shall be locked by the thread callingpthread_cond_broadcast()
orpthread_cond_signal()
.The
pthread_cond_broadcast()
andpthread_cond_signal()
functions shall have no effect if there are no threads currently blocked on cond.
我知道如果在锁外发出条件信号,像 Helgrind 这样的线程检查工具会报错。
旁注:
正好前几天写了一个带任务队列的thread_pool,也支持shutdown。您可以尝试在您的 Mac 上是否出现此症状:
bool empty() const
并不是很有用,因为它是一个 racey 调用。只有将锁转移给调用者,它才是线程安全的-
int len() const
有同样的问题 您可以使用谓词版本
cv::wait()
获得更清晰的代码:void wait_and_pop(Data& popped_value) { namespace phx = boost::phoenix; boost::unique_lock<boost::mutex> lock(the_mutex); //if (the_queue.empty()) printf("\n...buffer empty, waiting to pop...\n\n"); the_condition_variable.wait(lock, !phx::bind(&queue_t::empty, phx::ref(the_queue))); popped_value = the_queue.front(); the_queue.pop(); }
我更喜欢使用与 c++11 类似的接口(interface)(
unique_lock<>
而不是mutex::scoped_lock
),这样切换起来更容易。- 生产者有一个未使用的字段
next
- 我删除了它
这是我稍作修改的版本,因此您可以复制/粘贴以检查 MacOS(我没有 Mac):
#include <iostream>
#include <queue>
#include "boost/thread.hpp"
#include "boost/phoenix.hpp"
//
// THREAD SAFE QUEUE
//
template<typename Data>
class Concurrent_Queue
{
private:
typedef std::queue<Data> queue_t;
queue_t the_queue;
mutable boost::mutex the_mutex;
boost::condition_variable the_condition_variable;
public:
void push(Data const& data)
{
boost::lock_guard<boost::mutex> lock(the_mutex);
the_queue.push(data);
printf("\n...just pushed, waking a thread...\n\n");
the_condition_variable.notify_one();
}
#ifdef UNUSED_CODE
bool empty() const
{
boost::lock_guard<boost::mutex> lock(the_mutex);
return the_queue.empty();
}
bool try_pop(Data& popped_value)
{
boost::lock_guard<boost::mutex> lock(the_mutex);
if(the_queue.empty())
{
return false;
}
popped_value=the_queue.front();
the_queue.pop();
return true;
}
#endif
void wait_and_pop(Data& popped_value)
{
namespace phx = boost::phoenix;
boost::unique_lock<boost::mutex> lock(the_mutex);
//if (the_queue.empty()) printf("\n...buffer empty, waiting to pop...\n\n");
the_condition_variable.wait(lock, !phx::bind(&queue_t::empty, phx::ref(the_queue)));
popped_value = the_queue.front();
the_queue.pop();
}
std::size_t len() {
boost::lock_guard<boost::mutex> lock(the_mutex);
return the_queue.size();
}
};
//
// PRODUCER
//
class Producer {
private:
Concurrent_Queue<int> &buff;
public:
Producer(Concurrent_Queue<int> &q): buff(q) { printf("Prod up!\n"); }
~Producer() {}
void run() {
int wait_time = 0;
while(1) {
wait_time = (rand()%5)+1;
boost::this_thread::sleep_for(boost::chrono::seconds(wait_time));
printf("wait_time: %d\n", wait_time);
buff.push(wait_time);
printf("buffer_len: %lu\n", buff.len());
}
}
};
//
// CONSUMER
//
class Consumer {
private:
Concurrent_Queue<int> & buff;
public:
Consumer(Concurrent_Queue<int> &q): buff(q) { printf("Con up!\n"); }
~Consumer() {}
void run() {
unsigned wait_time = 0;
int latest = 0;
while(1) {
wait_time = (rand()%7)+1;
boost::this_thread::sleep_for(boost::chrono::seconds(wait_time));
buff.wait_and_pop(latest);
printf("latest consumed int: %d\n", latest);
printf("cons buff_len: %lu\n", buff.len());
}
}
};
//
// MAIN
//
int main()
{
srand((unsigned)time(NULL));
Concurrent_Queue<int> Con_Q;
Consumer taker(Con_Q);
//boost::this_thread::sleep_for(boost::chrono::seconds(3));
Producer giver(Con_Q);
boost::thread_group group;
group.create_thread(boost::bind(&Producer::run, &giver));
group.create_thread(boost::bind(&Consumer::run, &taker));
group.join_all();
}
关于c++ - Boost 并发库在使用 GNU C++/LLVM C++ 编译时表现不同,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/18797501/