c - fifo 循环队列中的 pthread_cond_wait 死锁

标签 c multithreading algorithm thread-safety pthreads

我的代码只在一个生产者一个消费者的情况下使用。

这是我的测试代码:

static void *afunc(void * arg) {
    Queue* q  = arg;
    for(int i= 0; i< 100000; i++) {
        *queue_pull(q) = i; //get one element space
        queue_push(q);      //increase the write pointer
    }
    return NULL;
}
static void *bfunc(void * arg) {
    Queue* q  = arg;
    for(;;) {
        int *i = queue_fetch(q); //get the first element in queue
        printf("%d\n", *i);
        queue_pop(q);   //increase the read pointer
    }
}
int main() {
    Queue queue;
    pthread_t a, b;
    queue_init(&queue);
    pthread_create(&a, NULL, afunc, &queue);
    pthread_create(&b, NULL, bfunc, &queue);

    sleep(100000);
    return 0;
}

这里是循环队列的实现

#define MAX_QUEUE_SIZE 3
typedef struct Queue{ 
    int data[MAX_QUEUE_SIZE] ; 
    int read,write; 
    pthread_mutex_t mutex, mutex2; 
    pthread_cond_t not_empty, not_full; 
}Queue; 
int queue_init(Queue *queue) { 
    memset(queue, 0, sizeof(Queue)); 
    pthread_mutex_init(&queue->mutex, NULL); 
    pthread_cond_init(&queue->not_empty, NULL); 
    pthread_mutex_init(&queue->mutex2, NULL); 
    pthread_cond_init(&queue->not_full, NULL); 
    return 0; 
} 
int* queue_fetch(Queue *queue) { 
    int* ret; 
    if (queue->read == queue->write) {
        pthread_mutex_lock(&queue->mutex); 
        pthread_cond_wait(&queue->not_empty, &queue->mutex); 
        pthread_mutex_unlock(&queue->mutex); 
    } 
    ret = &(queue->data[queue->read]); 
    return ret; 
} 
void queue_pop(Queue *queue) { 
    nx_atomic_set(queue->read, (queue->read+1)%MAX_QUEUE_SIZE); 
    pthread_cond_signal(&queue->not_full); 
} 
int*  queue_pull(Queue *queue) { 
    int* ret; 
    if ((queue->write+1)%MAX_QUEUE_SIZE == queue->read) { 
        pthread_mutex_lock(&queue->mutex2); 
        pthread_cond_wait(&queue->not_full, &queue->mutex2); 
        pthread_mutex_unlock(&queue->mutex2); 
    } 
    ret = &(queue->data[queue->write]); 
    return ret; 
} 
void queue_push(Queue *queue) { 
        nx_atomic_set(queue->write, (queue->write+1)%MAX_QUEUE_SIZE); 
        pthread_cond_signal(&queue->not_empty); 
} 

片刻之后,两个子线程似乎会变成死锁..

编辑:我用了两个信号量,但它也有一些问题..它很漂亮 很奇怪,如果只执行 ./main,似乎没问题,但如果我重定向到一个文件,比如 ./main > a.txt,然后 wc -l a.txt,结果不等于排队数..

int queue_init(Queue *queue) {
    memset(queue, 0, sizeof(Queue));
    pthread_mutex_init(&queue->mutex, NULL);
    sem_unlink("/not_empty");
    queue->not_empty = sem_open("/not_empty", O_CREAT, 644, 0);
    sem_unlink("/not_full");
    queue->not_full = sem_open("/not_full", O_CREAT, 644, MAX_QUEUE_SIZE);
    return 0;
}

int* queue_fetch(Queue *queue) {
    sem_wait(queue->not_empty);
    return &(queue->data[queue->read]);
}
void queue_pop(Queue *queue) {
    nx_atomic_set(queue->read, (queue->read+1)%MAX_QUEUE_SIZE);
    sem_post(queue->not_full);
}

int* queue_pull(Queue *queue) {
    sem_wait(queue->not_full);
    return  &(queue->data[queue->write]);
}
void queue_push(Queue *queue) {
    nx_atomic_set(queue->write, (queue->write+1)%MAX_QUEUE_SIZE);
    sem_post(queue->not_empty);
}

最佳答案

您在互斥量之外操纵队列的状态,这本质上是有争议的。

我建议使用单个互斥体,但只要您更改或测试读写指标,就使用它。这也意味着您不需要原子集。

关于c - fifo 循环队列中的 pthread_cond_wait 死锁,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/26311397/

相关文章:

algorithm - 一个循环的运行时间直到 i*i <= n

c - c文件中外部变量的使用

c - volatile 和缓存行为

c++ - 为什么需要浮点指针或整型指针分别指向浮点型变量或整型变量?

java - 使用异步拉取持续从 Google PubSub 接收消息

c++ - 在多线程程序中使用多个async_wait

java - 具有 O(log n) 删除任意节点的优先级队列(或最小堆)

python - 基于使用 Python 的 Lempel-Ziv 算法的熵估计器

c - 从 C 中的共享库访问主程序

使用命令行参数在 C 程序中创建多个文件