c - 多线程程序消费者导致程序卡住

标签 c multithreading mutex

我编写了多线程程序,生产者工作正常。我知道这一点是因为我禁用了消费者并且生产者给出了预期的输出。生产者被设计为用随机值 1 - 10 填充共享缓冲区,并且生产者轮流填充缓冲区。缓冲区中的所有元素最初都设置为 -1,向生产者表明它们是空的。然后,生产者应该向消费者发出信号,表明它已填充了缓冲区的一个元素,并开始消费它。当我添加消费者时,程序会填充一些元素,消耗 1,然后卡住。

这是我在添加了消费者后收到的输出。程序信号一次,产生几行,然后卡住。正如我所说,当消费者没有按预期添加到生产者中时。我已经研究这个问题有一段时间了,但我无法弄清楚。我认为问题在于消费者。

Producer thread 976 and value: 3
Producer thread 231312 and value: 3
Signal
Producer thread 976 and value: 3
Producer thread 231312 and value: 3
buff[1] = 3 and thread = 1232
Producer thread 976 and value: 2
Producer thread 231312 and value: 2
Producer thread 297568 and value: 3
Producer thread 298080 and value: 3
Producer thread 298592 and value: 3

对于此类节目,消费者应该如何看待?

这是程序

#include <stdio.h>
    #include <string.h>
    #include <unistd.h>
    #include <stdlib.h>
    #include <pthread.h>
    #include <stdint.h>

    #define THREADS 5
    #define ELEMENTS 120



    int value = 0;
    int saveValue = 0;
    void *produce(void *arg);
    void *consume(void *arg);
    int producerCount =0;
    int consumerCount = ELEMENTS;

    struct {
      pthread_mutex_t mutex;
      int index;
      int value; 
      int MyShBuff[ELEMENTS];
    } add = {PTHREAD_MUTEX_INITIALIZER, 0, 0}; 


      struct{
       pthread_mutex_t    mutex;
       pthread_cond_t     cond;
       int nready;
       int value;
       int empty;
       int counter;

       /* number ready for consumer */
    } nready = {PTHREAD_MUTEX_INITIALIZER, PTHREAD_COND_INITIALIZER,0, -2, ELEMENTS};

    int main()
    {
        pthread_t tid_producer[THREADS], tid_consumer[THREADS];
        int i, j, k;

        //Ready buffer for producers
        for (i =0; i < ELEMENTS; i++)
        {
            add.MyShBuff[i]=-1;
        }



        for(j = 0; j < THREADS; j++) {

       pthread_create(&tid_producer[j], NULL, &produce, NULL);
       pthread_create(&tid_consumer[j], NULL, &consume, NULL);
    }



     /* wait for all producers and the consumer*/

        for(k = 0; k < THREADS; k++) {
            pthread_join(tid_producer[k], NULL);
            pthread_join(tid_consumer[k], NULL);    
        }


        exit(0);    
        return 0;
    }

    void *produce(void *arg)
    { 
    int i = 0;


    for ( ; ; ) 
    {
        pthread_mutex_lock(&add.mutex);
        if(add.index  >= ELEMENTS)
        {
            pthread_mutex_unlock(&add.mutex);
            return NULL;
        }
        if(add.MyShBuff[add.index] == -1)
        {
         add.value = rand() % 10 + 0;   
         add.MyShBuff[add.index] = add.value;
         printf("Producer thread %d and value: %d\n" ,pthread_self(), add.MyShBuff[add.index]);
         add.index++;
        }
         pthread_mutex_unlock(&add.mutex);
         pthread_mutex_lock(&nready.mutex);
         if(nready.nready == 0)
         {
         pthread_cond_signal(&nready.cond);
         printf("Signal\n");
         }

        nready.nready++;
        pthread_mutex_unlock(&nready.mutex);

        }


    }


    void *consume(void *arg)
    {
     int i;

      while(nready.empty != 0)
      {
       pthread_mutex_lock(&nready.mutex);
        while (nready.nready == 0)
        {
           pthread_cond_wait(&nready.cond,&nready.mutex);
            nready.nready--;
             nready.empty--;
             nready.counter++;
             pthread_mutex_unlock(&nready.mutex);

            pthread_mutex_lock(&add.mutex);
             printf("buff[%d] = %d and thread = %d\n", nready.counter, add.MyShBuff[nready.counter], pthread_self());
             add.MyShBuff[nready.counter] = -2;
             pthread_mutex_unlock(&add.mutex);



        }

      }

            return NULL;

    }

我想我已经做出了建议的更改。这是经过编辑的程序。

#include <stdio.h>
#include <string.h>
#include <unistd.h>
#include <stdlib.h>
#include <pthread.h>
#include <stdint.h>

#define THREADS 5
#define ELEMENTS 120



int value = 0;
int saveValue = 0;
void *produce(void *arg);
void *consume(void *arg);
int producerCount =0;
int consumerCount = ELEMENTS;

struct {
  pthread_mutex_t mutex;
  int index;
  int value; 
  int MyShBuff[ELEMENTS];
} add = {PTHREAD_MUTEX_INITIALIZER, 0, 0}; 


  struct{
   pthread_mutex_t    mutex;
   pthread_cond_t     cond;
   int nready;
   int value;
   int empty;
   int counter;

   /* number ready for consumer */
} nready = {PTHREAD_MUTEX_INITIALIZER, PTHREAD_COND_INITIALIZER,0, -2, ELEMENTS};

int main()
{
    pthread_t tid_producer[THREADS], tid_consumer[THREADS];
    int i, j, k;

    //Ready buffer for producers
    for (i =0; i < ELEMENTS; i++)
    {
        add.MyShBuff[i]=-1;
    }



    for(j = 0; j < THREADS; j++) {

   pthread_create(&tid_producer[j], NULL, &produce, NULL);
   pthread_create(&tid_consumer[j], NULL, &consume, NULL);
}



 /* wait for all producers and the consumer*/

    for(k = 0; k < THREADS; k++) {
        pthread_join(tid_producer[k], NULL);
        pthread_join(tid_consumer[k], NULL);    
    }

     /* Clean up and exit */

  pthread_mutex_destroy(&nready.mutex);
  pthread_mutex_destroy(&add.mutex);
   pthread_cond_destroy(&nready.cond);
   pthread_exit(NULL);

    exit(0);    
    return 0;
}

 void *produce(void *arg)
    { 
    int i = 0;


    for ( ; ; ) 
    {
        pthread_mutex_lock(&add.mutex);
        if(add.index  >= ELEMENTS)
        {
            pthread_mutex_unlock(&add.mutex);
            return NULL;
        }
        if(add.MyShBuff[add.index] == -1)
        {
         add.value = rand() % 10 + 0;   
         add.MyShBuff[add.index] = add.value;
         printf("Producer thread %d and value: %d\n" ,pthread_self(), add.MyShBuff[add.index]);
         add.index++;
        }
         pthread_mutex_unlock(&add.mutex);
         pthread_mutex_lock(&nready.mutex);
         if(nready.nready == 0)
         {
         pthread_cond_broadcast(&nready.cond);
         printf("Signal\n");
         }

        nready.nready++;
        pthread_mutex_unlock(&nready.mutex);

        }


    }


void *consume(void *arg)
{
pthread_mutex_lock(&nready.mutex);

  while(nready.empty != 0)
  {

    while (nready.nready == 0)
    {
        pthread_cond_wait(&nready.cond,&nready.mutex);

         nready.nready--;
         nready.empty--;

         pthread_mutex_unlock(&nready.mutex);

        pthread_mutex_lock(&add.mutex);
         printf("buff[%d] = %d and thread = %d\n", nready.counter, add.MyShBuff[nready.counter], pthread_self());
         add.MyShBuff[nready.counter] = -2;
         pthread_mutex_unlock(&add.mutex);

        pthread_mutex_lock(&nready.mutex);
        nready.counter++;
        pthread_mutex_unlock(&nready.mutex);

    }
  }


        return NULL;

}

consumer() 的第二次编辑。

void *consume(void *arg)
{

pthread_mutex_lock(&nready.mutex);

  while(nready.empty != 0)
  {

    while (nready.nready == 0)
    {


        pthread_cond_wait(&nready.cond,&nready.mutex);


         pthread_mutex_lock(&add.mutex);
         printf("buff[%d] = %d and thread = %d\n", nready.counter, add.MyShBuff[nready.counter], pthread_self());
         add.MyShBuff[nready.counter] = -2;
         pthread_mutex_unlock(&add.mutex);

        nready.counter++;   
        nready.empty--;
        printf("Empty %d\n" , nready.empty);

    }

    nready.nready--;

  }


  return NULL;


}

最佳答案

您的consume()函数在使用共享变量和同步对象时至少存在四个问题:

  1. 在外while条件下,它读取共享变量 nready.empty在任何互斥体(或信号量)的保护之外。在其他地方,该变量的值被修改。因此,您的程序的行为是未定义的。

  2. 它锁定互斥体 nready.mutex 在内循环之外,但在内部解锁它(并且不会重新锁定它)。因此,如果内部循环在外部循环的给定迭代上迭代多次(这很可能会发生),那么

    • 在第二次及后续迭代中,该函数尝试访问并可能修改共享 nready 的其他成员。没有互斥锁保护的结构。

    • 在第二次及后续迭代中,函数尝试等待条件变量 nready.cond没有锁定您为此目的指定的互斥体 ( nready.mutex )

    • 在第二次及后续迭代中,该函数尝试解锁尚未锁定的互斥锁。

  3. 它似乎假设在所有线程中,对于通过条件变量发送的每个信号,它将恰好收到一个信号。这不是一个安全的假设,因为

    • 信号不排队。如果生产者线程在没有消费者等待时发送信号,则该信号将丢失。此外,

    • 可能会发生虚假唤醒,其中线程从 pthread_cond_wait() 返回没有对pthread_cond_signal()进行相应的调用。这些虽然不寻常,但确实会发生,您需要为此做好准备。

  4. 当没有更多工作需要执行时,您无法关闭使用者线程。实际处理最后一个工作单元的可能会退出,但所有其他工作单元可能会被卡住,无限期地等待条件变量。

<小时/>

条件变量之所以如此命名,是因为它们是围绕允许线程暂停操作直到某些条件成立的概念而设计的。等待条件变量的常规使用范例如下:

  1. 锁定关联的互斥体。
  2. (可选)执行一些工作。
  3. 检查简历的相关条件。对条件所基于的任何共享变量的访问必须在任何地方都受到与 CV 使用的相同互斥体的保护。
  4. 如果条件满足,则不等待继续进行(跳至步骤6);否则就开始等待简历
  5. 等待返回后,返回第 3 步。必须重新检查条件以确保其为真,以免它再次被其他线程设为假或从一开始就不是真的。
  6. (可选)执行一些工作。
  7. 解锁互斥体。

按照描述实现后,您可能还会发现您可以从依赖 pthread_cond_broadcast() 中受益。而不是pthread_cond_signal() 。特别是,这可能构成解决消费者关闭问题的方法的一部分(但不是全部)。

关于c - 多线程程序消费者导致程序卡住,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/46689015/

相关文章:

c++ - 在 Eclipse IDE 中设置 gcc ( gcc-arm-embedded ) for C/C++ Developers Mars in OS X

c - 从特定索引 C 读取 char*

c - 嵌套并行执行以使用 Cilk

multithreading - iOS 9 核心数据线程

python - 有没有办法将参数传递给 optuna 中的多个工作?

Release模式下的 C# 互斥量与 Debug模式下的行为不同

c - 如何使用 GNU Readline 处理 Ctrl+D

java - 为什么 Vert.x 被称为响应式,尽管它是单线程的

c - 程序间歇性地坚持 main 报告不同的线程 id 而不是线程本身

c++ - 互斥锁未解锁