c++ - 如何同步三个线程?

标签 c++ multithreading mutex synchronize

我的应用程序由主进程和两个线程组成,所有线程同时运行并使用三个 fifo 队列:

fifo-q 是 Qmain、Q1 和 Q2。在内部,每个队列都使用一个计数器,当将项目放入队列时该计数器会递增,而当从队列中“获取”项目时该计数器会递减。

处理涉及两个线程,
QMaster,从Q1和Q2中获取,并放入Qmain中,
监控器,投入第二季度,
以及主进程,从Qmain获取并放入Q1。

QMaster线程循环连续检查Q1和Q2的计数,如果q中有任何项目,它就会获取它们并将它们放入Qmain中。

Monitor-thread循环从外部获取数据,打包放入Q2。

应用程序的主进程还运行一个循环检查 Qmain 的计数,如果有任何项目,则获取一个项目 在循环的每次迭代中从 Qmain 中获取并进一步处理它。在此处理过程中,偶尔会出现 将一个项目放入 Q1 中以便稍后处理(当依次从 Qmain 获取时)。

问题:
我已经实现了上述所有内容,它会随机(短)运行一段时间,然后挂起。 我已经成功地确定了在增量/减量中发生崩溃的根源 fifo-q 的计数(可能发生在其中任何一个中)。

我尝试过的:
使用三个互斥锁:QMAIN_LOCK、Q1_LOCK 和 Q2_LOCK,每当任何获取/放置操作时我都会锁定它们 是在相关的 fifo-q 上完成的。结果:应用程序无法运行,只是挂起。

主进程必须始终继续运行,不得在“读取”时被阻止(命名管道失败、套接字对失败)。

有什么建议吗?
我认为我没有正确实现互斥体,应该如何完成?
(也欢迎任何改进上述设计的意见)

[编辑]下面是进程和 fifo-q-template:
我应该在哪里以及如何放置互斥体以避免上述问题?

main-process:
...
start thread QMaster
start thread Monitor
...
while (!quit)
{
    ...
    if (Qmain.count() > 0)
    {
        X = Qmain.get();
        process(X) 
            delete X;
    }
    ...
    //at some random time:
    Q2.put(Y);
    ...
}

Monitor:
{
    while (1)
    {
        //obtain & package data
        Q2.put(data)
    }
}

QMaster:
{
    while(1)
    {
        if (Q1.count() > 0)
            Qmain.put(Q1.get());

        if (Q2.count() > 0)
            Qmain.put(Q2.get());
    }
}

fifo_q:
template < class X* > class fifo_q
{
    struct item
    {
        X* data;
        item *next;
        item() { data=NULL; next=NULL; }
    }
    item *head, *tail;
    int count;
public:
    fifo_q() { head=tail=NULL; count=0; }
    ~fifo_q() { clear(); /*deletes all items*/ }
    void put(X x) { item i=new item(); (... adds to tail...); count++; }
    X* get() { X *d = h.data; (...deletes head ...); count--; return d; }
    clear() {...}
};

最佳答案

我如何调整设计并以 posix 方式锁定队列访问的示例。 请注意,我将包装互斥体以使用 RAII 或使用 boost-threading,并且我将使用 STL::deque 或 STL::queue 作为队列,但尽可能靠近您的代码:

main-process:
...
start thread Monitor
...
while (!quit)
{
    ...
    if (Qmain.count() > 0)
    {
        X = Qmain.get();
        process(X) 
            delete X;
    }
    ...
    //at some random time:
    QMain.put(Y);
    ...
}

Monitor:
{
    while (1)
    {
        //obtain & package data
        QMain.put(data)
    }
}

fifo_q:
template < class X* > class fifo_q
{
    struct item
    {
        X* data;
        item *next;
        item() { data=NULL; next=NULL; }
    }
    item *head, *tail;
    int count;
    pthread_mutex_t m;
public:
    fifo_q() { head=tail=NULL; count=0; }
    ~fifo_q() { clear(); /*deletes all items*/ }
    void put(X x) 
    { 
      pthread_mutex_lock(&m);
      item i=new item(); 
      (... adds to tail...); 
      count++; 
      pthread_mutex_unlock(&m);
    }
    X* get() 
    { 
      pthread_mutex_lock(&m);
      X *d = h.data; 
      (...deletes head ...); 
      count--; 
      pthread_mutex_unlock(&m);
      return d; 
    }
    clear() {...}
};

还要注意,互斥体仍然需要初始化,如示例 here 中所示。并且 count() 也应该使用互斥体

关于c++ - 如何同步三个线程?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/3931026/

相关文章:

c - Linux互斥检查程序是否已经运行?

c - 互斥线程 - 代码似乎无法正确退出

c++ - 错误: expected primary-expression before ‘double’

c++ - 创建多线程服务器的问题

java - 将线程用于 100 个独立的小型本地搜索作业是否是一种干净的方法?

multithreading - 我不明白饥饿是什么意思,正如作者所解释的那样

c# - .NET 中的线程同步

c++ - makefile 中的条件变量

c++ - 无法设置用户可以输入的数字范围

c - 厕所 key 示例中二进制信号量和互斥量之间的区别?