c - 我的消息队列在哪里产生段错误?

标签 c multithreading asynchronous mutex message-queue

当处理很多线程时,消息队列就会停止工作。例如,它似乎只适用于 10 个线程。 GDB 告诉我

Program received signal SIGSEGV, Segmentation fault.
__GI_____strtol_l_internal (nptr=0x0, endptr=endptr@entry=0x0, base=base@entry=10, group=group@entry=0, loc=0x7ffff78b0060 <_nl_global_locale>)
    at ../stdlib/strtol_l.c:298
298 ../stdlib/strtol_l.c: No such file or directory.

但是我不知道这意味着什么。相同的代码在 Windows 上工作正常,但在 Linux 上却不行,这让我更加困惑。

您可以在下面看到该队列的工作原理。它是一个单链表,在接收消息时会加锁。请帮我找出我搞砸的地方。

typedef struct Message {
    unsigned type;
    unsigned code;
    void *data;
} Message;

typedef struct MessageQueueElement {
    Message message;
    struct MessageQueueElement *next;
} MessageQueueElement;

typedef struct MessageQueue {
    MessageQueueElement *first;
    MessageQueueElement *last;
} MessageQueue;

MessageQueue mq;
pthread_mutex_t emptyLock, sendLock;
pthread_cond_t emptyCond;

void init() {
    mq.first = malloc(sizeof(MessageQueueElement));
    mq.last = mq.first;
    pthread_mutex_init(&emptyLock, NULL);
    pthread_mutex_init(&sendLock, NULL);
    pthread_cond_init(&emptyCond, NULL);
}

void clean() {
    free(mq.first);
    pthread_mutex_destroy(&emptyLock);
    pthread_mutex_destroy(&sendLock);
    pthread_cond_destroy(&emptyCond);
}

void sendMessage(MessageQueue *this, Message *message) {
    pthread_mutex_lock(&sendLock);
    if (this->first == this->last) {
        pthread_mutex_lock(&emptyLock);
        this->last->message = *message;
        this->last = this->last->next = malloc(sizeof(MessageQueueElement));
        pthread_cond_signal(&emptyCond);
        pthread_mutex_unlock(&emptyLock);
    } else {
        this->last->message = *message;
        this->last = this->last->next = malloc(sizeof(MessageQueueElement));
    }
    pthread_mutex_unlock(&sendLock);
}

int waitMessage(MessageQueue *this, int (*readMessage)(unsigned, unsigned, void *)) {
    pthread_mutex_lock(&emptyLock);
    if (this->first == this->last) {
        pthread_cond_wait(&emptyCond, &emptyLock);
    }
    pthread_mutex_unlock(&emptyLock);
    int n = readMessage(this->first->message.type, this->first->message.code, this->first->message.data);
    MessageQueueElement *temp = this->first;
    this->first = this->first->next;
    free(temp);
    return n;
}

一些测试代码:

#define EXIT_MESSAGE 0
#define THREAD_MESSAGE 1
#define JUST_A_MESSAGE 2
#define EXIT 0
#define CONTINUE 1

int readMessage(unsigned type, unsigned code, void *data) {
    if (type == THREAD_MESSAGE) {
        printf("message from thread %d: %s\n", code, (char *)data);
        free(data);
    } else if (type == JUST_A_MESSAGE) {
        puts((char *)data);
        free(data);
    } else if (type == EXIT_MESSAGE) {
        puts("ending the program");
        return EXIT;
    }
    return CONTINUE;
}

int nThreads;
int counter = 0;

void *worker(void *p) {
    double pi = 0.0;
    for (int i = 0; i < 1000000; i += 1) {
        pi += (4.0 / (8.0 * i + 1.0) - 2.0 / (8.0 * i + 4.0) - 1.0 / (8.0 * i + 5.0) - 1.0 / (8.0 * i + 6.0)) / pow(16.0, i);
    }
    char *s = malloc(100);
    sprintf(s, "pi equals %.8f", pi);
    sendMessage(&mq, &(Message){.type = THREAD_MESSAGE, .code = (int)(intptr_t)p, .data = s});
    counter += 1;
    char *s2 = malloc(100);
    sprintf(s2, "received %d message%s", counter, counter == 1 ? "" : "s");
    sendMessage(&mq, &(Message){.type = JUST_A_MESSAGE, .data = s2});
    if (counter == nThreads) {
        sendMessage(&mq, &(Message){.type = EXIT_MESSAGE});
    }
}

int main(int argc, char **argv) {
    clock_t timer = clock();
    init();
    nThreads = atoi(argv[1]);

    pthread_t threads[nThreads];
    for (int i = 0; i < nThreads; i += 1) {
        pthread_create(&threads[i], NULL, worker, (void *)(intptr_t)i);
    }
    while (waitMessage(&mq, readMessage));
    for (int i = 0; i < nThreads; i += 1) {
        pthread_join(threads[i], NULL);
    }
    clean();
    timer = clock() - timer;
    printf("%.2f\n", (double)timer / CLOCKS_PER_SEC);
    return 0;
}

--- 编辑 --- 好吧,我通过使用信号量稍微改变程序来解决这个问题。 waitMessage 函数不必锁定,因为它仅由一个线程访问,并且它修改的值不会与 sendMessage 冲突。

MessageQueue mq;
pthread_mutex_t mutex;
sem_t sem;

void init() {
    mq.first = malloc(sizeof(MessageQueueElement));
    mq.last = mq.first;
    pthread_mutex_init(&mutex, NULL);
    sem_init(&sem, 0, 0);
}

void clean() {
    free(mq.first);
    pthread_mutex_destroy(&mutex);
    sem_destroy(&sem);
}

void sendMessage(MessageQueue *this, Message *message) {
    pthread_mutex_lock(&mutex);
    this->last->message = *message;
    this->last = this->last->next = malloc(sizeof(MessageQueueElement));
    pthread_mutex_unlock(&mutex);
    sem_post(&sem);
}

int waitMessage(MessageQueue *this, int (*readMessage)(unsigned, unsigned, void *)) {
    sem_wait(&sem);
    int n = readMessage(this->first->message.type, this->first->message.code, this->first->message.data);
    MessageQueueElement *temp = this->first;
    this->first = this->first->next;
    free(temp);
    return n;
}

最佳答案

您的 waitMessage 函数正在修改 this->first 之外的任何锁定。这是一件坏事。

通常不值得重新创建操作系统已经为您提供的东西。您实际上正在尝试设置消息结构的管道。您可以简单地使用匿名管道(请参阅 here(适用于 Linux)或 here(适用于 Windows))并向其写入/读取消息结构。还有POSIX message queues这可能更有效率一点。

在使用多个工作线程的情况下,您必须有一个补充互斥信号量来控制哪个工作线程尝试从管道或消息队列中读取。

关于c - 我的消息队列在哪里产生段错误?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/29552888/

相关文章:

javascript - 在继续之前等待对象构造函数内的方法完成

javascript - 无法操作 AJAX load() 加载的外部页面元素的 DOM

c - 打印宏的名称和值

android - 嵌套 ALOGD 语句

c# - BackgroundWorker 和线程

java - 在 Java 中实现这种线程/事件行为的最佳方式是什么?

javascript - vue.js - v-for 中动态生成的组件未正确更新绑定(bind)属性

c - strpbrk 函数 - C

使用 memcpy 将动态结构数组的内容复制到另一个动态数组中

c - 在 C 中使用 PThreads 合并排序