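I'm trying to use pthreads in C to compute the sum of two vectors a and b. I have one function that computes the sum sequentially and another that computes it in parallel. My program runs, but it computes a different sum whenever there is more than one thread. I'm already using proper thread synchronization around the critical section, but I still can't see what's going wrong. With one thread I get the correct answer, because only that thread is doing the work; with multiple threads I get a wrong answer. Here is my code: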
#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>
// type for value of vector element
typedef short value_t;
// type for vector dimension / indices
typedef long index_t;
// function type to combine two values
typedef value_t (*function_t)(const value_t x, const value_t y);
// struct to store the respective values of the vectors a,b and c
typedef struct {
    index_t start;
    index_t end;
    value_t *arr;
    value_t *brr;
    value_t *crr;
    value_t *part_sum;
    pthread_mutex_t *mutex;
} arg_struct;
// function to combine two values
value_t add(const value_t x, const value_t y) {
    return ((x+y)*(x-y)) % ((int)x+1) +27;
}
// function to initialize the vectors a,b and c
void vectorInit(index_t n, value_t a[n], value_t b[n], value_t c[n]) {
    for(index_t i=0; i<n; i++) {
        a[i] = (value_t)(2*i);
        b[i] = (value_t)(n-i);
        c[i] = 0;
    }
}
// function to count the sum of two variables sequentially
value_t vectorOperation(index_t n, value_t a[n], value_t b[n], value_t c[n], function_t f) {
    value_t sum = 0;
    for(index_t i=0; i<n; i++) {
        sum += (c[i] = f(a[i], b[i]));
    }
    return sum;
}
/* Thread function */
void* vector_sum(void* arg)
{
    arg_struct *param = (arg_struct*)arg;
    /*
    for(index_t i= param->start; i<param->end; i++)
    {
        pthread_mutex_lock(&param->mutex);
        *param->part_sum += vectorOperation(i,param->arr,param->brr,param->crr,add);
        pthread_mutex_unlock(&param->mutex);
    }
    */
    index_t n = param->end - param->start;
    pthread_mutex_lock(&(*param->mutex));
    // Each thread uses the vectorOperation function to calculate the sum sequentially (also the critical section)
    *param->part_sum = *param->part_sum + vectorOperation(n,param->arr,param->brr,param->crr,add);
    //*param->part_sum += vectorOperation(param->end-param->start,param->arr,param->brr,param->crr,add);
    pthread_mutex_unlock(&(*param->mutex));
    pthread_exit(NULL);
}
// Sum of two vectors in parallel.
value_t vectorOperationParallel(index_t n, value_t a[n], value_t b[n], value_t c[n], function_t f, int p) {
    value_t sum = 0;
    pthread_t threads[p];
    arg_struct thread_args[p];
    pthread_mutex_t mutex;
    pthread_mutex_init(&mutex,NULL);
    index_t div = (n+p-1)/p;
    for(int i=0; i<p; i++)
    {
        thread_args[i].start = i*div;
        thread_args[i].end = (i+1)*div;
        thread_args[i].arr = a;
        thread_args[i].brr = b;
        thread_args[i].crr = c;
        for(int j =0; j<div; j++)
        {
            thread_args[i].arr[j] = a[thread_args[i].start+j];
            thread_args[i].brr[j] = b[thread_args[i].start+j];
            thread_args[i].crr[j] = c[thread_args[i].start+j];
        }
        thread_args[i].part_sum = &sum;
        thread_args[i].mutex = &mutex;
        pthread_create(&threads[i],NULL,vector_sum, (void*)&thread_args[i]);
    }
    for(int i=0; i<p; i++)
    {
        pthread_join(threads[i],NULL);
    }
    return sum;
}
int main(int argc, char **argv)
{
    // check for correct argument count
    if (argc != 3)
    {
        printf ("usage: %s vector_size n_threads\n", argv[0]);
        exit (EXIT_FAILURE);
    }
    // get arguments
    // vector size
    index_t n = (index_t)atol (argv[1]);
    // number of threads
    int p = atoi (argv[2]);
    // check for plausible values
    if((p < 1) || (p > 1000)) {
        printf("illegal number of threads\n");
        exit (EXIT_FAILURE);
    }
    // allocate memory
    value_t *a = malloc(n * sizeof(*a));
    value_t *b = malloc(n * sizeof(*b));
    value_t *c = malloc(n * sizeof(*c));
    if((a == NULL) || (b == NULL) || (c == NULL)) {
        printf("no more memory\n");
        exit(EXIT_FAILURE);
    }
    // initialize vectors a,b,c
    vectorInit(n, a, b, c);
    // work on vectors sequentially
    value_t c1sum = vectorOperation(n, a, b, c, add);
    // work on vectors parallel for all thread counts from 1 to p
    for(int thr=1; thr<= p; thr++) {
        // do operation
        value_t c2sum = vectorOperationParallel(n, a, b, c, add, thr);
        // check result
        if(c1sum != c2sum) {
            printf("!!! error: vector results are not identical !!!\nsum1=%ld, sum2=%ld\n", (long)c1sum, (long)c2sum);
            return EXIT_FAILURE;
        }
        else
            printf("The results are equal: sum1=%ld, sum2=%ld\n",(long)c1sum, (long)c2sum);
    }
    return EXIT_SUCCESS;
}
Best answer
Well, I'm not sure, but this seems to be the problem.
First of all, the variable names are bad.
Then, as n.m. commented:
pthread_mutex_init in a loop is probably a bad idea
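In the question, main calls vectorOperationParallel once per thread count, and every call initializes a fresh mutex that is never destroyed. A minimal sketch of the usual pattern, assuming a lock is still wanted: initialize one mutex before the threads start, share it through a pointer, and call pthread_mutex_destroy only after every thread has been joined. The names add_arg, add_locked and sum_with_one_mutex are hypothetical, and the workload (thread i adds i + 1) is just a stand-in.

```c
#include <assert.h>
#include <pthread.h>

/* Hypothetical per-thread argument: a shared total guarded by one mutex. */
typedef struct {
    long *total;
    pthread_mutex_t *lock;
    long amount;
} add_arg;

/* Adds this thread's amount to the shared total inside the critical section. */
static void *add_locked(void *p) {
    add_arg *arg = p;
    pthread_mutex_lock(arg->lock);
    *arg->total += arg->amount;
    pthread_mutex_unlock(arg->lock);
    return NULL;
}

/* Thread i adds i + 1, so the result is 1 + 2 + ... + nthreads.
   The mutex is initialized once before any thread starts and destroyed
   only after every thread has been joined. Error checks are omitted. */
long sum_with_one_mutex(int nthreads) {
    enum { MAX_THREADS = 64 };
    assert(nthreads > 0 && nthreads <= MAX_THREADS);
    pthread_t tid[MAX_THREADS];
    add_arg args[MAX_THREADS];
    long total = 0;
    pthread_mutex_t lock;
    pthread_mutex_init(&lock, NULL);          /* once per call, before the threads */
    for (int i = 0; i < nthreads; i++) {
        args[i] = (add_arg){ &total, &lock, i + 1 };
        pthread_create(&tid[i], NULL, add_locked, &args[i]);
    }
    for (int i = 0; i < nthreads; i++)
        pthread_join(tid[i], NULL);
    pthread_mutex_destroy(&lock);             /* safe: no thread uses it anymore */
    return total;
}
```

A mutex with static storage could instead be initialized with PTHREAD_MUTEX_INITIALIZER, which avoids repeated runtime initialization altogether.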
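You compute index_t div = (elements_in_vector + num_of_threads - 1) / num_of_threads;
and then use div * num_of_threads
to distribute the elements. Because div is rounded up, div * num_of_threads can be larger than the number of elements, so you may try to access more elements than are available.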
Example with 13 elements and 3 threads:
index_t div = (elements_in_vector + num_of_threads - 1) / num_of_threads;
// (13 + 3 - 1) / 3 = 5
thread_args[i].end = (i + 1) * div; // for the last i (= 2)
// (2 + 1) * 5 = 15
As soon as you access an index i >= 13, you read garbage values (undefined behavior).
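One way to keep the rounded-up chunk size while never reading past the end is to clamp both bounds to n. A minimal sketch; chunk_bounds is a hypothetical helper, and index_t is the typedef from the question. For 13 elements and 3 threads it yields [0, 5), [5, 10) and [10, 13) instead of ending at 15.

```c
#include <assert.h>

typedef long index_t;   /* same typedef as in the question */

/* Hypothetical helper: computes the half-open range [*start, *end) of
   chunk i when n elements are split over p threads with ceil(n / p)
   elements per chunk. Both bounds are clamped to n, so the last chunk
   never runs past the array; trailing chunks may be empty. */
void chunk_bounds(index_t n, int p, int i, index_t *start, index_t *end) {
    assert(p > 0 && i >= 0 && i < p);
    index_t div = (n + p - 1) / p;   /* ceiling division, as in the question */
    index_t s = (index_t)i * div;
    index_t e = s + div;
    if (s > n) s = n;
    if (e > n) e = n;
    *start = s;
    *end = e;
}
```

Clamping the start as well matters when there are more threads than elements: with n = 2 and p = 4, the unclamped start of chunk 3 would be 3.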
Next, you copy parts of the original arrays (I think this is slower than simply passing a pointer to the original arrays).
You don't seem to use the result array thread_args[i].crr at all.
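In the question's code, arr, brr and crr are set to a, b and c themselves, so the copy loop writes into the shared input arrays. In addition, vector_sum passes param->arr (the start of a) to vectorOperation, so every thread processes the first elements rather than its own chunk. A minimal sketch of passing offset pointers instead of copying, assuming plain element-wise addition as the combining function (not the question's add) and at most 64 threads; slice_arg, combine_slice and combine_parallel are hypothetical names.

```c
#include <assert.h>
#include <pthread.h>

typedef short value_t;   /* same typedefs as in the question */
typedef long index_t;

/* Hypothetical argument: pointers already offset to this thread's slice. */
typedef struct {
    const value_t *a;
    const value_t *b;
    value_t *c;
    index_t len;
} slice_arg;

/* Each thread writes only c[0 .. len) of its own slice. */
static void *combine_slice(void *p) {
    slice_arg *s = p;
    for (index_t i = 0; i < s->len; i++)
        s->c[i] = (value_t)(s->a[i] + s->b[i]);
    return NULL;
}

/* Splits [0, n) into p clamped slices and hands each thread a + start,
   b + start and c + start. Nothing is copied and no two threads touch
   the same c[i]. Error checks on pthread_create are omitted. */
void combine_parallel(index_t n, const value_t *a, const value_t *b,
                      value_t *c, int p) {
    assert(p > 0 && p <= 64);
    pthread_t tid[64];
    slice_arg args[64];
    index_t div = (n + p - 1) / p;
    for (int i = 0; i < p; i++) {
        index_t start = (index_t)i * div;
        index_t end = start + div;
        if (start > n) start = n;
        if (end > n) end = n;
        args[i] = (slice_arg){ a + start, b + start, c + start, end - start };
        pthread_create(&tid[i], NULL, combine_slice, &args[i]);
    }
    for (int i = 0; i < p; i++)
        pthread_join(tid[i], NULL);
}
```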
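You only need a mutex for the sum of all values, because every array you pass to a thread has its own dedicated memory. If you don't share the sum variable across all threads, you can even pass pointers into the original arrays to the threads without any mutex: each addition is self-contained and never touches memory used by another addition, so no mutex is needed.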
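A minimal sketch of the lock-free variant, assuming each thread stores its partial sum in a slot of its own argument struct and the caller adds the slots after pthread_join. It copies the question's add function verbatim, but accumulates in long instead of value_t (an assumption, so the total does not wrap around a short). part_arg, part_sum and sum_parallel_slots are hypothetical names.

```c
#include <assert.h>
#include <pthread.h>

typedef short value_t;   /* same typedefs as in the question */
typedef long index_t;

/* Combining function copied from the question. */
static value_t add(const value_t x, const value_t y) {
    return ((x + y) * (x - y)) % ((int)x + 1) + 27;
}

/* Hypothetical per-thread argument: its own slice and its own result slot. */
typedef struct {
    const value_t *a, *b;
    value_t *c;
    index_t len;
    long partial;          /* written only by the owning thread */
} part_arg;

static void *part_sum(void *p) {
    part_arg *s = p;
    long sum = 0;
    for (index_t i = 0; i < s->len; i++)
        sum += (s->c[i] = add(s->a[i], s->b[i]));
    s->partial = sum;      /* no lock: no other thread writes this slot */
    return NULL;
}

/* Splits [0, n) into p clamped slices, runs them without any mutex and
   adds the per-thread slots once each thread has been joined. */
long sum_parallel_slots(index_t n, const value_t *a, const value_t *b,
                        value_t *c, int p) {
    assert(p > 0 && p <= 64);
    pthread_t tid[64];
    part_arg args[64];
    index_t div = (n + p - 1) / p;
    for (int i = 0; i < p; i++) {
        index_t start = (index_t)i * div, end = start + div;
        if (start > n) start = n;
        if (end > n) end = n;
        args[i] = (part_arg){ a + start, b + start, c + start, end - start, 0 };
        pthread_create(&tid[i], NULL, part_sum, &args[i]);
    }
    long total = 0;
    for (int i = 0; i < p; i++) {
        pthread_join(tid[i], NULL);   /* the join makes args[i].partial visible */
        total += args[i].partial;
    }
    return total;
}
```

pthread_join synchronizes memory with the finished thread, so reading args[i].partial afterwards needs no extra locking.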
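To compute the sum of all values, you can simply use the threads' return values instead of a reference to a shared sum passed to every thread. This is much faster, because the threads no longer take turns on a lock.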
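A minimal sketch of returning each partial sum through pthread_join, assuming each thread allocates a long for its result and the joining thread frees it. The element-wise operation here is plain a[i] + b[i], an assumption for illustration. range_arg, range_sum and sum_by_return_values are hypothetical names.

```c
#include <assert.h>
#include <pthread.h>
#include <stdlib.h>

typedef short value_t;   /* same typedefs as in the question */
typedef long index_t;

/* Hypothetical argument: pointers offset to this thread's slice. */
typedef struct {
    const value_t *a, *b;
    index_t len;
} range_arg;

/* Returns a heap-allocated partial sum, or NULL if allocation failed. */
static void *range_sum(void *p) {
    range_arg *r = p;
    long *result = malloc(sizeof *result);
    if (result == NULL)
        return NULL;
    *result = 0;
    for (index_t i = 0; i < r->len; i++)
        *result += r->a[i] + r->b[i];
    return result;
}

/* Stores the total in *out; returns 0 on success, -1 if any thread
   failed to allocate its result. No shared sum, no mutex. */
int sum_by_return_values(index_t n, const value_t *a, const value_t *b,
                         int p, long *out) {
    assert(p > 0 && p <= 64);
    pthread_t tid[64];
    range_arg args[64];
    index_t div = (n + p - 1) / p;
    for (int i = 0; i < p; i++) {
        index_t start = (index_t)i * div, end = start + div;
        if (start > n) start = n;
        if (end > n) end = n;
        args[i] = (range_arg){ a + start, b + start, end - start };
        pthread_create(&tid[i], NULL, range_sum, &args[i]);
    }
    int ok = 1;
    long total = 0;
    for (int i = 0; i < p; i++) {
        void *ret = NULL;
        pthread_join(tid[i], &ret);   /* receives the thread's return value */
        if (ret == NULL) {
            ok = 0;
        } else {
            total += *(long *)ret;
            free(ret);
        }
    }
    *out = total;
    return ok ? 0 : -1;
}
```

Allocating the result avoids casting an integer to void *, whose round trip is implementation-defined.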
I'm not sure I found everything, but this should help you improve the code.
The original question, "c - Adding two vectors using pthreads without a global sum variable", is on Stack Overflow: https://stackoverflow.com/questions/50058928/