C++ 线程安全队列关闭

标签 c++ multithreading concurrency queue

我在 C++ 中使用此类进行生产者-消费者设置:

#pragma once

#include <queue>
#include <mutex>
#include <condition_variable>
#include <memory>
#include <atomic>

template <typename T> class SafeQueue
{
public:
    SafeQueue() :
    _shutdown(false)
    {

    }

    void Enqueue(T item)
    {
        std::unique_lock<std::mutex> lock(_queue_mutex);
        bool was_empty = _queue.empty();
        _queue.push(std::move(item));
        lock.unlock();

        if (was_empty)
            _condition_variable.notify_one();
    }

    bool Dequeue(T& item)
    {
        std::unique_lock<std::mutex> lock(_queue_mutex);

        while (!_shutdown && _queue.empty())
            _condition_variable.wait(lock);

        if(!_shutdown)
        {
            item = std::move(_queue.front());
            _queue.pop();

            return true;
        }

        return false;
    }

    bool IsEmpty()
    {
        std::lock_guard<std::mutex> lock(_queue_mutex);
        return _queue.empty();
    }

    void Shutdown()
    {
        _shutdown = true;
        _condition_variable.notify_all();
    }

private:
    std::mutex _queue_mutex;
    std::condition_variable _condition_variable;
    std::queue<T> _queue;
    std::atomic<bool> _shutdown;
};

我是这样使用它的:

class Producer
{
public:
    Producer() :
        _running(true),
        _t(std::bind(&Producer::ProduceThread, this))
    { }

    ~Producer()
    {
        _running = false;
        _incoming_packets.Shutdown();
        _t.join();
    }

    SafeQueue<Packet> _incoming_packets;

private:
    void ProduceThread()
    {
        while(_running)
        {
            Packet p = GetNewPacket();
            _incoming_packets.Enqueue(p);
        }
    }

    std::atomic<bool> _running;
    std::thread _t;
}

class Consumer
{
    Consumer(Producer* producer) :
        _producer(producer),
        _t(std::bind(&Consumer::WorkerThread, this))
    { }

    ~Consumer()
    {
        _t.join();
    }

private:
    void WorkerThread()
    {
        Packet p;

        while(producer->_incoming_packets.Dequeue(p))
            ProcessPacket(p);
    }

    std::thread _t;
    Producer* _producer;
}

这在大多数 时间都有效。但是偶尔当我删除生产者时(并导致它的解构函数调用 SafeQueue::Shutdown_t.join() 会永远阻塞。

我的猜测是问题出在这里(在 SafeQueue::Dequeue 中):

while (!_shutdown && _queue.empty())
        _condition_variable.wait(lock);

SafeQueue::Shutdown 从线程 #1 被调用,同时线程 #2 完成检查 _shutdown 但在执行 _condition_variable.wait(lock),因此它“错过”了 notify_all()。这会发生吗?

如果这是问题所在,解决它的最佳方法是什么?

最佳答案

由于 SafeQueue 对象归生产者所有,删除生产者会导致通知消费者和在〜生产者完成时从其下删除 SafeQueue 之间的竞争条件。

我建议让共享资源既不属于生产者也不属于消费者,而是作为对每个构造函数的引用传递。

改变生产者和消费者的构造函数;

Producer( SafeQueue<Packet> & queue ) :
    _running(false), _incoming_packets(queue) {}


Consumer( SafeQueue<Packet> & queue ) :
    _running(false), _incoming_packets(queue) {}

以这种方式使用您的实例;

SafeQueue<Packet> queue;
Producer producer(queue);  
Consumer consumer(queue);

...do stuff...

queue.shutdown();

这也解决了您在 Consumer 类中与 Producer 类紧密耦合的不良设计问题。

此外,在析构函数中终止和加入线程可能不是一个好主意,就像您对 ~Producer 所做的那样。最好为每个线程类添加一个 Shutdown() 方法,并显式调用它们;

producer.shutdown();
consumer.shutdown();
queue.shutdown();

关闭顺序并不重要,除非您担心在停止消费者时丢失仍在队列中的未处理数据包。

关于C++ 线程安全队列关闭,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/39512514/

相关文章:

c - 从两个不同索引处的 2 个不同线程写入 float 组是否安全?

java - 是否可以使用两个嵌套的同步块(synchronized block)来锁定数组的两个单元格以进行原子操作?

java - 我怎样才能重写这个主线程 - 工作线程同步

c++ - 为什么使用对象进行编程不被认为是程序性的?

c++ - 与 '#define xxx'(有值)相比, '#define xxx yyy'(没有值!)有什么作用?

multithreading - 停止线程交错输出

scala - Scala 中的函数并行性和惰性

scala - 为什么 Actor 收到的消息是无序的?

c++ - BOOST_CHECK_NO_THROW 如何打印异常信息

c++ - 如何用 `R CMD INSTALL` 和 `Makevars` 覆盖 `--configure-args` 的 `--configure-vars` 编译标志?