c - 使用 3 个信号量的多个生产者和消费者

标签 c concurrency synchronization semaphore

对于给定大小为 24 的共享缓冲区,

enter image description here

下面是针对使用计数信号量的单个生产者和消费者的解决方案,

#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>
#include <semaphore.h>

#define BUFFERSIZE 24

void *producer(void *);
void *consumer(void *);

sem_t mutex;          //Controls buffer access
sem_t fullBuffer;     //Prevents underflow
sem_t emptyBuffer;    // Prevents overflow

char buf[BUFFERSIZE] = {0}; //Shared resource

int main(int argc, char *argv[])
{
    pthread_t thread1, thread2;

    sem_init(&mutex, 0, 1);
    sem_init(&fullBuffer, 0, 0);
    sem_init(&emptyBuffer, 0, BUFFERSIZE);

    int ret1, ret2;

    ret1 = pthread_create(&thread1, NULL, producer, (void *)0);
    ret2 = pthread_create(&thread2, NULL, consumer, (void *)0);

    printf("Main function after pthread create\n");

    pthread_join(thread1, NULL);
    pthread_join(thread2, NULL);

    sem_destroy(&mutex);
    sem_destroy(&fullBuffer);
    sem_destroy(&emptyBuffer);

    return 0;
}

void *producer(void *index)
{
    int * startIndex = index;
    int currentProduceIndex = *startIndex;
    while(1) {
        sem_wait(&emptyBuffer); // Prevents overflow, when value is 0
        sem_wait(&mutex);       // Controls buffer access
        buf[currentProduceIndex] = currentProduceIndex;
        sem_post(&mutex);
        sem_post(&fullBuffer);
        currentProduceIndex = (currentProduceIndex+1) % BUFFERSIZE;
    }
}

void *consumer(void *index)
{
    int *startIndex = index;
    int currentConsumeIndex = *startIndex;
    while(1) {
        sem_wait(&fullBuffer); // Prevents underflow, when value is 0
        sem_wait(&mutex);
        printf("Consumed %d: ", buf[currentConsumeIndex]);
        sem_post(&mutex);
        sem_post(&emptyBuffer);
        currentConsumeIndex = (currentConsumeIndex+1) % BUFFERSIZE;
    }
}

以上代码在 buf 上有单个生产者和消费者,使用 3 个信号量 mutexfullBufferemptyBuffer

进一步补充,3个信号量,mutexfullBuffer & emptyBuffer是否足以引入多个生产者和消费者来访问buf ,在上面的代码中?

最佳答案

正如 seibie 指出的那样,使用信号量很难实现。我修改了代码并使用了两个 mutex 和两个 cond 变量。您可以创建多个生产者和消费者线程,它应该可以正常工作。

#include <stdio.h>
#include <pthread.h>

#define BUFFERSIZE 24

void *producer(void *);
void *consumer(void *);

pthread_mutex_t consMutex;
pthread_mutex_t prodMutex;
pthread_cond_t  consCond;
pthread_cond_t  prodCond;

// Shared resource: Using it to store flags.
// If value at index is 0, it means the index is free
// and producer can write data. If it is 1, it means
// that index is filled, consumer can consume it.
// You need use separate array for actual data.
char buf[BUFFERSIZE] = {0};

int main(void)
{
   pthread_t thread1, thread2;

    pthread_mutex_init(&consMutex, NULL);
    pthread_mutex_init(&prodMutex, NULL);

    pthread_cond_init(&consCond, NULL);
    pthread_cond_init(&prodCond, NULL);

    int ret1, ret2;

    int prodIndex = 0;
    int consIndex = 0;

    ret1 = pthread_create(&thread1, NULL, producer, (void *)&prodIndex);
    ret2 = pthread_create(&thread2, NULL, consumer, (void *)&consIndex);

    printf("Main function after pthread create\n");

    pthread_join(thread1, NULL);
    pthread_join(thread2, NULL);

    pthread_mutex_destroy(&prodMutex);
    pthread_mutex_destroy(&consMutex);

    pthread_cond_destroy(&prodCond);
    pthread_cond_destroy(&consCond);

    return 0;
}

void *producer(void *index)
{
    int *currentProduceIndex = index;
    while(1) {
        pthread_mutex_lock(&prodMutex);

        /* Check if current index is free to fill. */
        while(1 == buf[*currentProduceIndex])
        {
            /* If index is not free, wait for consumer to consume. */
            pthread_cond_wait(&prodCond, &prodMutex);
        }

        /* Now the current index is free. Fill it. */
        buf[*currentProduceIndex] = 1;

        /* Update the producer index. */
        *currentProduceIndex = (*currentProduceIndex+1)%BUFFERSIZE;
        pthread_mutex_unlock(&prodMutex);

        /* Notify consumer that an item has been produced. */
        pthread_mutex_lock(&consMutex);
        pthread_cond_signal(&consCond);
        pthread_mutex_unlock(&consMutex);
    }

    return NULL;
}

void *consumer(void *index)
{
    int *currentConsumeIndex = index;
    while(1) {

        pthread_mutex_lock(&consMutex);

        /* Check if current index is empty. */
        while(0 == buf[*currentConsumeIndex])
        {
            /* If index is empty, wait for producer to produce. */
            pthread_cond_wait(&consCond, &consMutex);
        }

        /* Index is filled, consume it. */
        buf[*currentConsumeIndex] = 0;

        /* Update the consumer index. */
        *currentConsumeIndex = (*currentConsumeIndex+1)%BUFFERSIZE;
        pthread_mutex_unlock(&consMutex);

        /* Notify producer that an item has been consumed. */
        pthread_mutex_lock(&prodMutex);
        pthread_cond_signal(&prodCond);
        pthread_mutex_unlock(&prodMutex);
    }

    return NULL;
}

关于c - 使用 3 个信号量的多个生产者和消费者,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/40187855/

相关文章:

c# - 优先自动复位事件

c - 为什么显示超时?

java - 让处理器旋转固定的 CPU 时间

Java future<object> 池化

mysql - 为什么我可以读取 MySql 中的脏行

sql-server - 微软 SQL 服务器 2005 : Initializing a merge subscription with alternate snapshot location

java - 如何轮流监听两组线程获取synchronized section?

c - 调整大小时的重定位行为

c - 如何在C中读取lua表

c++ - 为什么在重定向 stdout 和 stdin 时 Python 的行为不符合预期?