c++ - 公平队列丢失通知

标签 c++ multithreading stl notifications thread-safety

考虑以下代码

#include <thread>
#include <iostream>
#include <queue>
#include <mutex>
#include <condition_variable>

template <typename T>
class Tqueue 
{
public:

   Tqueue() : m_next_ticket(0),
              m_counter(0) {}

   void push(const T& e){
       std::unique_lock<std::mutex> lock(m_mutex);
       m_queue.push(e);
       lock.unlock();
       m_cond.notify_all();
    };

   T wait_and_pop() {
       std::unique_lock<std::mutex> lock(m_mutex);
       int ticket = m_next_ticket++;
       m_cond.wait(lock,[=]{return (!m_queue.empty())
                  && (ticket == m_counter);});
       m_counter++;
       T data = m_queue.front();
       m_queue.pop();
       return data;
   }

private:
   int m_next_ticket;
   int m_counter;
   std::queue<T> m_queue;
   std::mutex m_mutex;
   std::condition_variable m_cond;   
};

这应该是我想出的公平队列的模板。在这种情况下,公平意味着 wait_and_pop() 调用以不同线程调用它们的相同顺序返回。

例如: 线程 1 在空队列上调用 wait_and_pop() 并阻塞。然后,线程 2 在空队列上调用 wait_and_pop() 并阻塞。然后线程3用push()推送两个事件。现在线程 1 应该在线程 2 之前返回。

使用以下代码,有时它可以工作。但大多数时候 代码永远阻塞:

Tqueue<int> queue;

std::mutex mutex;

void test(int i) 
{
    auto bla = queue.wait_and_pop();
    std::cout << "Thread : "<<bla << std::endl;
}

const int SIZE = 200;

int main(int argc, char *argv[])
{
    std::vector<std::thread> threads;
    for(int i = 0; i < SIZE; ++i)
       threads.push_back(std::thread(test,i));
    for(int i = 0; i < SIZE; ++i)
        queue.push(i);
    for(int i = 0; i < SIZE; ++i)
       threads[i].join();
    return 0;
}

这个想法是为每个线程创建一个唯一的票证。使用条件变量,我们在 wait_and_pop() 函数中等待,直到 插入一个新事件。在 Push() 函数中,新事件被插入队列中,并通知所有等待线程。每个线程检查是否 如果唯一票证等于当前计数器,则队列不再为空。如果是,则特定线程离开条件循环, 从队列中弹出当前事件并增加计数器。

我怀疑,一些通知丢失了,但我无法理解这个事实,为什么会发生这种情况。有什么想法可以解决这个问题或者如何以正确的方式实现这个问题吗?

编辑 我从队列中更改了代码,如下所示。现在看来有效了。 重要的是,我通知,同时仍然保持锁定(在push()以及wait_and_pop()中)。此外,我将票证系统更改为线程 ID 队列,但这只是一种方便,可以保持源代码紧凑。但我不确定,如果我想使用队列 生产代码,因为我不明白为什么它现在有效,而且我不知道它是否适用于所有情况。也许有人可以对此发表评论?

template <typename T>
class Tqueue 
{
public:
   void push(const T& e){
       std::unique_lock<std::mutex> lock(m_mutex);
       m_queue.push(e);
       m_cond.notify_all();
   };

   T wait_and_pop() {
       std::unique_lock<std::mutex> lock(m_mutex);
       m_ids.push(std::this_thread::get_id());
       m_cond.wait(lock,[=]{return (!m_queue.empty())
                  && (m_ids.front() == std::this_thread::get_id());});
       T data = m_queue.front();
       m_queue.pop();
       m_ids.pop();
       m_cond.notify_all();
       return data;
   }

private:
   std::queue<T> m_queue;
   std::queue<std::thread::id> m_ids;
   std::mutex m_mutex;
   std::condition_variable m_cond;

 };

最佳答案

通知确实丢失了。多次 push 可能会生成更少数量的被唤醒线程,因为当执行 m_cond.notify_all(); 时,它只会使等待线程 可运行,即准备运行。这些线程仍然必须等待轮到它们并获取 m_cond.wait 内的锁。

也有可能的是,在单个等待线程最终执行之前,主线程会多次获取互斥锁。这会导致通知匮乏。

为了使该机制发挥作用,您需要在情况受到影响时随时发出通知。您已通知 m_queue.push(e);,这会影响第一个条件 !m_queue.empty()。您还需要在 wait_and_pop 结束时进行通知,以处理第二个条件 ticket == m_counter

T wait_and_pop() {
   ....blah blah
   T data = m_queue.front();
   m_queue.pop();

   lock.unlock();
   m_cond.notify_all();
   return data;
}

注意:这里的有可能我的意思是“最终会有一个最终发生的线程调度”。我的意思并不是“我不确定”。

进一步说明: condition_variable.notify_all() 仅保证最终唤醒线程。它不保证 X 次调用会唤醒 X 次。另外,由于你的情况,减少为保证只通知一个线程,这才是根本原因。

关于在 wait_and_pop 中解锁之前或之后进行通知
无论您在 wait_and_pop 中释放锁之前还是之后通知,都应该没有任何区别。我指定的修改应与编辑中的修改相同。我一直在进行一些变化较小的测试(线程计数、等待 x 线程完成并再次推送),但结果相同。

关于c++ - 公平队列丢失通知,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/32332926/

相关文章:

android - JNI 的 reinterpret_cast 的替代方案?

java - 并发修改异常问题

multithreading - SDL 和 C++11 线程

c++ - C++中读取转换字符的问题

c++ - 如何在 QToolButton QT C++ 中的文本旁边放置箭头图标

c++ - OpenGL 中的可视化问题(glOrtho 和投影)

c++ - 带释放的 std::vector<> 的自定义分配器?

STL - 我可以在 DriverKit 驱动程序中使用 STL 吗?

c++ - 如何使字符串 vector 在用户输入中保留空格?

java - ThreadPoolExecutor 挂起