c++ - 无法将元素添加到共享指针的线程安全锁定队列中

标签 c++ multithreading c++11 ipc shared-ptr

我正在尝试使用C ++ 11并发技术创建基于线程间消息的通信。 Anthony William的书“ Concurrency in Action”描述了此实现所基于的线程安全锁定队列。本书中描述的线程安全锁定队列与我要实现的线程安全锁定队列之间的区别是,首先,我使用通用引用将队列元素转发到阻塞队列,其次(这就是问题所在)我需要能够存储std :: shared_ptr指针队列,因为模板类型包括具有抽象基类的简单消息类层次结构和具有实际专用消息的子类。我需要使用共享指针以避免数据切片。

编辑:我添加了coliru演示以更清楚地显示我的问题。
Live Demo

编辑1:对coliru live demo的更多更新以及其他编译器错误:
Coliru Demo With Compiler Errors

编辑2:感谢亚历杭德罗,我有一个可行的解决方案Working Coliru Example

为此,我将Anthony William对底层消息队列的实现从以下更改:

std::queue<T> data_queue


到一个

std::queue<std::shared_ptr<T>> data_queue


但是,当我尝试通过通用参考完美转发签名将消息指针推入队列时,我遇到了各种各样的错误。

我希望能够在此队列上添加消息的方式如下:

UtlThreadSafeQueue<BaseMessageType>& mDataLoadSessionQ;
auto message = std::make_shared<DerivedType>(1,2,3);
mDataLoadSessionQ.push(BaseType);



  使用上面的代码,编译器抱怨指出了某些东西
  以下几行错误C2664:'无效
  UtlThreadSafeQueue :: push(T &&)':无法转换
  参数1从'std :: shared_ptr'到
  T = BaseMessageType的“ BaseMessageType &&”


我想我需要某种方式来专门化指针类型,但是我不确定。

我的实现如下:

/*
** code adapted from Anthony Williams's book C++ Concurrency in Action
** Pages 74-75.
**
*/

#ifndef _utlThreadSafeQueue_h_
#define _utlThreadSafeQueue_h_

// SYSTEM INCLUDES
#include <atomic>
#include <queue>
#include <limits>
#include <memory>
#include <mutex>
#include <condition_variable>

// APPLICATION INCLUDES
// MACROS
#if defined (_WIN32) && (defined (max) || defined (min))
    // Windows uses min/max macros
    #undef min
    #undef max
#endif

// EXTERNAL FUNCTIONS
// EXTERNAL VARIABLES
// CONSTANTS
// STRUCTS

template<typename T>
class UtlThreadSafeQueue {
private:
    mutable std::mutex mut;
    std::queue<std::shared_ptr<T>> data_queue;
    std::condition_variable data_cond;
    std::size_t capacity;
    std::atomic<bool> shutdownFlag;
public:
    explicit UtlThreadSafeQueue(const size_t& rCapacity =
        std::numeric_limits<std::size_t>::max())
        : mut()
        , data_queue()
        , data_cond()
        , capacity(rCapacity)
        , shutdownFlag(false)
    {}

    UtlThreadSafeQueue(UtlThreadSafeQueue const& rhs) {
        std::lock_guard<std::mutex> lock(rhs.mut);
        data_queue = rhs.data_queue;
    }

    virtual ~UtlThreadSafeQueue() = default;

    // move aware push
    inline void push(T&& value) {
        std::unique_lock<std::mutex> lock(mut);
        // only add the value on the stack if there is room
        data_cond.wait(lock,[this]{return (data_queue.size() < capacity) || shutdownFlag;});
        data_queue.emplace(std::forward<T>(value));
        data_cond.notify_one();
    }

    // wait for non empty lambda condition before returning value
    inline void wait_and_pop(T& rValue) {
        std::unique_lock<std::mutex> lock(mut);
        data_cond.wait(lock,[this]{return !data_queue.empty();});
        // ideally should return an invalid value
        if (!shutdownFlag) {
            rValue = data_queue.front();
            data_queue.pop();
        }
    }

    // wait for non empty lambda condition before returning shared pointer to value
    inline std::shared_ptr<T> wait_and_pop() {
        std::unique_lock<std::mutex> lock(mut);
        data_cond.wait(lock,[this]{return !data_queue.empty() || shutdownFlag;});
        if (shutdownFlag) {
            std::shared_ptr<T> res(std::make_shared<T>(data_queue.front()));
            data_queue.pop();
            return res;
        }
        return nullptr;
    }

    // return value in specified reference and flag indicating whether value
    // successfully returned or not
    inline bool try_pop(T& rValue) {
        std::lock_guard<std::mutex> lock(mut);
        if (data_queue.empty()) {
            return false;
        }
        rValue = data_queue.front();
        data_queue.pop();
        return true;
    }

    // return shared pointer to value - which if set to nullptr,
    // indicates container was empty at the time of the call.
    inline std::shared_ptr<T> try_pop() {
        std::lock_guard<std::mutex> lock(mut);
        if (data_queue.empty()) {
            return std::shared_ptr<T>();
        }
        std::shared_ptr<T> res(std::make_shared<T>(data_queue.front()));
        data_queue.pop();
        return res;
    }
    // thread safe method to check if the queue is empty
    // note that if it is empty
    inline bool empty() const {
        std::lock_guard<std::mutex> lock(mut);
        return data_queue.empty();
    }

    // shutdown support - wake up potentially sleeping queues
    inline void shutdown() {
        shutdownFlag = true;
        data_cond.notify_all();
    }
};

#endif // _utlThreadSafeQueue_h_

最佳答案

在评论中进行了广泛的讨论之后,按照Coliru的链接,我认为我了解您最初尝试做的事情,并且我也想为您的数据结构提供一些建议。


您提到push()函数可以移动。优秀!不过要小心。

如果您查看如何定义push函数,

inline void push(T&& value)


我想在这里指出几点。首先是,这将仅绑定到r值引用,而不绑定到通用引用(或forwarding references,因为它们将很快被称为)。在std::forward中使用push是不合适的操作(尽管从技术上讲是正确的)。原因是在类级别(实例化T时)已经推断出类型UtlThreadSafeQueue。要获得完美转发的语义,您将需要如下所示的push

template<typename U>
inline void push(U&& value) { ... }


正如您所期望的,此版本的push接受任何类型的引用。但是,它的用途是将所有参数一起转发到适当的构造函数/函数/等。由于希望维护内部std::queue<std::shared_ptr<BaseMessage>>,因此可以有一个push接受对派生类型BaseMessage的引用(左值或右值),并将std::shared_ptr<DerivedType>放入队列中。这将建立指向基础的指针(std::shared_ptr<BaseMessage> base_ptr = derived_ptr,其中derived_ptrstd::shared_ptr<DerivedMessage>类型)。这可以通过以下步骤完成:

template<typename U>
inline
std::enable_if_t<std::is_base_of<T,std::decay_t<U>>::value> push(U&& value)   
{

  std::unique_lock<std::mutex> lock(mut);
  // only add the value on the stack if there is room
  data_cond.wait(lock,[this]{return (data_queue.size() < capacity) || shutdownFlag;});
  data_queue.emplace(std::make_shared<std::decay_t<U>> (std::forward<U>(value)));
  data_cond.notify_one();
}


使用std::enable_if_t可确保仅将衍生自BaseMessage的类型传递给push函数。放入std::make_shared<std::decay_t<U>> (std::forward<U>(value))队列将调用std::shared_ptr的第9个构造函数(如所示)。

这样做的好处是,它允许您和您的用户编写如下代码:

UtlThreadSafeQueue<BaseMessage> dataLoadSessionQ(10);
StringMessage sm("Hi there!");
IntegerMessage im(4242);
dataLoadSessionQ.push(sm);
dataLoadSessionQ.push(im);


而且它的行为符合预期。每个消息都由lvalue-ref传入,并通过调用派生类型的copy-ctor生成shared_ptr
您正在公开一个push接口,该接口不仅接受std::shared_ptr,而且接受具有某些微妙之处的std::shared_ptr&&

乍一看,似乎我无法做到这一点(从您的Coliru链接借用StringMessageBaseMessage类型):

  UtlThreadSafeQueue<BaseMessage> dataLoadSessionQ;
  auto my_message = std::make_shared<StringMessage>("Another message!");
  dataLoadSessionQ.push(my_message);


尽管事实上push被定义为对shared_ptr进行r值引用,但此代码通过传递my_message进行编译(这不是r值引用!)。起初我还不清楚原因。但是,事实证明,类似于static_cast,为static_pointer_cast定义了一个shared_ptr,它看起来像下面的内容(从Here借来的):

template< class T, class U > 
std::shared_ptr<T> static_pointer_cast( const std::shared_ptr<U>& r );


如果转换成功,它将执行从shared_ptr<U>shared_ptr<T>的转换。因为您的Coliru示例在内部使用了std::queue<std::shared_ptr<BaseMessage>>,并且您试图推送shared_ptr<StringMessage>,所以由于shared_ptr<BaseMessage>继承自StringMessage,因此隐式转换为BaseMessage成功。转换返回一个别名构造的std::shared_ptr<BaseMessage>,它将很高兴地绑定到右值引用。

请注意,如果您改为尝试以下操作:

UtlThreadSafeQueue<BaseMessage> dataLoadSessionQ;
auto generic_message = std::make_shared<BaseMessage>(); 
dataLoadSessionQ.push(generic_message);


您收到了我们(或者也许只是我)最初期望的编译器错误



  错误:无法将“ std :: shared_ptr”左值绑定到“ std :: shared_ptr &&”
       dataLoadSessionQ.push(generic_message);


老实说,无论从性能还是美学方面,我都找不到必须将shared_ptr<Derived>传递给UtlThreadSafeQueue<Base>的充分理由。我希望能够同时传递一个临时Derived和一个左值,而不会对队列的内部感到烦恼。
      您还可以通过wait_and_pop()try_pop()的值中大写返回shared_ptrstd::move / data_queue.front()(因为在下次调用data_queue.pop()时无论如何都会破坏它)

在您的UtlThreadSafeQueue构造函数中,我还将考虑将const std::size_t&更改为按值std::size_t,对于(示例)IntegerMessage类型也是如此。

考虑到这一点,对于上面强调的更改,我将不胜感激-坦率地说,直到很晚以后您发布更多示例并继续编辑问题时,我才能理解您的目标/实现。

关于c++ - 无法将元素添加到共享指针的线程安全锁定队列中,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/30586932/

相关文章:

c++ - 从 bin 文件中读取十六进制数作为每个字符的 2 位数字-c++

c++ - 在 OpenGL 中将窗口坐标转换为轴坐标

c++ - 点或箭头运算符与范围解析运算符的比较,用于访问基础子对象

multithreading - 多个线程在缓冲区 : are there consistent arguments stating that it is dangerous? 上写入相同的数据

python - 为什么 __init__() 缺少参数?

C++ 用迭代器初始化 vector

java - 如何避免显示“应用程序无响应”对话框

c++ - 将方法作为模板参数传递 C++11

c++ - std::function 的类型推导

c++ - 是否有 auto_ptr 的替代品可以与 c++11 中的 boost ptr_map 一起使用