我正在做一个在 UNIX 和 C++ 中使用线程的项目。基本上有一个生产者线程和 5 个消费者线程。生产者线程随机将递增的数字添加到队列中,消费者线程轮询 q 试图将其删除。出于某种原因,我的 q.size() 一直为负值,我不知道为什么。
#include <queue>
#include <list>
#include <stdio.h>
#include <unistd.h>
#include <stdlib.h>
#include <string.h>
#include <pthread.h>
using namespace std;
#define NUM_CONSUMER_THREADS 5
#define NUM_PRODUCER_THREADS 1
#define BUFFER_SIZE 20
void *c_thread_function(void *arg);
void *p_thread_function(void *arg);
queue<int> q;
int produce(int cur)
{
int temp = cur + 1;
return temp;
}
void append(int num)
{
if ( q.size() < BUFFER_SIZE )
{
q.push(num);
}
}
int take()
{
int removed = q.front();
q.pop();
sleep(1);
return removed;
}
void consume(int num, int thread)
{
printf("%d consumed %d \n", thread, num);
}
int main()
{
int result;
pthread_t cthreads[NUM_CONSUMER_THREADS];
pthread_t pthreads[NUM_PRODUCER_THREADS];
void *thread_result;
// build an array of consumer threads
for(int num_of_cthreads = 0; num_of_cthreads < NUM_CONSUMER_THREADS; num_of_cthreads++)
{
result = pthread_create(&(cthreads[num_of_cthreads]), NULL, c_thread_function, (void *)num_of_cthreads);
if ( result != 0 )
{
perror( "Thread Creation Failed");
exit(EXIT_FAILURE);
}
//sleep(1);
}
// build an array of producer threads
for(int num_of_pthreads = 0; num_of_pthreads < NUM_PRODUCER_THREADS; num_of_pthreads++)
{
result = pthread_create(&(pthreads[num_of_pthreads]), NULL, p_thread_function, NULL);
if ( result != 0 )
{
perror( "Thread Creation Failed");
exit(EXIT_FAILURE);
}
//sleep(1);
}
printf("All threads created\n");
while ( true )
{
// do nothing
}
}
void *c_thread_function(void *arg)
{
int temp = (long)arg;
printf("Consumer thread %d created \n", temp);
while ( true )
{
while ( q.size() > 0 )
{
int w = take();
consume(w, temp);
printf(" q size is now %d \n", q.size());
}
}
}
void *p_thread_function(void *arg)
{
printf("Producer thread created \n");
int itemsAdded = 0;
int temp;
int sleepTime;
while ( true )
{
while ( q.size() < BUFFER_SIZE )
{
temp = produce(itemsAdded);
sleepTime = 1+(int)(9.0*rand()/(RAND_MAX+1.0));
sleep(sleepTime);
append(temp);
printf("Producer adds: %d \n", temp);
printf(" q size is now %d \n", q.size());
itemsAdded++;
}
}
}
输出:
Producer adds: 1
q size is now -1
0 consumed 1
q size is now -2
1 consumed 1
q size is now -3
3 consumed 1
q size is now -4
4 consumed 0
q size is now -5
0 consumed 0
最佳答案
你需要了解race conditions的概念和 mutual exclusion .您的 std::queue
对象是一个共享资源,这意味着有多个线程在其上运行——可能同时。这意味着您必须使用锁(称为互斥锁)来保护它,以便每次访问都是同步的。否则,您将遇到所谓的竞争条件,其中一个线程修改数据而另一个线程也在访问/修改数据,从而导致程序状态不一致或损坏。
为防止竞争条件,您需要在每次队列访问之前锁定一个pthread_mutex
对象。
首先,您需要创建一个互斥锁对象并对其进行初始化。
pthread_mutex mymutex;
pthread_mutex_init(&mymutex, 0);
您的应用程序代码应如下所示:
pthread_mutex_lock(&mymutex);
// Do something with queue
pthread_mutex_unlock(&mymutex);
当一个线程获得锁后,其他线程无法获得锁。试图获取已被另一个线程获取的锁的线程将简单地等待直到锁被释放。这同步对队列的访问,确保一次只有一个线程修改它。
关于C++ UNIX 线程,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/4330127/