语境
你好!
我试图尝试使用 pthreads,并决定实现插入排序以查看性能差异,但我遇到了一个奇怪的错误,我不确定该去哪里或我真正遇到了什么问题。
脚步
对于上下文,我为并行插入排序所做的如下,
尺寸,
numberOfCores
(是的,输出是正确的,我有 4 个内核)numberOfCores
pthread_t 对象 numberOfCores - 1
, 和 cols == floor(sizeOfArray / numberOfCores)
,我用不同的测试输入一遍又一遍地检查,这些都是正确分配的,lastArray
数组,大小,lastArraySize = (sizeOfCompleteArray / numberOfCores) + (sizeOfCompleteArray % numberOfCores)
subArrays
,二维矩阵 lastArray
, insertionSort
函数,发送各自的 packed
数据。我的计划是merge
按每个单独的线程排序后的不同数组。我知道编写更高效的代码库是 100% 可能的,但这只是小事,我不想在这上面花太多钱。请参阅下面的代码逐行检查。 pthread_join
获取 numberOfCores
排序后的数组 问题
这是出现的问题,
代码
// def for InsertionSortPacking, mentioned below
struct InsertionSortPacking
{
ull *Array; // Pointer to the array
size_t ArraySize; // Size of the array
};
static inline void *
pthread_insertionSort(void *arrayMetaDataToUnpack) // arrayMetaDataToUnpack contains the pointer used to pass the data to the func
{
// UNPACKING START
ull *_Array = ((InsertionSortPacking *)arrayMetaDataToUnpack)->Array;
size_t sizeOfArray = ((InsertionSortPacking *)arrayMetaDataToUnpack)->ArraySize;
// UNPACKING END
// Compulsory use of reinterpret_cast to keep everything consistent
ull *__tempArray = reinterpret_cast<ull *>(_Array);
// Custom func to get the number of cores on machine
int numberOfCores = getNumCores();
std::cout << "Number of cores detected: " << numberOfCores << ".\n";
// Step 1, create vars
pthread_t threads[numberOfCores]; // all threads must run, step 4
int rows = numberOfCores - 1; // ... but not every thread might be bound to have equal job ( equal sub array sizes ), step 5
int cols = floor(sizeOfArray / numberOfCores); // the most evenly contained size possible, step 6
ull subArrays[rows][cols]{0u}; // the {} initializes everything for me, step 6
// step 7
int lastArraySize = (sizeOfArray / numberOfCores) + (sizeOfArray % numberOfCores);
ull lastArray[lastArraySize];
// step 8
for (int i = 0; i < rows; ++i)
for (int j = 0; j < cols; ++j)
subArrays[i][j] = __tempArray[i * numberOfCores + j];
// step 9
for (int i = 0, j = cols * rows + 1;
i < lastArraySize and j < sizeOfArray;
++i, ++j)
lastArray[i] = __tempArray[j];
// EXTRA, just for clarification. Individually, all work fine
// getTimeForTemplate just prints out some logs, takes in the
// the function pointer to the function I want to use, the array, I want to sort, the size, and a text to print
// for (int i = 0; i < rows; ++i)
// getTimeForTemplate(insertionSort, subArrays[i], cols, "insertion sort - serial, subArray[" + std::to_string(i) + std::string("]"));
// getTimeForTemplate(insertionSort, lastArray, lastArraySize, "insertion sort - serial, lastArray");
// step 10
for (int i = 0; i <= rows; ++i)
{
InsertionSortPacking __tempPacking{};
if (i == rows) // Step 3.1, for the lastArray
{
__tempPacking.Array = (ull *)lastArray;
__tempPacking.ArraySize = lastArraySize;
}
else // Step 3.2, for the remaining arrays
{
__tempPacking.Array = (ull *)subArrays[i];
__tempPacking.ArraySize = cols;
}
int __rc = pthread_create(&threads[i], nullptr, insertionSort, (void *)&__tempPacking);
if (__rc)
{
std::cout << "[ERROR] Unable to create thread, rc " << __rc << " i, " << i << std::endl;
exit(-1);
}
}
// step 11, joining the pthreads, regardless of array size
for (int i = 0; i <= rows; ++i)
{
int __rc = pthread_join(threads[i], nullptr);
if (__rc)
{
std::cout << "[ERROR] Unable to join thread, rc " << __rc << " i, " << i << std::endl;
exit(-1);
}
}
// Step 12, checking if all the jobs have completed the sorting
for (int i = 0; i <= rows; ++i)
{
InsertionSortPacking __tempPacking{};
if (i == rows) // Step 3.1, for the lastArray
{
__tempPacking.Array = (ull *)lastArray;
__tempPacking.ArraySize = lastArraySize;
if (isSorted(&__tempPacking) == -1) // Sorting succeeded if -1 returned
std::cout << "Sorting succeeded for lastArrray\n";
else
std::cout << "Sorting failed for lastArray\n";
}
else // Step 3.2, for the remaining arrays
{
__tempPacking.Array = (ull *)subArrays[i];
__tempPacking.ArraySize = cols;
if (isSorted(&__tempPacking) == -1) // Sorting succeeded if -1 returned
std::cout << "Sorting succeeded for subArray[" << i << "]\n";
else
std::cout << "Sorting failed for subArray[" << i << "]\n";
}
}
...
// further code for merging and whatnot
return sortedArray;
还有我用来编译和运行的命令,g++ -std=gnu++17 -std=c++17 -O2 insertion.cpp -o insertion -lpthread && ./insertion > log.log
以下是完整程序的一些日志,https://gist.github.com/Rubix982/d0bbdabc2d8a1fc531e9104f7172dbfe我有什么问题,为什么我不能解决它们?
sleep()
确保线程在将它们连接在一起之前按时完成它们的工作,但无济于事。 结论
任何帮助,将不胜感激!请让我知道我可能在哪里出错,以及有哪些可能的方法来调试和解决这个问题。
我知道我不应该使用选择排序/插入排序进行并行化,我应该使用合并排序和快速排序与其并行版本相反,但这只是为了好玩和学习。
谢谢!
最佳答案
您如何启动线程存在一个重大问题:
for (int i = 0; i <= rows; ++i) { InsertionSortPacking __tempPacking{};
注意
__tempPacking
的生命周期是宿主循环的一次迭代,但在这里......[...]
int __rc = pthread_create(&threads[i], nullptr, insertionSort, (void *)&__tempPacking);
... 你传递一个指向
__tempPacking
的指针到新线程。一旦该对象的生命周期结束,新线程不得尝试访问它,但即使所有 insertionSort()
用它做的是将成员读回局部变量,没有什么强制它在对象的生命周期内完成。这是一般意义上的竞争条件,即使不是语言标准定义的意义上的“数据竞争”,并且当排序线程丢失时,产生的行为是未定义的。if (__rc) { std::cout << "[ERROR] Unable to create thread, rc " << __rc << " i, " << i << std::endl; exit(-1); } }
你继续评论
- This does not seem like a race condition at all. Each array is different and independent in memory. No two threads access a different thread anywhere in the sequence
看上面。确实存在竞争条件,只是与您正在查看的对象无关。
- ... It could be that threads are sometimes joined when they are partially sorted - can that happen?
有了UB,什么都可能发生。在没有 UB 的情况下,加入一个线程会导致加入者等待线程函数终止。你还没有提交,所以我不能说你的
insertionSort()
函数可能容易在没有对数组进行完全排序的情况下终止,但这并不是它在多线程程序中使用的特有特征。
- I'm not running more threads than my machine can handle ( 4 cores for sure )
这没什么大不了的。您可以拥有比内核更多的线程;它们只是不会同时运行。
- I do not understand where to and how to debug why sometimes 1 thread fails, or why 3 threads fail the other time
是的,调试多线程程序非常具有挑战性。许多调试器可以运行多线程代码并查询不同线程的状态,但是与线程相关的错误特别可能对执行环境敏感,因此它们在调试器中运行时可能不会表现出来。
- I do not see the need for mutex locks here at all. No race conditions, but maybe I'm thinking about this from the wrong angle
您可以通过确保
InsertionSortPacking
来避免使用同步对象。提供给每个线程的对象比该线程的生命周期更长,同时保留每个线程都有自己独特的属性的属性。或者,您可以动态分配它们并让排序线程负责删除它们。
- I tried using sleep() to make sure the threads get their work done in due time BEFORE joining them together, but to no avail.
有时
sleep()
可以解决同步问题,但它永远不是真正的解决方案。如果存在竞争条件,则 sleep()
可以歪曲每个参赛者获胜的可能性,但这并不能解决首先存在比赛的问题。在这种情况下,一个
sleep()
如果你把它放在线程启动循环的末尾而不是之后,实际上可能会阻止问题的出现,但这会破坏你的目的,同样,它不会是一个真正的解决方案。
关于c++ - 并行插入排序、幼稚尝试、使用 pthread 尝试失败、线程未并行排序,但与串行一起工作正常,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/63881355/