c - 使用 pthreads 的简单流水线

标签 c multithreading pthreads mutex

最近我开始研究 pthreads 并尝试使用 pthreads 实现软件流水线。为此,我自己编写了一个玩具程序,其中一个类似程序将成为我主要项目的一部分。

所以在这个程序中,主线程创建并输入输出整数类型的缓冲区,然后创建一个主线程 并将这些缓冲区传递给主线程主线程 依次创建两个工作线程

ma​​in 传递到ma​​ster 线程输入输出缓冲区 的大小nxk(例如 5x10 的 int 大小)。 ma​​ster thread 迭代大小为 k(即 10)的 block n(即 5)次。 ma​​ster thread 中有一个循环运行了 k(此处为 5)次。在 k 的每次迭代中,ma​​ster thread 对大小为 n 的输入数据的一部分进行一些操作,并将其放入 common缓冲区ma​​sterworker 线程 之间共享。 主线程 然后通知工作线程数据已放入公共(public)缓冲区

如果公共(public)缓冲区 准备就绪,两个工作线程 等待来自主线程 的信号。 公共(public)缓冲区 上的操作在工作线程 中分成一半。这意味着一个工作线程 将在前半部分工作,而另一个工作线程 将在公共(public)缓冲区 的后半部分工作。 一旦 worker threadsma​​ster thread 获得信号,每个 worker thread 对他们的一半数据进行一些操作并复制它到输出缓冲区。然后,工作线程 通过设置标志值通知主线程 它们在公共(public)缓冲区 上的操作已完成。为工作线程 创建一组标志。 ma​​ster thread 继续检查是否设置了所有标志,这基本上意味着所有 worker threads 都完成了它们在 common buffer 上的操作,所以 < strong>主线程 可以将下一个数据 block 安全地放入公共(public)缓冲区,供工作线程使用。

所以本质上,ma​​sterworker threads 之间存在着一种流水线方式的通信。最后,我在主线程 中打印输出缓冲区。但是我根本没有输出。我已经复制粘贴了我的代码,几乎所有步骤都有完整的注释。

#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>
#include <sys/types.h>
#include <sys/time.h>
#include <semaphore.h>
#include <unistd.h>
#include <stdbool.h>
#include <string.h>

#define MthNum 1 //Number of Master threads
#define WthNum 2 //Number of Worker threads
#define times 5 // Number of times the iteration (n in the explanation)
#define elNum 10 //Chunk size during each iteration (k in the explanation)

pthread_mutex_t mutex; // mutex variable declaration
pthread_cond_t cond_var; //conditional variarble declaration
bool completion_flag = true; //This global flag indicates the completion of the worker thread. Turned false once all operation ends
                             //marking the completion
int *commonBuff; //common buffer between master and worker threads
int *commFlags; //array of flags that are turned to 1 by each worker threads. So worker thread i turns commFlags[i] to 1
                // the master thread turns commFlags[i] = 0 for i =0 to (WthNum - 1)
int *commFlags_s;
int counter; // This counter used my master thread to count if all the commFlags[i] that shows
             //all the threads finished their work on the common buffer
// static pthread_barrier_t barrier;
// Arguments structure passed to master thread
typedef struct{
    int *input; // input buffer
    int *output;// output buffer
}master_args;

// Arguments structure passed to worker thread
typedef struct{
    int threadId;
    int *outBuff;
}worker_args;

void* worker_func(void *arguments);
void *master_func(void *);

int main(int argc,char*argv[]){

    int *ipData,*opData;
    int i,j;

    // allocation of input buffer and initializing to 0
    ipData = (int *)malloc(times*elNum*sizeof(int));
    memset(ipData,0,times*elNum*sizeof(int));

    // allocation of output buffer and initializing to 0
    opData = (int *)malloc(times*elNum*sizeof(int));
    memset(opData,0,times*elNum*sizeof(int));

    pthread_t thread[MthNum];
    master_args* args[MthNum];


    //creating the single master thread and passing the arguments
    for( i=0;i<MthNum;i++){
        args[i] = (master_args *)malloc(sizeof(master_args));
        args[i]->input= ipData;
        args[i]->output= opData;
        pthread_create(&thread[i],NULL,master_func,(void *)args[i]);
    }

    //joining the master thred
    for(i=0;i<MthNum;i++){
        pthread_join(thread[i],NULL);
    }

    //printing the output buffer values
    for(j =0;j<times;j++ ){
        for(i =0;i<elNum;i++){
            printf("%d\t",opData[i+j*times]);
        }
      printf("\n");
    }

    return 0;
}

//This is the master thread function
void *master_func(void *arguments){

    //copying the arguments pointer to local variables
    master_args* localMasterArgs = (master_args *)arguments;
    int *indataArgs = localMasterArgs->input; //input buffer
    int *outdataArgs = localMasterArgs->output; //output buffer

    //worker thread declaration
    pthread_t Workers[WthNum];
    //worker thread arguments declaration
    worker_args* wArguments[WthNum];
    int i,j;

    pthread_mutex_init(&mutex, NULL);
    pthread_cond_init (&cond_var, NULL);
    counter =0;

    commonBuff = (int *)malloc(elNum*sizeof(int));

    commFlags = (int *)malloc(WthNum*sizeof(int));
    memset(commFlags,0,WthNum*sizeof(int) );
    commFlags_s= (int *)malloc(WthNum*sizeof(int));
    memset(commFlags_s,0,WthNum*sizeof(int) );

    for(i =0;i<WthNum;i++){

        wArguments[i] = (worker_args* )malloc(sizeof(worker_args));
        wArguments[i]->threadId = i;
        wArguments[i]->outBuff = outdataArgs;

        pthread_create(&Workers[i],NULL,worker_func,(void *)wArguments[i]);
    }

    for (i = 0; i < times; i++) {
        for (j = 0; j < elNum; j++)
            indataArgs[i + j * elNum] = i + j;

        while (counter != 0) {
            counter = 0;

            pthread_mutex_lock(&mutex);
            for (j = 0; j < WthNum; j++) {
                counter += commFlags_s[j];
            }
            pthread_mutex_unlock(&mutex);

        }
        pthread_mutex_lock(&mutex);
        memcpy(commonBuff, &indataArgs[i * elNum], sizeof(int));
        pthread_mutex_unlock(&mutex);
        counter = 1;
        while (counter != 0) {
            counter = 0;

            pthread_mutex_lock(&mutex);
            for (j = 0; j < WthNum; j++) {
                counter += commFlags[j];
            }
            pthread_mutex_unlock(&mutex);


        }
        // printf("master broad cast\n");
        pthread_mutex_lock(&mutex);
        pthread_cond_broadcast(&cond_var);
         //releasing the lock
        pthread_mutex_unlock(&mutex);

    }

    pthread_mutex_lock(&mutex);
     completion_flag = false;
    pthread_mutex_unlock(&mutex);

    for (i = 0; i < WthNum; i++) {
        pthread_join(Workers[i], NULL);
    }

    pthread_mutex_destroy(&mutex);
    pthread_cond_destroy(&cond_var);

    return NULL;
}


void* worker_func(void *arguments){

    worker_args* localArgs = (worker_args*)arguments;

    //copying the thread ID and the output buffer
    int tid = localArgs->threadId;
    int *localopBuffer = localArgs->outBuff;
    int i,j;
    bool local_completion_flag=false;

    while(local_completion_flag){

        pthread_mutex_lock(&mutex);
        commFlags[tid] =0;
        commFlags_s[tid] =1;
        pthread_cond_wait(&cond_var,&mutex);
        commFlags_s[tid] =0;
        commFlags[tid] =1;
        if (tid == 0) {
            for (i = 0; i < (elNum / 2); i++) {
                localopBuffer[i] = commonBuff[i] * 5;
            }
        } else { // Thread ID 1 operating on the other half of the common buffer data and placing on the
                 // output buffer
            for (i = 0; i < (elNum / 2); i++) {
                localopBuffer[elNum / 2 + i] = commonBuff[elNum / 2 + i] * 10;
            }
        }
         local_completion_flag=completion_flag;
        pthread_mutex_unlock(&mutex);//releasing the lock

    }

    return NULL;
}

但我不知道我在实现中哪里做错了,因为从逻辑上讲它似乎是正确的。但是我的实现肯定有问题。我花了很长时间尝试不同的方法来修复它,但没有任何效果。很抱歉这篇很长的帖子,但我无法确定我可能做错的部分,所以我无法简明扼要。因此,如果有人可以查看问题和实现并建议需要进行哪些更改才能按预期运行它,那将非常有帮助。感谢您的帮助和帮助。

最佳答案

这段代码中有几个错误。

  1. 您可以从修复工作线程的创建开始:

    wArguments[i] = (worker_args* )malloc(sizeof(worker_args));
    wArguments[i]->threadId = i;
    wArguments[i]->outBuff = outdataArgs;
    
    pthread_create(&Workers[i],NULL,worker_func, (void *)wArguments);
    

您正在初始化 worker_args 结构但不正确 - 将指针传递给数组 (void *)wArguments 而不是指向您刚刚初始化的数组元素的指针。

pthread_create(&Workers[i],NULL,worker_func, (void *)wArguments[i]);
//                                                             ^^^
  1. 在启动使用它的值的线程之前初始化计数器:

    void *master_func(void *arguments)
    {
    /* (...) */
    pthread_mutex_init(&mutex, NULL);
    pthread_cond_init (&cond_var, NULL);
    counter = WthNum;
    
  2. 启动主线程时,错误地将指针传递给指针:

    pthread_create(&thread[i],NULL,master_func,(void *)&args[i]);
    

请将此更改为:

pthread_create(&thread[i],NULL,master_func,(void *) args[i]);
  1. 所有对 counter 变量的访问(与任何其他共享内存一样)必须在线程之间同步。

关于c - 使用 pthreads 的简单流水线,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/35456565/

相关文章:

c - 自动为数组分配内存

c++ - FindBLAS 和 FindLAPACK 在 CMake 中的使用

c - 在动态数组实现中使用 realloc()

java - Thread setDaemon(true) 挂起 Tomcat Shutdown

java - 在 Callable 中处理 Thread.interrupted() 的正确方法?

c++ - 如何使用 pthreads 以正确的方式设置两个线程之一的优先级

c - 使用 fork() 作为线程应用程序的后台

c - 如果线程中发生fork调用,fork进程从哪里开始?

c - 使用指针对字符串进行排序

for 循环中的 C++ 11 线程类成员函数给出段错误