c++ - 并行插入排序、幼稚尝试、使用 pthread 尝试失败、线程未并行排序,但与串行一起工作正常

标签 c++ multithreading sorting parallel-processing pthreads

语境
你好!
我试图尝试使用 pthreads,并决定实现插入排序以查看性能差异,但我遇到了一个奇怪的错误,我不确定该去哪里或我真正遇到了什么问题。
脚步
对于上下文,我为并行插入排序所做的如下,

  • 制作一个函数来单独包含它
  • 向该函数发送一个结构容器,i) 指向数组本身的指针,ii) 数组
    尺寸,
  • 获得内核数(让我们通过我机器上的函数将其称为 numberOfCores(是的,输出是正确的,我有 4 个内核)
  • 已创建 numberOfCores pthread_t 对象
  • 请记住,最后一个线程不必处理包含相同数量元素的数组(尝试相应地将余额分配给线程)
  • 所以我创建了一个二维矩阵,行 == numberOfCores - 1 , 和 cols == floor(sizeOfArray / numberOfCores) ,我用不同的测试输入一遍又一遍地检查,这些都是正确分配的,
  • 做了一个 lastArray数组,大小,lastArraySize = (sizeOfCompleteArray / numberOfCores) + (sizeOfCompleteArray % numberOfCores)
  • 然后我将原始数组拆分为 subArrays ,二维矩阵
  • 然后我将数组的最后一部分拆分为 lastArray ,
  • 然后我打包了各自的数组和它们的大小,并分配了线程来运行简单的 insertionSort函数,发送各自的 packed数据。我的计划是merge按每个单独的线程排序后的不同数组。我知道编写更高效的代码库是 100% 可能的,但这只是小事,我不想在这上面花太多钱。请参阅下面的代码逐行检查。
  • 然后我用了 pthread_join获取 numberOfCores排序后的数组
  • 然后我检查每个是否已排序。

  • 问题
    这是出现的问题,
  • 如果我使用循环顺序对上述数组进行排序,从第 9 步(上面提到)开始,所有数组都会按预期排序。这里没有问题
  • 然而,如果我尝试我的并行版本,我会遇到意想不到的结果。有时 2 被排序,有时只有 1,有时 3,但从来没有 4。为什么它总是给我不可靠的结果,我现在无法理解。
  • Array[0] 总是未排序的。我不知道为什么。当我使用上面描述的串行方法时,它确实得到了排序,但从来没有使用并行方法。

  • 代码
    
    // def for InsertionSortPacking, mentioned below 
    struct InsertionSortPacking
    {
        ull *Array;       // Pointer to the array
        size_t ArraySize; // Size of the array
    };
    
    static inline void *
    pthread_insertionSort(void *arrayMetaDataToUnpack) // arrayMetaDataToUnpack contains the pointer used to pass the data to the func
    {
        // UNPACKING START
        ull *_Array = ((InsertionSortPacking *)arrayMetaDataToUnpack)->Array;
        size_t sizeOfArray = ((InsertionSortPacking *)arrayMetaDataToUnpack)->ArraySize;
        // UNPACKING END
    
        // Compulsory use of reinterpret_cast to keep everything consistent
        ull *__tempArray = reinterpret_cast<ull *>(_Array);
    
        // Custom func to get the number of cores on machine
        int numberOfCores = getNumCores();
        std::cout << "Number of cores detected: " << numberOfCores << ".\n";
    
        // Step 1, create vars
        pthread_t threads[numberOfCores];              // all threads must run, step 4
        int rows = numberOfCores - 1;                  // ... but not every thread might be bound to have equal job ( equal sub array sizes ), step 5
        int cols = floor(sizeOfArray / numberOfCores); // the most evenly contained size possible, step 6
        ull subArrays[rows][cols]{0u};                 // the {} initializes everything for me, step 6
    
        // step 7
        int lastArraySize = (sizeOfArray / numberOfCores) + (sizeOfArray % numberOfCores);
        ull lastArray[lastArraySize];
    
        // step 8
        for (int i = 0; i < rows; ++i)
            for (int j = 0; j < cols; ++j)
                subArrays[i][j] = __tempArray[i * numberOfCores + j];
    
        // step 9
        for (int i = 0, j = cols * rows + 1;
             i < lastArraySize and j < sizeOfArray;
             ++i, ++j)
            lastArray[i] = __tempArray[j];
    
        // EXTRA, just for clarification. Individually, all work fine
        // getTimeForTemplate just prints out some logs, takes in the
        // the function pointer to the function I want to use, the array, I want to sort, the size, and a text to print
        // for (int i = 0; i < rows; ++i)
        //     getTimeForTemplate(insertionSort, subArrays[i], cols, "insertion sort - serial, subArray[" + std::to_string(i) + std::string("]"));
        // getTimeForTemplate(insertionSort, lastArray, lastArraySize, "insertion sort - serial, lastArray");
    
        // step 10
        for (int i = 0; i <= rows; ++i)
        {
            InsertionSortPacking __tempPacking{};
    
            if (i == rows) // Step 3.1, for the lastArray
            {
                __tempPacking.Array = (ull *)lastArray;
                __tempPacking.ArraySize = lastArraySize;
            }
            else // Step 3.2, for the remaining arrays
            {
                __tempPacking.Array = (ull *)subArrays[i];
                __tempPacking.ArraySize = cols;
            }
    
            int __rc = pthread_create(&threads[i], nullptr, insertionSort, (void *)&__tempPacking);
            if (__rc)
            {
                std::cout << "[ERROR] Unable to create thread, rc " << __rc << " i, " << i << std::endl;
                exit(-1);
            }
        }
    
        // step 11, joining the pthreads, regardless of array size
        for (int i = 0; i <= rows; ++i)
        {
            int __rc = pthread_join(threads[i], nullptr);
            if (__rc)
            {
                std::cout << "[ERROR] Unable to join thread, rc " << __rc << " i, " << i << std::endl;
                exit(-1);
            }
        }
    
        // Step 12, checking if all the jobs have completed the sorting
        for (int i = 0; i <= rows; ++i)
        {
            InsertionSortPacking __tempPacking{};
    
            if (i == rows) // Step 3.1, for the lastArray
            {
                __tempPacking.Array = (ull *)lastArray;
                __tempPacking.ArraySize = lastArraySize;
    
                if (isSorted(&__tempPacking) == -1) // Sorting succeeded if -1 returned
                    std::cout << "Sorting succeeded for lastArrray\n";
                else
                    std::cout << "Sorting failed for lastArray\n";
            }
            else // Step 3.2, for the remaining arrays
            {
                __tempPacking.Array = (ull *)subArrays[i];
                __tempPacking.ArraySize = cols;
    
                if (isSorted(&__tempPacking) == -1) // Sorting succeeded if -1 returned
                    std::cout << "Sorting succeeded for subArray[" << i << "]\n";
                else
                    std::cout << "Sorting failed for subArray[" << i << "]\n";
            }
        }
    
        ...
        // further code for merging and whatnot
    
        return sortedArray;
    
    
    还有我用来编译和运行的命令,
    g++ -std=gnu++17 -std=c++17 -O2 insertion.cpp -o insertion -lpthread && ./insertion > log.log
    
    以下是完整程序的一些日志,https://gist.github.com/Rubix982/d0bbdabc2d8a1fc531e9104f7172dbfe
    我有什么问题,为什么我不能解决它们?
  • 这看起来根本不像是竞争条件。每个数组在内存中都是不同且独立的。没有两个线程访问序列中任何位置的不同线程
  • ... 可能是线程在部分排序时有时会被连接 - 会发生这种情况吗?
  • 我运行的线程没有超过我的机器可以处理的数量(当然是 4 个内核)
  • 我不明白在哪里以及如何调试为什么有时 1 个线程失败,或者为什么其他时候 3 个线程失败
  • 我根本看不到这里需要互斥锁。没有竞争条件,但也许我从错误的角度考虑这个
  • 我尝试使用 sleep()确保线程在将它们连接在一起之前按时完成它们的工作,但无济于事。

  • 结论
    任何帮助,将不胜感激!请让我知道我可能在哪里出错,以及有哪些可能的方法来调试和解决这个问题。
    我知道我不应该使用选择排序/插入排序进行并行化,我应该使用合并排序和快速排序与其并行版本相反,但这只是为了好玩和学习。
    谢谢!

    最佳答案

    您如何启动线程存在一个重大问题:

        for (int i = 0; i <= rows; ++i)
        {
            InsertionSortPacking __tempPacking{};
    

    注意__tempPacking的生命周期是宿主循环的一次迭代,但在这里......

    [...]

            int __rc = pthread_create(&threads[i], nullptr, insertionSort, (void *)&__tempPacking);
    

    ... 你传递一个指向 __tempPacking 的指针到新线程。一旦该对象的生命周期结束,新线程不得尝试访问它,但即使所有 insertionSort()用它做的是将成员读回局部变量,没有什么强制它在对象的生命周期内完成。这是一般意义上的竞争条件,即使不是语言标准定义的意义上的“数据竞争”,并且当排序线程丢失时,产生的行为是未定义的。
            if (__rc)
            {
                std::cout << "[ERROR] Unable to create thread, rc " << __rc << " i, " << i << std::endl;
                exit(-1);
            }
        }
    

    你继续评论
    1. This does not seem like a race condition at all. Each array is different and independent in memory. No two threads access a different thread anywhere in the sequence

    看上面。确实存在竞争条件,只是与您正在查看的对象无关。
    1. ... It could be that threads are sometimes joined when they are partially sorted - can that happen?

    有了UB,什么都可能发生。在没有 UB 的情况下,加入一个线程会导致加入者等待线程函数终止。你还没有提交,所以我不能说你的insertionSort()函数可能容易在没有对数组进行完全排序的情况下终止,但这并不是它在多线程程序中使用的特有特征。
    1. I'm not running more threads than my machine can handle ( 4 cores for sure )

    这没什么大不了的。您可以拥有比内核更多的线程;它们只是不会同时运行。
    1. I do not understand where to and how to debug why sometimes 1 thread fails, or why 3 threads fail the other time

    是的,调试多线程程序非常具有挑战性。许多调试器可以运行多线程代码并查询不同线程的状态,但是与线程相关的错误特别可能对执行环境敏感,因此它们在调试器中运行时可能不会表现出来。
    1. I do not see the need for mutex locks here at all. No race conditions, but maybe I'm thinking about this from the wrong angle

    您可以通过确保 InsertionSortPacking 来避免使用同步对象。提供给每个线程的对象比该线程的生命周期更长,同时保留每个线程都有自己独特的属性的属性。
    或者,您可以动态分配它们并让排序线程负责删除它们。
    1. I tried using sleep() to make sure the threads get their work done in due time BEFORE joining them together, but to no avail.

    有时 sleep()可以解决同步问题,但它永远不是真正的解决方案。如果存在竞争条件,则 sleep()可以歪曲每个参赛者获胜的可能性,但这并不能解决首先存在比赛的问题。
    在这种情况下,一个 sleep()如果你把它放在线程启动循环的末尾而不是之后,实际上可能会阻止问题的出现,但这会破坏你的目的,同样,它不会是一个真正的解决方案。

    关于c++ - 并行插入排序、幼稚尝试、使用 pthread 尝试失败、线程未并行排序,但与串行一起工作正常,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/63881355/

    相关文章:

    java - 阻塞队列实现中如何正确使用java的notify

    c++ - 在包含名称和标记的结构中按升序对名称进行冒泡排序

    c++ - “plain/text” encodedData 和 QDataStream 到可读 QString

    安卓 : background thread

    java - 在Java中干净地停止监听serversocket的线程

    php 对多维数组进行排序,但从 1 开始索引

    linux - 在 Linux 中排序

    c++ - Windows下的HDF5作为动态链接库Qt/C++

    java - 应该使用多个循环或一个循环并检查内部条件

    c++ - 在为标准模板函数创建仿函数对象时减少样板