我正在尝试使用C ++ 11并发技术创建基于线程间消息的通信。 Anthony William的书“ Concurrency in Action”描述了此实现所基于的线程安全锁定队列。本书中描述的线程安全锁定队列与我要实现的线程安全锁定队列之间的区别是,首先,我使用通用引用将队列元素转发到阻塞队列,其次(这就是问题所在)我需要能够存储std :: shared_ptr指针队列,因为模板类型包括具有抽象基类的简单消息类层次结构和具有实际专用消息的子类。我需要使用共享指针以避免数据切片。
编辑:我添加了coliru演示以更清楚地显示我的问题。
Live Demo
编辑1:对coliru live demo的更多更新以及其他编译器错误:
Coliru Demo With Compiler Errors
编辑2:感谢亚历杭德罗,我有一个可行的解决方案Working Coliru Example
为此,我将Anthony William对底层消息队列的实现从以下更改:
std::queue<T> data_queue
到一个
std::queue<std::shared_ptr<T>> data_queue
但是,当我尝试通过通用参考完美转发签名将消息指针推入队列时,我遇到了各种各样的错误。
我希望能够在此队列上添加消息的方式如下:
UtlThreadSafeQueue<BaseMessageType>& mDataLoadSessionQ;
auto message = std::make_shared<DerivedType>(1,2,3);
mDataLoadSessionQ.push(BaseType);
使用上面的代码,编译器抱怨指出了某些东西
以下几行错误C2664:'无效
UtlThreadSafeQueue :: push(T &&)':无法转换
参数1从'std :: shared_ptr'到
T = BaseMessageType的“ BaseMessageType &&”
我想我需要某种方式来专门化指针类型,但是我不确定。
我的实现如下:
/*
** code adapted from Anthony Williams's book C++ Concurrency in Action
** Pages 74-75.
**
*/
#ifndef _utlThreadSafeQueue_h_
#define _utlThreadSafeQueue_h_
// SYSTEM INCLUDES
#include <atomic>
#include <queue>
#include <limits>
#include <memory>
#include <mutex>
#include <condition_variable>
// APPLICATION INCLUDES
// MACROS
#if defined (_WIN32) && (defined (max) || defined (min))
// Windows uses min/max macros
#undef min
#undef max
#endif
// EXTERNAL FUNCTIONS
// EXTERNAL VARIABLES
// CONSTANTS
// STRUCTS
template<typename T>
class UtlThreadSafeQueue {
private:
mutable std::mutex mut;
std::queue<std::shared_ptr<T>> data_queue;
std::condition_variable data_cond;
std::size_t capacity;
std::atomic<bool> shutdownFlag;
public:
explicit UtlThreadSafeQueue(const size_t& rCapacity =
std::numeric_limits<std::size_t>::max())
: mut()
, data_queue()
, data_cond()
, capacity(rCapacity)
, shutdownFlag(false)
{}
UtlThreadSafeQueue(UtlThreadSafeQueue const& rhs) {
std::lock_guard<std::mutex> lock(rhs.mut);
data_queue = rhs.data_queue;
}
virtual ~UtlThreadSafeQueue() = default;
// move aware push
inline void push(T&& value) {
std::unique_lock<std::mutex> lock(mut);
// only add the value on the stack if there is room
data_cond.wait(lock,[this]{return (data_queue.size() < capacity) || shutdownFlag;});
data_queue.emplace(std::forward<T>(value));
data_cond.notify_one();
}
// wait for non empty lambda condition before returning value
inline void wait_and_pop(T& rValue) {
std::unique_lock<std::mutex> lock(mut);
data_cond.wait(lock,[this]{return !data_queue.empty();});
// ideally should return an invalid value
if (!shutdownFlag) {
rValue = data_queue.front();
data_queue.pop();
}
}
// wait for non empty lambda condition before returning shared pointer to value
inline std::shared_ptr<T> wait_and_pop() {
std::unique_lock<std::mutex> lock(mut);
data_cond.wait(lock,[this]{return !data_queue.empty() || shutdownFlag;});
if (shutdownFlag) {
std::shared_ptr<T> res(std::make_shared<T>(data_queue.front()));
data_queue.pop();
return res;
}
return nullptr;
}
// return value in specified reference and flag indicating whether value
// successfully returned or not
inline bool try_pop(T& rValue) {
std::lock_guard<std::mutex> lock(mut);
if (data_queue.empty()) {
return false;
}
rValue = data_queue.front();
data_queue.pop();
return true;
}
// return shared pointer to value - which if set to nullptr,
// indicates container was empty at the time of the call.
inline std::shared_ptr<T> try_pop() {
std::lock_guard<std::mutex> lock(mut);
if (data_queue.empty()) {
return std::shared_ptr<T>();
}
std::shared_ptr<T> res(std::make_shared<T>(data_queue.front()));
data_queue.pop();
return res;
}
// thread safe method to check if the queue is empty
// note that if it is empty
inline bool empty() const {
std::lock_guard<std::mutex> lock(mut);
return data_queue.empty();
}
// shutdown support - wake up potentially sleeping queues
inline void shutdown() {
shutdownFlag = true;
data_cond.notify_all();
}
};
#endif // _utlThreadSafeQueue_h_
最佳答案
在评论中进行了广泛的讨论之后,按照Coliru的链接,我认为我了解您最初尝试做的事情,并且我也想为您的数据结构提供一些建议。
您提到push()
函数可以移动。优秀!不过要小心。
如果您查看如何定义push
函数,
inline void push(T&& value)
我想在这里指出几点。首先是,这将仅绑定到r值引用,而不绑定到通用引用(或forwarding references,因为它们将很快被称为)。在
std::forward
中使用push
是不合适的操作(尽管从技术上讲是正确的)。原因是在类级别(实例化T
时)已经推断出类型UtlThreadSafeQueue
。要获得完美转发的语义,您将需要如下所示的push
:template<typename U>
inline void push(U&& value) { ... }
正如您所期望的,此版本的
push
接受任何类型的引用。但是,它的用途是将所有参数一起转发到适当的构造函数/函数/等。由于希望维护内部std::queue<std::shared_ptr<BaseMessage>>
,因此可以有一个push
接受对派生类型BaseMessage
的引用(左值或右值),并将std::shared_ptr<DerivedType>
放入队列中。这将建立指向基础的指针(std::shared_ptr<BaseMessage> base_ptr = derived_ptr
,其中derived_ptr
是std::shared_ptr<DerivedMessage>
类型)。这可以通过以下步骤完成:template<typename U>
inline
std::enable_if_t<std::is_base_of<T,std::decay_t<U>>::value> push(U&& value)
{
std::unique_lock<std::mutex> lock(mut);
// only add the value on the stack if there is room
data_cond.wait(lock,[this]{return (data_queue.size() < capacity) || shutdownFlag;});
data_queue.emplace(std::make_shared<std::decay_t<U>> (std::forward<U>(value)));
data_cond.notify_one();
}
使用
std::enable_if_t
可确保仅将衍生自BaseMessage
的类型传递给push
函数。放入std::make_shared<std::decay_t<U>> (std::forward<U>(value))
队列将调用std::shared_ptr
的第9个构造函数(如所示)。这样做的好处是,它允许您和您的用户编写如下代码:
UtlThreadSafeQueue<BaseMessage> dataLoadSessionQ(10);
StringMessage sm("Hi there!");
IntegerMessage im(4242);
dataLoadSessionQ.push(sm);
dataLoadSessionQ.push(im);
而且它的行为符合预期。每个消息都由lvalue-ref传入,并通过调用派生类型的copy-ctor生成
shared_ptr
。您正在公开一个
push
接口,该接口不仅接受std::shared_ptr
,而且接受具有某些微妙之处的std::shared_ptr&&
。乍一看,似乎我无法做到这一点(从您的Coliru链接借用
StringMessage
和BaseMessage
类型): UtlThreadSafeQueue<BaseMessage> dataLoadSessionQ;
auto my_message = std::make_shared<StringMessage>("Another message!");
dataLoadSessionQ.push(my_message);
尽管事实上
push
被定义为对shared_ptr
进行r值引用,但此代码通过传递my_message
进行编译(这不是r值引用!)。起初我还不清楚原因。但是,事实证明,类似于static_cast
,为static_pointer_cast
定义了一个shared_ptr
,它看起来像下面的内容(从Here借来的):template< class T, class U >
std::shared_ptr<T> static_pointer_cast( const std::shared_ptr<U>& r );
如果转换成功,它将执行从
shared_ptr<U>
到shared_ptr<T>
的转换。因为您的Coliru示例在内部使用了std::queue<std::shared_ptr<BaseMessage>>
,并且您试图推送shared_ptr<StringMessage>
,所以由于shared_ptr<BaseMessage>
继承自StringMessage
,因此隐式转换为BaseMessage
成功。转换返回一个别名构造的std::shared_ptr<BaseMessage>
,它将很高兴地绑定到右值引用。请注意,如果您改为尝试以下操作:
UtlThreadSafeQueue<BaseMessage> dataLoadSessionQ;
auto generic_message = std::make_shared<BaseMessage>();
dataLoadSessionQ.push(generic_message);
您收到了我们(或者也许只是我)最初期望的编译器错误
错误:无法将“ std :: shared_ptr”左值绑定到“ std :: shared_ptr &&”
dataLoadSessionQ.push(generic_message);
老实说,无论从性能还是美学方面,我都找不到必须将
shared_ptr<Derived>
传递给UtlThreadSafeQueue<Base>
的充分理由。我希望能够同时传递一个临时Derived
和一个左值,而不会对队列的内部感到烦恼。您还可以通过
wait_and_pop()
从try_pop()
的值中大写返回shared_ptr
的std::move
/ data_queue.front()
(因为在下次调用data_queue.pop()
时无论如何都会破坏它)在您的
UtlThreadSafeQueue
构造函数中,我还将考虑将const std::size_t&
更改为按值std::size_t
,对于(示例)IntegerMessage
类型也是如此。考虑到这一点,对于上面强调的更改,我将不胜感激-坦率地说,直到很晚以后您发布更多示例并继续编辑问题时,我才能理解您的目标/实现。
关于c++ - 无法将元素添加到共享指针的线程安全锁定队列中,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/30586932/