我的代码只在一个生产者一个消费者的情况下使用。
这是我的测试代码:
static void *afunc(void * arg) {
Queue* q = arg;
for(int i= 0; i< 100000; i++) {
*queue_pull(q) = i; //get one element space
queue_push(q); //increase the write pointer
}
return NULL;
}
static void *bfunc(void * arg) {
Queue* q = arg;
for(;;) {
int *i = queue_fetch(q); //get the first element in queue
printf("%d\n", *i);
queue_pop(q); //increase the read pointer
}
}
int main() {
Queue queue;
pthread_t a, b;
queue_init(&queue);
pthread_create(&a, NULL, afunc, &queue);
pthread_create(&b, NULL, bfunc, &queue);
sleep(100000);
return 0;
}
这里是循环队列的实现
#define MAX_QUEUE_SIZE 3
typedef struct Queue{
int data[MAX_QUEUE_SIZE] ;
int read,write;
pthread_mutex_t mutex, mutex2;
pthread_cond_t not_empty, not_full;
}Queue;
int queue_init(Queue *queue) {
memset(queue, 0, sizeof(Queue));
pthread_mutex_init(&queue->mutex, NULL);
pthread_cond_init(&queue->not_empty, NULL);
pthread_mutex_init(&queue->mutex2, NULL);
pthread_cond_init(&queue->not_full, NULL);
return 0;
}
int* queue_fetch(Queue *queue) {
int* ret;
if (queue->read == queue->write) {
pthread_mutex_lock(&queue->mutex);
pthread_cond_wait(&queue->not_empty, &queue->mutex);
pthread_mutex_unlock(&queue->mutex);
}
ret = &(queue->data[queue->read]);
return ret;
}
void queue_pop(Queue *queue) {
nx_atomic_set(queue->read, (queue->read+1)%MAX_QUEUE_SIZE);
pthread_cond_signal(&queue->not_full);
}
int* queue_pull(Queue *queue) {
int* ret;
if ((queue->write+1)%MAX_QUEUE_SIZE == queue->read) {
pthread_mutex_lock(&queue->mutex2);
pthread_cond_wait(&queue->not_full, &queue->mutex2);
pthread_mutex_unlock(&queue->mutex2);
}
ret = &(queue->data[queue->write]);
return ret;
}
void queue_push(Queue *queue) {
nx_atomic_set(queue->write, (queue->write+1)%MAX_QUEUE_SIZE);
pthread_cond_signal(&queue->not_empty);
}
片刻之后,两个子线程似乎会变成死锁..
编辑:我用了两个信号量,但它也有一些问题..它很漂亮 很奇怪,如果只执行 ./main,似乎没问题,但如果我重定向到一个文件,比如 ./main > a.txt,然后 wc -l a.txt,结果不等于排队数..
int queue_init(Queue *queue) {
memset(queue, 0, sizeof(Queue));
pthread_mutex_init(&queue->mutex, NULL);
sem_unlink("/not_empty");
queue->not_empty = sem_open("/not_empty", O_CREAT, 644, 0);
sem_unlink("/not_full");
queue->not_full = sem_open("/not_full", O_CREAT, 644, MAX_QUEUE_SIZE);
return 0;
}
int* queue_fetch(Queue *queue) {
sem_wait(queue->not_empty);
return &(queue->data[queue->read]);
}
void queue_pop(Queue *queue) {
nx_atomic_set(queue->read, (queue->read+1)%MAX_QUEUE_SIZE);
sem_post(queue->not_full);
}
int* queue_pull(Queue *queue) {
sem_wait(queue->not_full);
return &(queue->data[queue->write]);
}
void queue_push(Queue *queue) {
nx_atomic_set(queue->write, (queue->write+1)%MAX_QUEUE_SIZE);
sem_post(queue->not_empty);
}
最佳答案
您在互斥量之外操纵队列的状态,这本质上是有争议的。
我建议使用单个互斥体,但只要您更改或测试读写指标,就使用它。这也意味着您不需要原子集。
关于c - fifo 循环队列中的 pthread_cond_wait 死锁,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/26311397/