c - 使用 pthreads 添加两个 vector 而不使用全局 sum 变量

标签 c multithreading pthreads

我正在尝试使用 C 中的 pthread 来计算两个 vector a 和 b 的总和。我得到了一个以顺序形式计算总和的函数,另一个以并行形式计算总和。我的程序工作正常,但当有多个线程时计算不同的总和。我已经在关键区域使用了正确的线程同步,但仍然看不出哪里出了问题。我在第一个线程上得到了正确的答案,因为只有一个线程在执行这项工作,然后我在多个线程上得到了错误的答案。这是我的代码:

#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>


// type for value of vector element
typedef short value_t;
// type for vector dimension / indices
typedef long index_t;
// function type to combine two values
typedef value_t (*function_t)(const value_t x, const value_t y);
// struct to store the respective values of the vectors a,b and c
typedef struct{
    index_t start;
    index_t end;
    value_t *arr;
    value_t *brr;
    value_t *crr;
    value_t *part_sum;
    pthread_mutex_t *mutex;
}arg_struct;

// function to combine two values
value_t add(const value_t x, const value_t y) {
  return ((x+y)*(x-y)) % ((int)x+1) +27;
}


// function to initialize the vectors a,b and c
void vectorInit(index_t n, value_t a[n], value_t b[n], value_t c[n]) {

  for(index_t i=0; i<n; i++) {
    a[i] = (value_t)(2*i);
    b[i] = (value_t)(n-i);
    c[i] = 0;
  }
}


// function to count the sum of two variables sequentially
value_t vectorOperation(index_t n, value_t a[n], value_t b[n], value_t c[n], function_t f) {

  value_t sum = 0;

  for(index_t i=0; i<n; i++) {
    sum += (c[i] = f(a[i], b[i]));
  }

  return sum;
}
/* Thread function */
void* vector_sum(void* arg)
{   
    arg_struct *param = (arg_struct*)arg;
   /*
    for(index_t i= param->start; i<param->end; i++)
    {
        pthread_mutex_lock(&param->mutex);
        *param->part_sum += vectorOperation(i,param->arr,param->brr,param->crr,add);
        pthread_mutex_unlock(&param->mutex);
    }
    */
    index_t n = param->end - param->start;
    pthread_mutex_lock(&(*param->mutex));
    // Each thread uses the vectorOperation function to calculate the sum sequentially(Also the critical area)
    *param->part_sum = *param->part_sum + vectorOperation(n,param->arr,param->brr,param->crr,add);
    //*param->part_sum += vectorOperation(param->end-param->start,param->arr,param->brr,param->crr,add);
    pthread_mutex_unlock(&(*param->mutex));

    pthread_exit(NULL);
}



// Sum of two vectors in parallel. 
value_t vectorOperationParallel(index_t n, value_t a[n], value_t b[n], value_t c[n], function_t f, int p) {

  value_t sum = 0;

    pthread_t threads[p];
    arg_struct thread_args[p];
    pthread_mutex_t mutex;
    pthread_mutex_init(&mutex,NULL);
    index_t div = (n+p-1)/p;

      for(int i=0; i<p; i++)
    {
        thread_args[i].start = i*div;
        thread_args[i].end = (i+1)*div;
        thread_args[i].arr = a;
        thread_args[i].brr = b;
        thread_args[i].crr = c;
          for(int j =0; j<div; j++)
          {
          thread_args[i].arr[j] = a[thread_args[i].start+j];
          thread_args[i].brr[j] = b[thread_args[i].start+j];
          thread_args[i].crr[j] = c[thread_args[i].start+j];

          }
        thread_args[i].part_sum = &sum;
        thread_args[i].mutex = &mutex;
        pthread_create(&threads[i],NULL,vector_sum, (void*)&thread_args[i]);
    }


    for(int i=0; i<p; i++)
    {
        pthread_join(threads[i],NULL);
    }
  return sum;
}


int main(int argc, char **argv)
{
  // check for correct argument count
  if (argc != 3)
    {
      printf ("usage: %s vector_size n_threads\n", argv[0]);
      exit (EXIT_FAILURE);
    }

  // get arguments
  // vector size
  index_t n = (index_t)atol (argv[1]);
  // number of threads
  int p = atoi (argv[2]);
  // check for plausible values
  if((p < 1) || (p > 1000)) {
      printf("illegal number of threads\n");
      exit (EXIT_FAILURE);
  }

  // allocate memory
  value_t *a = malloc(n * sizeof(*a));
  value_t *b = malloc(n * sizeof(*b));
  value_t *c = malloc(n * sizeof(*c));
  if((a == NULL) || (b == NULL) || (c == NULL)) {
    printf("no more memory\n");
    exit(EXIT_FAILURE);
  }

  // initialize vectors a,b,c
  vectorInit(n, a, b, c);

  // work on vectors sequentially
  value_t c1sum = vectorOperation(n, a, b, c, add);

  // work on vectors parallel for all thread counts from 1 to p
  for(int thr=1; thr<= p; thr++) {
    // do operation
    value_t c2sum = vectorOperationParallel(n, a, b, c, add, thr);

    // check result
    if(c1sum != c2sum) {
      printf("!!! error: vector results are not identical !!!\nsum1=%ld, sum2=%ld\n", (long)c1sum, (long)c2sum);
      return EXIT_FAILURE;
    } 
    else
        printf("The results are equal: sum1=%ld, sum2=%ld\n",(long)c1sum, (long)c2sum);
  }

  return EXIT_SUCCESS;

}

最佳答案

好吧,我不确定,但这似乎是问题所在。

起初,变量的名称很糟糕。

然后n.m。评论:

pthread_mutex_init in a loop is probably a bad idea

您计算index_t div = (elements_in_vector + num_of_threads - 1)/num_of_threads; 然后使用 div * num_of_threads 来分布元素。这样您就可以尝试访问比可用元素更多的元素。

示例:

index_t div = (elements_in_vector + num_of_threads - 1) / num_of_threads;
//(13 * 5 - 1) / 5 = 3
thread_args[i].end = (i + 1) * div; // for the last i ( = 2)
//(2 + 1) * 5 = 15

一旦你访问i >= 13,你就会得到垃圾值(未定义的行为)

然后,您复制原始数组的各个部分(我认为这比仅传递对原始数组的引用要慢)。

您似乎根本没有使用结果数组*thread_args[i].crr

您只需要所有值总和的互斥体,因为您在线程中传递的每个数组都有专用内存。如果您不在所有线程中使用 sum 变量,您甚至可以将原始数组的指针传递给没有互斥锁的线程。因为每个添加都是自包含的并且不会访问另一个添加的内存,所以不需要互斥体。

要计算所有值的总和,您可以仅使用线程的返回值,而不是对传递给每个线程的值的引用。这样速度会快很多。

我不确定我是否找到了所有内容,但这可能会帮助您改进这一点。

关于c - 使用 pthreads 添加两个 vector 而不使用全局 sum 变量,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/50058928/

相关文章:

c - Linux线程优先级

c# - Interlocked.CompareExchange 真的比简单的锁更快吗?

java - 多线程 TestNG DataProvider 执行

c++ - pthread_create() 调用的函数的多个参数 - 参数是函数指针

c++ - 暂停和恢复 pthread 的最佳解决方案是什么?

c - 从 C 中的输入动态读取字符串

c - 声明临时结构作为参数传递——结构被重写。如何在新的内存位置声明该结构?

将 C 程序转换为内联汇编?

c++ - 在 GDB 中调试多线程服务器 - 查找每个线程的状态。执行时计数并停止

c - 汇编语言中的 getchar()