c - pthread_cond_wait 和 pthread_mutex_lock 的优先级?

标签 c linux multithreading

我有多个读取线程和一个写入线程。如果我在其中一个读取线程上锁定互斥锁并从中发送广播,是否保证互斥锁将被等待 pthread_cond_wait() 的写线程锁定,或者是否有可能另一个在 pthread_mutex_lock() 上等待的读取线程将锁定互斥?主要问题是 pthread_cond_wait() 优先于 pthread_mutex_lock() 吗?

如果没有,我怎样才能实现互斥锁将始终被 pthread_cond_broadcast() 上的写线程锁定?

例子

阅读线程:

pthread_mutex_lock(mutex);
pthread_cond_broadcast(cond);
pthread_mutex_unlock(mutex);

写线程:
pthread_mutex_lock(&mutex);
pthread_cond_wait(&cond, &mutex);

最佳答案

让我们假设两个线程,读和写,都到达 pthread_mutex_lock在同一时刻。因此,要么写线程获取 pthread_mutex_lock 上的互斥锁调用或读取线程。

如果是写线程,读线程将等待 pthread_mutex_lock .写,通过调用pthread_cond_wait发布 mutex并阻止 cond .它是原子地完成的。因此,当读取线程是 grantex 时 mutex ,我们可以确定读取等待 cond .所以,在cond上广播到达写线程,它不再等待 cond但是 - 仍在 pthread_cond_wait 的范围内- 尝试锁定 mutex (保持阅读线程)。播出后cond读取线程释放 mutex它去写线程。所以写线程终于从pthread_cond_wait退出了拥有 mutex锁定。记得稍后解锁。

如果是读线程,写线程将等待 pthread_mutex_lock ,读取将在 cond 上广播信号然后释放mutex .之后写线程获取mutexpthread_mutex_lock并立即在其中发布 pthread_cond_wait等待 cond (请注意,之前的 cond 广播对当前的 pthread_cond_wait 没有影响)。在读取线程的下一次迭代中,它获取了 mutex 的锁。 , 在 cond 上发送广播并解锁 mutex .这意味着写线程在 cond 上向前移动并获取 mutex 的锁.

它是否回答了您关于优先级的问题?

评论后更新。

假设我们有一个线程(我们将其命名为 A 以供将来引用)持有对 mutex 的锁。很少有其他人试图获得相同的锁。一旦第一个线程释放锁,就无法预测哪个线程将获得锁。而且,如果A线程有一个循环并尝试重新获取 mutex 上的锁,它有可能被授予此锁,其他线程将继续等待。添加 pthread_cond_wait在授予锁定的范围内不会更改任何内容。

让我引用 POSIX 规范的片段(参见 https://stackoverflow.com/a/9625267/2989411 以供引用):

These functions atomically release mutex and cause the calling thread to block on the condition variable cond; atomically here means "atomically with respect to access by another thread to the mutex and then the condition variable". That is, if another thread is able to acquire the mutex after the about-to-block thread has released it, then a subsequent call to pthread_cond_broadcast() or pthread_cond_signal() in that thread shall behave as if it were issued after the about-to-block thread has blocked.



这只是标准关于操作顺序的保证。授予其他线程锁的顺序是相当不可预测的,它会根据一些非常微妙的时间波动而变化。

对于只与互斥体相关的代码,请使用以下代码玩一下:

#define _GNU_SOURCE
#include <pthread.h>

#include <stdio.h>
#include <unistd.h>

pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;

void *th(void *arg) {
    int i;
    char *s = arg;
    for (i = 0; i < 10; ++i) {
        pthread_mutex_lock(&mutex);
        printf("%s %d\n", s, i);
        //sleep(1);
        pthread_mutex_unlock(&mutex);
#if 0
        pthread_yield();
#endif
    }
    return NULL;
}

int main() {
    int i;
    for (i = 0; i < 10; ++i) {
        pthread_t t1, t2, t3;
        printf("================================\n");
        pthread_create(&t1, NULL, th, "t1");
        pthread_create(&t2, NULL, th, "     t2");
        pthread_create(&t3, NULL, th, "            t3");
        pthread_join(t1, NULL);
        pthread_join(t2, NULL);
        pthread_join(t3, NULL);
    }
    return 0;
}

在一台机器(单 CPU)上,它总是显示从 t3 开始的整个循环,然后是 t2,最后是从 t1。在另一个(2 个内核)上,线程的顺序更加随机,但在将互斥锁授予其他线程之前,它几乎总是显示每个线程的整个循环。很少有这样的情况:
t1 8
t1 9
            t3 0
     t2 0
     t2 1
     [removed other t2 output]
     t2 8
     t2 9
            t3 1
            t3 2

启用 pthread_yield通过替换 #if 0#if 1并观察结果并检查输出。对我来说,它的工作方式是两个线程隔行显示它们的输出,然后第三个线程终于有机会工作了。添加另一个或多个线程。玩 sleep 等。它确认了随机行为。

如果您想进行一些实验,请编译并运行以下代码。这是单生产者-多消费者模型的一个例子。它可以用两个参数运行:第一个是消费者线程的数量,第二个是生产数据系列的长度。如果没有给出参数,则有一个消费者线程和 120 个要处理的项目。我还建议在标记为 /* play here */ 的地方使用 sleep/usleep :更改参数的值,完全删除 sleep ,将其移动 - 在适当的时候 - 到临界区或替换为 pthread_yield 并观察行为的变化。
#define _GNU_SOURCE
#include <assert.h>
#include <limits.h>
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <time.h>
#include <unistd.h>

struct data_t {
    int seq;
    int payload;
    struct data_t *next;
};

pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
struct data_t *first = NULL, *last = NULL;
int in_progress = 1;
int num_data = 120;

void push(int seq, int payload) {
    struct data_t *e;
    e = malloc(sizeof(struct data_t));
    e->seq = seq;
    e->payload = payload;
    e->next = NULL;
    if (last == NULL) {
        assert(first == NULL);
        first = last = e;
    } else {
        last->next = e;
        last = e;
    }
}

struct data_t pop() {
    struct data_t res = {0};
    if (first == NULL) {
        res.seq = -1;
    } else {
        res.seq = first->seq;
        res.payload = first->payload;
        first = first->next;
        if (first == NULL) {
            last = NULL;
        }
    }
    return res;
}

void *producer(void *arg __attribute__((unused))) {
    int i;
    printf("producer created\n");
    for (i = 0; i < num_data; ++i) {
        int val;
        sleep(1); /* play here */
        pthread_mutex_lock(&mutex);
        val = rand() / (INT_MAX / 1000);
        push(i, val);
        pthread_mutex_unlock(&mutex);
        pthread_cond_signal(&cond);
        printf("prod %3d %3d signaled\n", i, val);
    }
    in_progress = 0;
    printf("prod end\n");
    pthread_cond_broadcast(&cond);
    printf("prod end signaled\n");
    return NULL;
}

void *consumer(void *arg) {
    char c_id[1024];
    int t_id = *(int *)arg;
    sprintf(c_id, "%*s c %02d", t_id % 10, "", t_id);
    printf("%s created\n", c_id);
    while (1) {
        struct data_t item;
        pthread_mutex_lock(&mutex);
        item = pop();
        while (item.seq == -1 && in_progress) {
            printf("%s waits for data\n", c_id);
            pthread_cond_wait(&cond, &mutex);
            printf("%s got signal\n", c_id);
            item = pop();
        }
        if (!in_progress && item.seq == -1) {
            printf("%s detected end of data.\n", c_id);
            pthread_mutex_unlock(&mutex);
            break;
        }
        pthread_mutex_unlock(&mutex);
        printf("%s processing %3d %3d\n", c_id, item.seq, item.payload);
        sleep(item.payload % 10); /* play here */
        printf("%s processed  %3d %3d\n", c_id, item.seq, item.payload);
    }
    printf("%s end\n", c_id);
    return NULL;
}

int main(int argc, char *argv[]) {
    int num_cons = 1;
    pthread_t t_prod;
    pthread_t *t_cons;
    int i;
    int *nums;
    if (argc > 1) {
        num_cons = atoi(argv[1]);
        if (num_cons == 0) {
            num_cons = 1;
        }
        if (num_cons > 99) {
            num_cons = 99;
        }
    }
    if (argc > 2) {
        num_data = atoi(argv[2]);
        if (num_data < 10) {
            num_data = 10;
        }
        if (num_data > 600) {
            num_data = 600;
        }
    }

    printf("Spawning %d consumer%s for %d items.\n", num_cons, num_cons == 1 ? "" : "s", num_data);
    t_cons = malloc(sizeof(pthread_t) * num_cons);
    nums = malloc(sizeof(int) * num_cons);
    if (!t_cons || !nums) {
        printf("Out of memory!\n");
        exit(1);
    }
    srand(time(NULL));
    pthread_create(&t_prod, NULL, producer, NULL);

    for (i = 0; i < num_cons; ++i) {
        nums[i] = i + 1;
        usleep(100000); /* play here */
        pthread_create(t_cons + i, NULL, consumer, nums + i);
    }

    pthread_join(t_prod, NULL);

    for (i = 0; i < num_cons; ++i) {
        pthread_join(t_cons[i], NULL);
    }
    free(nums);
    free(t_cons);

    return 0;
}

我希望我已经消除了您的疑虑,并为您提供了一些代码来进行实验并获得对 pthread 行为的一些信心。

关于c - pthread_cond_wait 和 pthread_mutex_lock 的优先级?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/44415264/

相关文章:

linux - 服务文件中的 After 和 Requires 标签是什么意思?

multithreading - 在什么意义上,每个线程对于操作系统来说都是一个单独的 CPU?

java - 为什么单线程操作比无线程操作花费的时间长3倍?

c - 下面这段代码有错误

C Malloc 多维字符数组

linux - 如何使用 use as in linux 和汇编语言

c - 如何确保 pthread_cond_wait() 不会错过任何 pthread_cond_signal()?

c - 子目录中的头文件(例如 gtk/gtk.h 与 gtk-2.0/gtk/gtk.h)

c - 运行在普通内核上、使用普通调度程序的普通进程进入一个事件周期的平均时间是多少?

安卓文件系统