我在 C++ 中使用此类进行生产者-消费者设置:
#pragma once
#include <queue>
#include <mutex>
#include <condition_variable>
#include <memory>
#include <atomic>
template <typename T> class SafeQueue
{
public:
SafeQueue() :
_shutdown(false)
{
}
void Enqueue(T item)
{
std::unique_lock<std::mutex> lock(_queue_mutex);
bool was_empty = _queue.empty();
_queue.push(std::move(item));
lock.unlock();
if (was_empty)
_condition_variable.notify_one();
}
bool Dequeue(T& item)
{
std::unique_lock<std::mutex> lock(_queue_mutex);
while (!_shutdown && _queue.empty())
_condition_variable.wait(lock);
if(!_shutdown)
{
item = std::move(_queue.front());
_queue.pop();
return true;
}
return false;
}
bool IsEmpty()
{
std::lock_guard<std::mutex> lock(_queue_mutex);
return _queue.empty();
}
void Shutdown()
{
_shutdown = true;
_condition_variable.notify_all();
}
private:
std::mutex _queue_mutex;
std::condition_variable _condition_variable;
std::queue<T> _queue;
std::atomic<bool> _shutdown;
};
我是这样使用它的:
class Producer
{
public:
Producer() :
_running(true),
_t(std::bind(&Producer::ProduceThread, this))
{ }
~Producer()
{
_running = false;
_incoming_packets.Shutdown();
_t.join();
}
SafeQueue<Packet> _incoming_packets;
private:
void ProduceThread()
{
while(_running)
{
Packet p = GetNewPacket();
_incoming_packets.Enqueue(p);
}
}
std::atomic<bool> _running;
std::thread _t;
}
class Consumer
{
Consumer(Producer* producer) :
_producer(producer),
_t(std::bind(&Consumer::WorkerThread, this))
{ }
~Consumer()
{
_t.join();
}
private:
void WorkerThread()
{
Packet p;
while(producer->_incoming_packets.Dequeue(p))
ProcessPacket(p);
}
std::thread _t;
Producer* _producer;
}
这在大多数 时间都有效。但是偶尔当我删除生产者时(并导致它的解构函数调用 SafeQueue::Shutdown
,_t.join()
会永远阻塞。
我的猜测是问题出在这里(在 SafeQueue::Dequeue
中):
while (!_shutdown && _queue.empty())
_condition_variable.wait(lock);
SafeQueue::Shutdown
从线程 #1 被调用,同时线程 #2 完成检查 _shutdown 但在执行 _condition_variable.wait(lock)
,因此它“错过”了 notify_all()
。这会发生吗?
如果这是问题所在,解决它的最佳方法是什么?
最佳答案
由于 SafeQueue 对象归生产者所有,删除生产者会导致通知消费者和在〜生产者完成时从其下删除 SafeQueue 之间的竞争条件。
我建议让共享资源既不属于生产者也不属于消费者,而是作为对每个构造函数的引用传递。
改变生产者和消费者的构造函数;
Producer( SafeQueue<Packet> & queue ) :
_running(false), _incoming_packets(queue) {}
Consumer( SafeQueue<Packet> & queue ) :
_running(false), _incoming_packets(queue) {}
以这种方式使用您的实例;
SafeQueue<Packet> queue;
Producer producer(queue);
Consumer consumer(queue);
...do stuff...
queue.shutdown();
这也解决了您在 Consumer 类中与 Producer 类紧密耦合的不良设计问题。
此外,在析构函数中终止和加入线程可能不是一个好主意,就像您对 ~Producer 所做的那样。最好为每个线程类添加一个 Shutdown() 方法,并显式调用它们;
producer.shutdown();
consumer.shutdown();
queue.shutdown();
关闭顺序并不重要,除非您担心在停止消费者时丢失仍在队列中的未处理数据包。
关于C++ 线程安全队列关闭,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/39512514/