c - 使用全局变量同步 p_threads

标签 c multithreading concurrency pthreads mutex

我是 C 的新手,所以我不确定从哪里开始挖掘我的问题。 我正在尝试将 python 数字运算算法移植到 C,并且由于 C 中没有 GIL(哇哦),我可以从线程更改内存中的任何内容,只要我确保没有竞争。

我做了关于互斥锁的功课,但是,我无法全神贯注地使用互斥锁,以防不断运行的线程一遍又一遍地访问同一个数组。

我正在使用 p_threads 来将工作负载分配到一个大数组 a[N] 上。 数组 a[N] 上的数字运算算法是累加的,所以我使用 a_diff[N_THREADS][N] 数组拆分它,写入更改以应用于 a[N] 数组从每个线程到 a_diff[N_THREADS][N],然后在每个步骤后将它们合并在一起。

我需要在不同版本的数组 a[N] 上运行运算,所以我通过全局指针 p 传递它们(在 MWE 中,只有一个 a[N])

我正在使用另一个全局数组SYNC_THREADS[N_THREADS] 同步线程,并通过设置END_THREADS 全局确保线程在我需要时退出(我知道,我'我使用了太多全局变量——我不在乎,代码大约有 200 行)。我的问题是关于这种同步技术 - 这样做是否安全以及实现该同步技术的更清洁/更好/更快的方法是什么?

MWE:

#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>
#define N_THREADS 3
#define N 10000000
#define STEPS 3

double a[N];  // main array
double a_diff[N_THREADS][N];  // diffs array
double params[N];  // parameter used for number-crunching
double (*p)[N];  // pointer to array[N]

// structure for bounds for crunching the array
struct bounds {
    int lo;
    int hi;
    int thread_num;
};
struct bounds B[N_THREADS];
int SYNC_THREADS[N_THREADS];  // for syncing threads
int END_THREADS = 0;  // signal to terminate threads


static void *crunching(void *arg) {
    // multiple threads run number-crunching operations according to assigned low/high bounds
    struct bounds *data = (struct bounds *)arg;
    int lo = (*data).lo;
    int hi = (*data).hi;
    int thread_num = (*data).thread_num;
    printf("worker %d started for bounds [%d %d] \n", thread_num, lo, hi);

    int i;

    while (END_THREADS != 1) {  // END_THREADS tells threads to terminate
        if (SYNC_THREADS[thread_num] == 1) {  // SYNC_THREADS allows threads to start number-crunching
            printf("worker %d working... \n", thread_num );
            for (i = lo; i <= hi; ++i) {
                a_diff[thread_num][i] += (*p)[i] * params[i];  // pretend this is an expensive operation...
            }
            SYNC_THREADS[thread_num] = 0;  // thread disables itself until SYNC_THREADS is back to 1
            printf("worker %d stopped... \n", thread_num );
        }
    }
    return 0;
}


int i, j, th,s;
double joiner;

int main() {
    // pre-fill arrays
    for (i = 0; i < N; ++i) {
        a[i] = i + 0.5;
        params[i] = 0.0;
    }

    // split workload between workers
    int worker_length = N / N_THREADS;
    for (i = 0; i < N_THREADS; ++i) {
        B[i].thread_num = i;
        B[i].lo = i * worker_length;
        if (i == N_THREADS - 1) {
            B[i].hi = N;
        } else {
            B[i].hi = i * worker_length + worker_length - 1;
        }
    }
    // pointer to parameters to be passed to worker
    struct bounds **data = malloc(N_THREADS * sizeof(struct bounds*));
    for (i = 0; i < N_THREADS; i++) {
        data[i] = malloc(sizeof(struct bounds));
        data[i]->lo = B[i].lo;
        data[i]->hi = B[i].hi;
        data[i]->thread_num = B[i].thread_num;
    }
    // create thread objects
    pthread_t threads[N_THREADS];

    // disallow threads to crunch numbers
    for (th = 0; th < N_THREADS; ++th) {
        SYNC_THREADS[th] = 0;
    }

    // launch workers
    for(th = 0; th < N_THREADS; th++) {
        pthread_create(&threads[th], NULL, crunching, data[th]);
    }

    // big loop of iterations
    for (s = 0; s < STEPS; ++s) {
        for (i = 0; i < N; ++i) {
            params[i] += 1.0;  // adjust parameters

            // zero diff array
            for (i = 0; i < N; ++i) {
                for (th = 0; th < N_THREADS; ++th) {
                    a_diff[th][i] = 0.0;
                }
            }
            p = &a;  // pointer to array a
            // allow threads to process numbers and wait for threads to complete
            for (th = 0; th < N_THREADS; ++th) { SYNC_THREADS[th] = 1; }
            // ...here threads started by pthread_create do calculations...
            for (th = 0; th < N_THREADS; th++) { while (SYNC_THREADS[th] != 0) {} }

            // join results from threads (number-crunching is additive)
            for (i = 0; i < N; ++i) {
                joiner = 0.0;
                for (th = 0; th < N_THREADS; ++th) {
                    joiner += a_diff[th][i];
                }
                a[i] += joiner;
            }
        }
    }


    // join workers
    END_THREADS = 1;
    for(th = 0; th < N_THREADS; th++) {
        pthread_join(threads[th], NULL);
    }

    return 0;
}

我看到 worker 在时间上没有重叠:

worker 0 started for bounds [0 3333332]
worker 1 started for bounds [3333333 6666665]
worker 2 started for bounds [6666666 10000000]
worker 0 working...
worker 1 working...
worker 2 working...
worker 2 stopped...
worker 0 stopped...
worker 1 stopped...
worker 2 working...
worker 0 working...
worker 1 working...
worker 1 stopped...
worker 0 stopped...
worker 2 stopped...
worker 2 working...
worker 0 working...
worker 1 working...
worker 1 stopped...
worker 2 stopped...
worker 0 stopped...

Process returned 0 (0x0)   execution time : 1.505 s

并且我通过 a_diff[thead_num][N] 子数组将它们分开,以确保工作人员不会进入彼此的工作空间,但是,我不确定情况总是如此,并且我不会在某处引入隐藏种族......

最佳答案

我没意识到问题是什么:-)

所以,问题是您是否考虑好您的SYNC_THREADSEND_THREADS 同步机制。
是的!...差不多。问题是线程在等待时正在消耗 CPU。

条件变量

要使线程等待事件,您需要使用条件变量 (pthread_cond)。它们提供了一些有用的函数,例如 wait()signal()broadcast():

  • wait(&cond, &m) 在给定条件变量中阻塞线程。 [注2]
  • signal(&cond) 解锁在给定条件变量中等待的线程。
  • broadcast(&cond) 解锁在给定条件变量中等待的所有线程。

最初你会让所有的线程都在等待[note 1]:

while(!start_threads)
  pthread_cond_wait(&cond_start);

并且,当主线程就绪时:

start_threads = 1;
pthread_cond_broadcast(&cond_start);

障碍

如果您在迭代之间存在数据依赖性,您需要确保线程在任何给定时刻都在执行相同的迭代。

要在每次迭代结束时同步线程,您需要查看障碍 (pthread_barrier):

  • pthread_barrier_init(count):初始化屏障以同步 count 个线程。
  • pthread_barrier_wait():线程在这里等待,直到所有 count 个线程到达屏障。

扩展障碍的功能

有时你会希望最后一个线程到达障碍来计算一些东西(例如增加迭代次数的计数器,或者计算一些全局值,或者检查执行是否应该停止)。你有两个选择

使用pthread_barrier

你基本上需要有两个障碍:

int rc = pthread_barrier_wait(&b);
if(rc != 0 && rc != PTHREAD_BARRIER_SERIAL_THREAD)
  if(shouldStop()) stop = 1;
pthread_barrier_wait(&b);
if(stop) return;

使用 pthread_cond 实现我们自己的专用屏障

pthread_mutex_lock(&mutex)
remainingThreads--;
// all threads execute this
executedByAllThreads();
if(remainingThreads == 0) {
  // reinitialize barrier
  remainingThreads = N;
  // only last thread executes this
  if(shouldStop()) stop = 1;
  pthread_cond_broadcast(&cond);
} else {
while(remainingThreads > 0)
  pthread_cond_wait(&cond, &mutex);
}
pthread_mutex_unlock(&mutex);

注意 1: 为什么 pthread_cond_wait() 位于 while block 中?可能看起来有点奇怪。其背后的原因是由于虚假唤醒的存在。即使未发出 signal()broadcast(),该函数也可能返回。因此,为了保证正确性,通常有一个额外的变量来保证如果一个线程在它应该唤醒之前突然唤醒,它会返回到 pthread_cond_wait()

来自手册:

When using condition variables there is always a Boolean predicate involving shared variables associated with each condition wait that is true if the thread should proceed. Spurious wakeups from the pthread_cond_timedwait() or pthread_cond_wait() functions may occur. Since the return from pthread_cond_timedwait() or pthread_cond_wait() does not imply anything about the value of this predicate, the predicate should be re-evaluated upon such return.

(...)

If a signal is delivered to a thread waiting for a condition variable, upon return from the signal handler the thread resumes waiting for the condition variable as if it was not interrupted, or it shall return zero due to spurious wakeup.

注2:

Michael Burr 在评论中指出,每当您修改谓词 (start_threads) 和 pthread_cond_wait()pthread_cond_wait() 调用时会释放互斥;并在返回时重新获取它。

PS:这里有点晚了;对不起,如果我的文字令人困惑:-)

关于c - 使用全局变量同步 p_threads,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/50262651/

相关文章:

c - 如果无效,如何拒绝用户输入并让他再次输入

javascript - 等待一组 worker 完成

ios - 如何从委托(delegate)在线程内构建我的单元格?

java - AsyncTask 和 Android 线程 : how to get it right ?

java - 具有 reentrantLock 条件的应用程序有时会挂起

iphone - NSXMLParser的parse方法是异步的吗

python - 如何将二维数组从 Python 传递到 C?

c - 将整个数组传递给函数

c - C中结构内部的函数指针

python - 在 tkinter 中同时运行两个脚本