C 未初始化的互斥量有效而初始化的互斥量失败?

标签 c concurrency mutex

我的 C 程序创建了一个生产者线程,以尽可能快的速度保存数据。主线程消费并打印这些。

经过几天的 bug 发现,我注意到如果互斥体被初始化,那么程序会在 30 秒内停止(死锁?)。

但是,如果互斥量未初始化,它会完美地工作。

谁能解释一下??为了避免未定义的行为,我宁愿尽可能初始化它们。

更多信息:具体来说,如果“pthread_mutex_t signalM”(信号互斥)被初始化,它就会锁定

已初始化

#include <stdlib.h>                     // exit_failure, exit_success
#include <stdio.h>                      // stdin, stdout, printf
#include <pthread.h>                    // threads
#include <string.h>                     // string
#include <unistd.h>                     // sleep
#include <stdbool.h>                    // bool
#include <fcntl.h>                      // open



struct event {
    pthread_mutex_t critical;
    pthread_mutex_t signalM;
    pthread_cond_t signalC;
    int eventCount;
};

struct allVars {
    struct event inEvents;
    struct event outEvents;
    int bufferSize;
    char buffer[10][128];
};




/**
 * Advance the EventCount
 */
void advance(struct event *event) {
    // increment the event counter
    pthread_mutex_lock(&event->critical);
    event->eventCount++;
    pthread_mutex_unlock(&event->critical);

    // signal await to continue
    pthread_mutex_lock(&event->signalM);
    pthread_cond_signal(&event->signalC);
    pthread_mutex_unlock(&event->signalM);
}



/**
 * Wait for ticket and buffer availability
 */
void await(struct event *event, int ticket) {

    int eventCount;

    // get the counter
    pthread_mutex_lock(&event->critical);
    eventCount = event->eventCount;
    pthread_mutex_unlock(&event->critical);

    // lock signaling mutex
    pthread_mutex_lock(&event->signalM);

    // loop until the ticket machine shows your number
    while (ticket > eventCount) {
        // wait until a ticket is called
        pthread_cond_wait(&event->signalC, &event->signalM);

        // get the counter
        pthread_mutex_lock(&event->critical);
        eventCount = event->eventCount;
        pthread_mutex_unlock(&event->critical);
    }

    // unlock signaling mutex
    pthread_mutex_unlock(&event->signalM);
}



/**
 * Add to buffer
 */
void putBuffer(struct allVars *allVars, char data[]) {
    // get the current write position
    pthread_mutex_lock(&allVars->inEvents.critical);
    int in = allVars->inEvents.eventCount;
    pthread_mutex_unlock(&allVars->inEvents.critical);

    // wait until theres a space free in the buffer
    await(&allVars->outEvents, in - allVars->bufferSize + 1);   // set to 2 to keep 1 index distance

    // add data to buffer
    strcpy(allVars->buffer[in % allVars->bufferSize], data);

    // increment the eventCounter and signal
    advance(&allVars->inEvents);
}



/**
 * Get from buffer
 */
char *getBuffer(struct allVars *allVars) {
    // get the current read position
    pthread_mutex_lock(&allVars->outEvents.critical);
    int out = allVars->outEvents.eventCount;
    pthread_mutex_unlock(&allVars->outEvents.critical);

    // wait until theres something in the buffer
    await(&allVars->inEvents, out + 1);

    // allocate memory for returned string
    char *str = malloc(128);

    // get the buffer data
    strcpy(str, allVars->buffer[out % allVars->bufferSize]);

    // increment the eventCounter and signal
    advance(&allVars->outEvents);

    return str;
}



/** child thread (producer) */
void *childThread(void *allVars) {
    char str[10];
    int count = 0;

    while (true) {
        sprintf(str, "%d", count++);
        putBuffer(allVars, str);
    }

    pthread_exit(EXIT_SUCCESS);
}



int main(void) {
    // init structs
    struct event inEvents = {
        PTHREAD_MUTEX_INITIALIZER,
        PTHREAD_MUTEX_INITIALIZER,
        PTHREAD_COND_INITIALIZER,
        0
    };
    struct event outEvents = {
        PTHREAD_MUTEX_INITIALIZER,
        PTHREAD_MUTEX_INITIALIZER,
        PTHREAD_COND_INITIALIZER,
        0
    };
    struct allVars allVars = {
        inEvents,       // events
        outEvents,
        10,             // buffersize
        {"", {""}}      // buffer[][]
    };


    // create child thread (producer)
    pthread_t thread;
    if (pthread_create(&thread, NULL, childThread, &allVars)) {
        fprintf(stderr, "failed to create child thread");
        exit(EXIT_FAILURE);
    }


    // (consumer)
    while (true) {
        char *out = getBuffer(&allVars);
        printf("buf: %s\n", out);
        free(out);
    }


    return (EXIT_SUCCESS);
}

未初始化

#include <stdlib.h>                     // exit_failure, exit_success
#include <stdio.h>                      // stdin, stdout, printf
#include <pthread.h>                    // threads
#include <string.h>                     // string
#include <unistd.h>                     // sleep
#include <stdbool.h>                    // bool
#include <fcntl.h>                      // open



struct event {
    pthread_mutex_t critical;
    pthread_mutex_t signalM;
    pthread_cond_t signalC;
    int eventCount;
};

struct allVars {
    struct event inEvents;
    struct event outEvents;
    int bufferSize;
    char buffer[10][128];
};




/**
 * Advance the EventCount
 */
void advance(struct event *event) {
    // increment the event counter
    pthread_mutex_lock(&event->critical);
    event->eventCount++;
    pthread_mutex_unlock(&event->critical);

    // signal await to continue
    pthread_mutex_lock(&event->signalM);
    pthread_cond_signal(&event->signalC);
    pthread_mutex_unlock(&event->signalM);
}



/**
 * Wait for ticket and buffer availability
 */
void await(struct event *event, int ticket) {

    int eventCount;

    // get the counter
    pthread_mutex_lock(&event->critical);
    eventCount = event->eventCount;
    pthread_mutex_unlock(&event->critical);

    // lock signaling mutex
    pthread_mutex_lock(&event->signalM);

    // loop until the ticket machine shows your number
    while (ticket > eventCount) {
        // wait until a ticket is called
        pthread_cond_wait(&event->signalC, &event->signalM);

        // get the counter
        pthread_mutex_lock(&event->critical);
        eventCount = event->eventCount;
        pthread_mutex_unlock(&event->critical);
    }

    // unlock signaling mutex
    pthread_mutex_unlock(&event->signalM);
}



/**
 * Add to buffer
 */
void putBuffer(struct allVars *allVars, char data[]) {
    // get the current write position
    pthread_mutex_lock(&allVars->inEvents.critical);
    int in = allVars->inEvents.eventCount;
    pthread_mutex_unlock(&allVars->inEvents.critical);

    // wait until theres a space free in the buffer
    await(&allVars->outEvents, in - allVars->bufferSize + 1);   // set to 2 to keep 1 index distance

    // add data to buffer
    strcpy(allVars->buffer[in % allVars->bufferSize], data);

    // increment the eventCounter and signal
    advance(&allVars->inEvents);
}



/**
 * Get from buffer
 */
char *getBuffer(struct allVars *allVars) {
    // get the current read position
    pthread_mutex_lock(&allVars->outEvents.critical);
    int out = allVars->outEvents.eventCount;
    pthread_mutex_unlock(&allVars->outEvents.critical);

    // wait until theres something in the buffer
    await(&allVars->inEvents, out + 1);

    // allocate memory for returned string
    char *str = malloc(128);

    // get the buffer data
    strcpy(str, allVars->buffer[out % allVars->bufferSize]);

    // increment the eventCounter and signal
    advance(&allVars->outEvents);

    return str;
}



/** child thread (producer) */
void *childThread(void *allVars) {
    char str[10];
    int count = 0;

    while (true) {
        sprintf(str, "%d", count++);
        putBuffer(allVars, str);
    }

    pthread_exit(EXIT_SUCCESS);
}



int main(void) {
    // init structs
    struct event inEvents; /* = {
        PTHREAD_MUTEX_INITIALIZER,
        PTHREAD_MUTEX_INITIALIZER,
        PTHREAD_COND_INITIALIZER,
        0
    }; */
    struct event outEvents; /* = {
        PTHREAD_MUTEX_INITIALIZER,
        PTHREAD_MUTEX_INITIALIZER,
        PTHREAD_COND_INITIALIZER,
        0
    }; */

    struct allVars allVars = {
        inEvents,       // events
        outEvents,
        10,             // buffersize
        {"", {""}}      // buffer[][]
    };


    // create child thread (producer)
    pthread_t thread;
    if (pthread_create(&thread, NULL, childThread, &allVars)) {
        fprintf(stderr, "failed to create child thread");
        exit(EXIT_FAILURE);
    }


    // (consumer)
    while (true) {
        char *out = getBuffer(&allVars);
        printf("buf: %s\n", out);
        free(out);
    }


    return (EXIT_SUCCESS);
}

最佳答案

Jonathan 解释了为什么没有初始化互斥量的代码没有死锁(本质上是因为尝试使用未初始化的互斥量永远不会阻塞,它只会立即返回错误)。

在正确初始化互斥锁的程序版本中导致无限等待的问题是您没有正确使用条件变量。谓词表达式的检查和条件变量的等待必须相对于可能正在修改谓词的任何其他线程以原子方式完成。您的代码正在检查一个谓词,该谓词是另一个线程甚至无法访问的局部变量。您的代码将实际谓词读入关键部分内的局部变量,但随后释放用于读取谓词的互斥锁并获取不同的互斥锁以原子方式读取“假”谓词(无论如何都不能被其他线程修改)条件变量等待。

所以你有这样一种情况,可以修改实际谓词 event->eventCount 并且在等待线程读取谓词和阻塞条件时发出该修改的信号变量。

我认为以下内容可以解决您的僵局,但我还没有机会进行太多测试。更改本质上是从 struct event 中删除 signalM 互斥锁,并将对它的任何使用替换为 critical 互斥锁:

#include <stdlib.h>                     // exit_failure, exit_success
#include <stdio.h>                      // stdin, stdout, printf
#include <pthread.h>                    // threads
#include <string.h>                     // string
#include <unistd.h>                     // sleep
#include <stdbool.h>                    // bool
#include <fcntl.h>                      // open



struct event {
    pthread_mutex_t critical;
    pthread_cond_t signalC;
    int eventCount;
};

struct allVars {
    struct event inEvents;
    struct event outEvents;
    int bufferSize;
    char buffer[10][128];
};




/**
 * Advance the EventCount
 */
void advance(struct event *event) {
    // increment the event counter
    pthread_mutex_lock(&event->critical);
    event->eventCount++;
    pthread_mutex_unlock(&event->critical);

    // signal await to continue
    pthread_cond_signal(&event->signalC);
}



/**
 * Wait for ticket and buffer availability
 */
void await(struct event *event, int ticket) {


    // get the counter
    pthread_mutex_lock(&event->critical);

    // loop until the ticket machine shows your number
    while (ticket > event->eventCount) {
        // wait until a ticket is called
        pthread_cond_wait(&event->signalC, &event->critical);
    }

    // unlock signaling mutex
    pthread_mutex_unlock(&event->critical);
}



/**
 * Add to buffer
 */
void putBuffer(struct allVars *allVars, char data[]) {
    // get the current write position
    pthread_mutex_lock(&allVars->inEvents.critical);
    int in = allVars->inEvents.eventCount;
    pthread_mutex_unlock(&allVars->inEvents.critical);

    // wait until theres a space free in the buffer
    await(&allVars->outEvents, in - allVars->bufferSize + 1);   // set to 2 to keep 1 index distance

    // add data to buffer
    strcpy(allVars->buffer[in % allVars->bufferSize], data);

    // increment the eventCounter and signal
    advance(&allVars->inEvents);
}



/**
 * Get from buffer
 */
char *getBuffer(struct allVars *allVars) {
    // get the current read position
    pthread_mutex_lock(&allVars->outEvents.critical);
    int out = allVars->outEvents.eventCount;
    pthread_mutex_unlock(&allVars->outEvents.critical);

    // wait until theres something in the buffer
    await(&allVars->inEvents, out + 1);

    // allocate memory for returned string
    char *str = malloc(128);

    // get the buffer data
    strcpy(str, allVars->buffer[out % allVars->bufferSize]);

    // increment the eventCounter and signal
    advance(&allVars->outEvents);

    return str;
}



/** child thread (producer) */
void *childThread(void *allVars) {
    char str[10];
    int count = 0;

    while (true) {
        sprintf(str, "%d", count++);
        putBuffer(allVars, str);
    }

    pthread_exit(EXIT_SUCCESS);
}



int main(void) {
    // init structs
    struct event inEvents = {
        PTHREAD_MUTEX_INITIALIZER,
        PTHREAD_COND_INITIALIZER,
        0
    };
    struct event outEvents = {
        PTHREAD_MUTEX_INITIALIZER,
        PTHREAD_COND_INITIALIZER,
        0
    };
    struct allVars allVars = {
        inEvents,       // events
        outEvents,
        10,             // buffersize
        {"", {""}}      // buffer[][]
    };


    // create child thread (producer)
    pthread_t thread;
    if (pthread_create(&thread, NULL, childThread, &allVars)) {
        fprintf(stderr, "failed to create child thread");
        exit(EXIT_FAILURE);
    }


    // (consumer)
    while (true) {
        char *out = getBuffer(&allVars);
        printf("buf: %s\n", out);
        free(out);
    }


    return (EXIT_SUCCESS);
}

关于C 未初始化的互斥量有效而初始化的互斥量失败?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/30554292/

相关文章:

c - 仅在大括号范围内缩进代码是否常见?

c - 多态结构的内存偏移错误

c# - 使用 C# 在两个或多个进程之间共享 int 值?

C++14 shared_timed_mutex VS C++11 互斥量

c++ - 如果我在函数内部调用另一个函数,我应该解锁吗?

C: 返回正确的值,但分配错误的值:-1.#IND000000000000

c - 该程序在我的电脑上运行良好,但当我将其提交到编码站点时,它说存在段错误。有错误请指出

java - 使用 ReentrantReadWriteLock 和 boolean 标志

java - 法线贴图上的双重锁定 - 这安全吗?

java - 从另一个类更新进度条