c - 在 C 中使用 PThreads 合并排序

标签 c multithreading pthreads mergesort pthread-join

旁注:我开始学习如何使用 pthreads 并且我开始理解这个概念。我一直在这里使用这个示例脚本(用 C++ 编写)来管理带线程的合并排序:https://www.geeksforgeeks.org/merge-sort-using-multi-threading/

因为我是用 C 而不是 C++ 编写自己的合并排序,所以我重写了这个示例脚本来测试并注意到了一个问题。对于 20 个元素的数组,我决定使用 15 个,而不是 MAX 20。我注意到排序/合并无效(但有点接近),我无法弄清楚为什么......此外,我更改了代码以使用不同数量的线程而不是 THREAD_MAX 4,因此我可以将其更改为 5 或 10 个线程。

会不会是 Merge 产生了无效的结果?我在下面的 main() 中对其进行了评论。

我的 C++ 到 C 的转换如下:

#include <pthread.h>
#include <time.h>
#include <stdlib.h>

// number of elements in array
#define MAX 15

// number of threads
#define THREAD_MAX 4

//using namespace std;

// array of size MAX
int a[MAX];
int part = 0;

// merge function for merging two parts
void merge(int low, int mid, int high)
{   
    int* left = (int*) malloc( (mid - low + 1) * sizeof(int));
    int* right = (int*) malloc( (high - mid) * sizeof(int));


    // n1 is size of left part and n2 is size
    // of right part
    int n1 = mid - low + 1,
    n2 = high - mid,
    i, j;

    // storing values in left part
    for (i = 0; i < n1; i++)
        left[i] = a[i + low];

    // storing values in right part
    for (i = 0; i < n2; i++)
        right[i] = a[i + mid + 1];

    int k = low;
    i = j = 0;

    // merge left and right in ascending order
    while (i < n1 && j < n2) {
        if (left[i] <= right[j])
            a[k++] = left[i++];
        else
            a[k++] = right[j++];
    }

    // insert remaining values from left
    while (i < n1) {
        a[k++] = left[i++];
    }

    // insert remaining values from right
    while (j < n2) {
        a[k++] = right[j++];
    }

    free(left);
    free(right);
}

// merge sort function
void merge_sort(int low, int high)
{
    // calculating mid point of array
    int mid = low + (high - low) / 2;
    if (low < high) {

        // calling first half
        merge_sort(low, mid);

        // calling second half
        merge_sort(mid + 1, high);

        // merging the two halves
        merge(low, mid, high);
    }
}

// thread function for multi-threading
void* merge_sort123(void* arg)
{
    // which part out of 4 parts
    int thread_part = part++;

    // calculating low and high
    int low = thread_part * (MAX / THREAD_MAX);
    int high = (thread_part + 1) * (MAX / THREAD_MAX) - 1;

    // evaluating mid point
    int mid = low + (high - low) / 2;
    if (low < high) {
        merge_sort(low, mid);
        merge_sort(mid + 1, high);
        merge(low, mid, high);
    }
    return 0;
}



// Driver Code
int main()
{
    // generating random values in array
    for (int i = 0; i < MAX; i++){
        a[i] = rand() % 100;
    //        printf("%d ", a[i]);
    }


    pthread_t threads[THREAD_MAX];

    // creating 4 threads
    for (int i = 0; i < THREAD_MAX; i++)
        pthread_create(&threads[i], NULL, merge_sort123,
                       (void*)NULL);

    // joining all 4 threads
    for (int i = 0; i < THREAD_MAX; i++)
        pthread_join(threads[i], NULL);


    ///////////////////////////////////////////////////////////////
    // --- THIS MAY BE THE PART WHERE THE MERGING IS INVALID --- //
    ///////////////////////////////////////////////////////////////
    // merging the final 4 parts
    merge(0, (MAX / 2 - 1) / 2, MAX / 2 - 1);
    merge(MAX / 2, MAX/2 + (MAX-1-MAX/2)/2, MAX - 1);
    merge(0, (MAX - 1)/2, MAX - 1);


    // displaying sorted array
    printf("\n\nSorted array: ");
    for (int i = 0; i < MAX; i++)
        printf ("%d ", a[i]);


    printf("\n");
    return 0;
}

最佳答案

正如我在热门评论中提到的,原始代码存在一些问题。

上述竞争条件。

merge 底部 [似乎] 缺少 delete

事实上数组元素的数量必须是线程数量的倍数。否则,最后一个线程的范围将计算不正确。

主线程中的最终合并对于 4 个线程是固定/硬连线的。


一个通用的解决方案是可能的。但是,除非数组大小非常大,否则它不会节省太多时间,所以它主要用于练习多线程[我相信这就是你想要的]。请参阅:Multithreaded quicksort or mergesort

使用控制结构将多个参数传递给线程会更容易。对于一般的多线程来说,这是一种很好的技术。

主线程可以用每个线程的数组范围预填充它。它稍后可以使用这些控制结构来概括最终合并。

这是一个清理过的版本,适用于任意数组大小和任意数量的线程:

#include <stdio.h>
#include <pthread.h>
#include <time.h>
#include <stdlib.h>

int opt_a;
int opt_t;
int opt_r;

// number of elements in array
//#define MAX 15
//#define MAX 16
int MAX;

// number of threads
//#define THREAD_MAX 4
int THREAD_MAX;

//using namespace std;

// array of size MAX
int *a;

// thread control parameters
struct tsk {
    int tsk_no;
    int tsk_low;
    int tsk_high;
};

// merge function for merging two parts
void
merge(int low, int mid, int high)
{

    // n1 is size of left part and n2 is size of right part
    int n1 = mid - low + 1;
    int n2 = high - mid;

    int *left = malloc(n1 * sizeof(int));
    int *right = malloc(n2 * sizeof(int));

    int i;
    int j;

    // storing values in left part
    for (i = 0; i < n1; i++)
        left[i] = a[i + low];

    // storing values in right part
    for (i = 0; i < n2; i++)
        right[i] = a[i + mid + 1];

    int k = low;

    i = j = 0;

    // merge left and right in ascending order
    while (i < n1 && j < n2) {
        if (left[i] <= right[j])
            a[k++] = left[i++];
        else
            a[k++] = right[j++];
    }

    // insert remaining values from left
    while (i < n1)
        a[k++] = left[i++];

    // insert remaining values from right
    while (j < n2)
        a[k++] = right[j++];

    free(left);
    free(right);
}

// merge sort function
void
merge_sort(int low, int high)
{

    // calculating mid point of array
    int mid = low + (high - low) / 2;

    if (low < high) {
        // calling first half
        merge_sort(low, mid);

        // calling second half
        merge_sort(mid + 1, high);

        // merging the two halves
        merge(low, mid, high);
    }
}

// thread function for multi-threading
void *
merge_sort123(void *arg)
{
    struct tsk *tsk = arg;
    int low;
    int high;

    // calculating low and high
    low = tsk->tsk_low;
    high = tsk->tsk_high;

    // evaluating mid point
    int mid = low + (high - low) / 2;

    if (low < high) {
        merge_sort(low, mid);
        merge_sort(mid + 1, high);
        merge(low, mid, high);
    }

    return 0;
}

// Driver Code
int
main(int argc, char **argv)
{
    char *cp;
    struct tsk *tsk;

    --argc;
    ++argv;

    MAX = 15;
    THREAD_MAX = 4;

    // use new/general algorithm by default
    opt_a = 1;

    for (; argc > 0; --argc, ++argv) {
        cp = *argv;
        if (*cp != '-')
            break;

        switch (cp[1]) {
        case 'M':  // array count
            MAX = atoi(cp + 2);
            break;

        case 'T':  // thread count
            THREAD_MAX = atoi(cp + 2);
            break;

        case 'a':  // change algorithm
            opt_a = !opt_a;
            break;

        case 'r':  // do _not_ use rand -- use linear increment
            opt_r = !opt_r;
            break;

        case 't':  // tracing
            opt_t = !opt_t;
            break;

        default:
            break;
        }
    }

    // allocate the array
    a = malloc(sizeof(int) * MAX);

    // generating random values in array
    if (opt_t)
        printf("ORIG:");
    for (int i = 0; i < MAX; i++) {
        if (opt_r)
            a[i] = MAX - i;
        else
            a[i] = rand() % 100;
        if (opt_t)
            printf(" %d", a[i]);
    }
    if (opt_t)
        printf("\n");

    pthread_t threads[THREAD_MAX];
    struct tsk tsklist[THREAD_MAX];

    int len = MAX / THREAD_MAX;

    if (opt_t)
        printf("THREADS:%d MAX:%d LEN:%d\n", THREAD_MAX, MAX, len);

    int low = 0;

    for (int i = 0; i < THREAD_MAX; i++, low += len) {
        tsk = &tsklist[i];
        tsk->tsk_no = i;

        if (opt_a) {
            tsk->tsk_low = low;
            tsk->tsk_high = low + len - 1;
            if (i == (THREAD_MAX - 1))
                tsk->tsk_high = MAX - 1;
        }

        else {
            tsk->tsk_low = i * (MAX / THREAD_MAX);
            tsk->tsk_high = (i + 1) * (MAX / THREAD_MAX) - 1;
        }

        if (opt_t)
            printf("RANGE %d: %d %d\n", i, tsk->tsk_low, tsk->tsk_high);
    }

    // creating 4 threads
    for (int i = 0; i < THREAD_MAX; i++) {
        tsk = &tsklist[i];
        pthread_create(&threads[i], NULL, merge_sort123, tsk);
    }

    // joining all 4 threads
    for (int i = 0; i < THREAD_MAX; i++)
        pthread_join(threads[i], NULL);

    // show the array values for each thread
    if (opt_t) {
        for (int i = 0; i < THREAD_MAX; i++) {
            tsk = &tsklist[i];
            printf("SUB %d:", tsk->tsk_no);
            for (int j = tsk->tsk_low; j <= tsk->tsk_high; ++j)
                printf(" %d", a[j]);
            printf("\n");
        }
    }

    // merging the final 4 parts
    if (opt_a) {
        struct tsk *tskm = &tsklist[0];
        for (int i = 1; i < THREAD_MAX; i++) {
            struct tsk *tsk = &tsklist[i];
            merge(tskm->tsk_low, tsk->tsk_low - 1, tsk->tsk_high);
        }
    }
    else {
        merge(0, (MAX / 2 - 1) / 2, MAX / 2 - 1);
        merge(MAX / 2, MAX / 2 + (MAX - 1 - MAX / 2) / 2, MAX - 1);
        merge(0, (MAX - 1) / 2, MAX - 1);
    }

    // displaying sorted array
    printf("\n\nSorted array:");
    for (int i = 0; i < MAX; i++)
        printf(" %d", a[i]);
    printf("\n");

    return 0;
}

关于c - 在 C 中使用 PThreads 合并排序,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/52767944/

相关文章:

c - 读取 C 中的一行并将其拆分

c - 使用 MPI_Gather 将所有进程计算出的总和收集到一个数组中

c++ - 如何在 C++ 14 的类中启动线程?

iphone - 在单独的 iOS 线程上延迟执行

c - pthread_cond_broadcast 被 dlsym 破坏了吗?

scanf 的奇怪行为

java - 外部类中的本地字段不受 Runnable 代码的影响

c - 使用互斥锁和 pthread 同步屏幕输出

c - Linux pthread 生产者和消费者

c - 在 Visual Studio 2010 中使用 SDCC?