我有一堆线程在处理多个数据项。线程必须按照我将数据交给线程的相同顺序输出结果。即:
Thread #1: give data - start processing
Thread #2: give data - start processing
Thread #3: give data - start processing
...
Thread #n: give data - start processing
无论哪个线程先完成处理,结果的检索顺序都应与数据传递给线程的顺序相同。即:
Thread #1: put data
Thread #2: put data
...
为了区分线程并管理它们,我给每个线程一个 ID (0,1,2,...,n)
.我正在使用 ID 将数据分配给每个线程,以便它可以处理它。
for(int i=0; i<thread_count; i++)
give_data(i); // i is id and the function knows where to get data from
我希望线程共享一个 token ,该 token 确定预期哪个线程会产生结果。所有线程主体都是相同的,主体看起来像:
while(true){
auto data = get_data();
result = process_data(data);
while(token != this_id) spin;
put_data(result); // this is a synchronized call
update_token(token);
}
我的问题来自 token
.我首先尝试了一个普通的引用 ( int & token
),它显然不能工作(我没想到它能)。不管怎样,我使用了一个静态变量,线程并不总是得到最新的。我很惊讶地看到一个线程主宰一切。每当一个线程更新 token 时,它就会失去轮到允许另一个线程放置它的结果等等。但是,我有一个线程占主导地位,就好像 token 总是设置为它自己的 ID 而不是更新。
如果非要我猜的话,我会说这是一个缓存问题。但是,我不确定。
无论如何,我正在考虑使用 std::atomic<int>
作为我的信物。行得通吗?如果没有,我还应该考虑做什么?同步这些线程的更好方法是什么?
额外:这感觉像是一个糟糕的设计,我不确定如何做得更好。任何建议将不胜感激。
最佳答案
Anyway, I used a static variable and the threads do not always get the latest one. I was surprised to see one thread dominating everything
是的,多个线程访问同一个非同步值并且至少有一个线程写入该值是数据竞争,根据 C++ 标准,这是未定义的行为。任何事情都可能发生。
I am thinking of using std::atomic as my token. Would it work?
是的。这将防止 token 上的任何数据竞争。我在您的伪代码中没有看到任何其他直接问题,因此从这个角度来看它看起来不错。
this feels like a bad design and I am not sure how to do it better. Any suggestions would be very much appreciated.
整个设计看起来确实有些奇怪,但如果有更简单的方式来表达它,这取决于您的线程库。例如,对于 OpenMP,您可以一次性执行此操作(give_data
和 get_data
背后的逻辑太不清楚,无法完整说明):
#pragma omp parallel
{
int threadCount = omp_get_num_threads();
#pragma omp single
for (int i = 0; i < threadCount; ++i)
give_data(i);
#pragma omp ordered for ordered schedule(static)
for (int i = 0; i < threadCount; ++i)
{
auto data = get_data();
result = process_data(data);
#pragma omp ordered
put_data(result); // this is a synchronized call
}
}
ordered
指令强制 put_data
调用以完全相同的顺序(一个接一个)执行,就像循环是串行的一样,而线程可以仍然并行进行先前的数据处理。
如果您真正想要做的只是让一个大循环的数据处理与有序写入并行,那么使用 OpenMP 事情实际上可能会更容易:
#pragma omp parallel for ordered schedule(static)
for (int i = 0; i < dataItemCount; ++i)
{
auto data = get_data(i); // whatever this would entail
auto result = process_data(data);
#pragma omp ordered
put_data(result); // this is a synchronized call
}
看起来您并不需要按顺序分布数据项,但如果您真的这样做了,那么这种方法就不会那么容易工作,因为每个有序循环只能有一个有序部分。
关于c++ - std::atomic 与用于线程同步的静态变量,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/50006065/