c - 使用 MPI 进行双调排序

标签 c sorting parallel-processing mpi

我正在研究一个关于使用 MPI 和 C 实现并行双调排序的项目。我开发的程序可以运行,但效率不高,因为简单的 QuickSort(叹息)在执行时间方面胜过它。也许问题出在通信成本上,但我不知道如何改进它,所以这里是代码:

#include <stdio.h>
#include <stdlib.h>
#include <mpi.h>
#include <math.h>
#include <time.h>
#include <sys/time.h>
#include <string.h>

#include "bs-util.h"
#include "quicksort.h"

#define TAG 1


/* Run this program knowing that:
 * 1) The number of cores must be a power of 2
 * 2) The length of the array to order must be a power of 2
 * 
 * Exec Example: mpirun -n 4 ./bs 1024 1024
 * */


void exchange(FILE *log, int i, int partner, int up);

int countTransfer = 0;

int *myArray, *partnerArray;
int currentPartner = -1;
int rank, size;
MPI_Status status;
int verbose = 0; //this var toggles on(1) or off(0) some useful prints for debugging purpose
int amount=0;

int main(int argc, char *argv[])
{
    int *array;
    int i=0;
    int carry=0;
    int up=1;
    int count=0;

    struct timeval tim;

    FILE *log;

    char logName[15] = "log/";

    MPI_Init(&argc, &argv);
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);
    MPI_Comm_size(MPI_COMM_WORLD, &size);

    /* Time meter */
    srand((double) time(NULL));
    gettimeofday(&tim, NULL);
    double t1=tim.tv_sec+(tim.tv_usec/1000000.0);

    snprintf(logName+4, 10, "%d",rank);
    log = fopen(logName,"w");

    printf("Hello world from process %d of %d.\n", rank, size);
    MPI_Barrier(MPI_COMM_WORLD);

    /* INPUT */

    if (rank==0) 
    {   
        if (argc==2) /* by file */
        {
            FILE *input = fopen(argv[1],"r");
            char line[20]; 
            count = 0;
            while(fgets(line,20,input) != NULL)
            {
                count++;
            }
            fclose(input);
            array = (int *)malloc(count*sizeof(int));
            input = fopen(argv[1],"r");
            i = 0;
            while(fgets(line,20,input) != NULL)
            {
                array[i] = atoi(line);
                i++;
            }
            fclose(input);
        }
        else
            if (argc==3) /* by command line */
            {
                count = atoi(argv[1]); 
                int max = atoi(argv[2]);
                array = (int *)malloc(count*sizeof(int));
                srand(time(NULL));
                for (i=0; i<count; i++)
                {
                    array[i] = rand()%max;
                }
            }
            else
            {
                printf("\n\n ----------- ERRORE NEI PARAMETRI DI INPUT ----------- \n\n");
                return 1;
            }

        /* END OF THE INPUT */

        if (verbose){
            printf("Initial array:\n");
            for (i=0; i<count; i++)
            {
                printf("%d\t", array[i]);
            }
            printf("\n");
        }
        /* Everyone wait eachother */
        MPI_Barrier(MPI_COMM_WORLD);

        carry = count%size;
        amount = count/size + carry;
        printf("\nParametri: amount=%d carry=%d\n\n", amount, carry);
        up=1;
        int startIndex = amount;


        myArray = (int *)malloc(amount*sizeof(int));
        /* Buffer (partner) */
        partnerArray = (int *)malloc(amount*sizeof(int));

        for (i=0; i<amount; i++)
             myArray[i] = array[i];
        printf("Processo %d riceve amount=%d e up=%d\n", rank, amount, up);
        if (verbose){
            printf("Mia porzione ---> ");
            for (i=0; i<amount; i++)
            {
                printf("%d\t", myArray[i]);
            }
            printf("\n");
        }

        /* Sending the big array's chunks */
        for (i=1; i<size; i++)
        {
            up = (i+1) % 2;
            MPI_Send(&up, 1, MPI_INT, i, TAG, MPI_COMM_WORLD);
            MPI_Send(&amount, 1, MPI_INT, i, TAG, MPI_COMM_WORLD);
            MPI_Send(&carry, 1, MPI_INT, i, TAG, MPI_COMM_WORLD);

            MPI_Send(array+startIndex, amount-carry, MPI_INT, i, TAG, MPI_COMM_WORLD);

            startIndex += amount-carry;
        }

        MPI_Barrier(MPI_COMM_WORLD); 
    } 
    else
    {
        MPI_Barrier(MPI_COMM_WORLD);

        MPI_Recv(&up, 1, MPI_INT, 0, TAG, MPI_COMM_WORLD, &status);
        MPI_Recv(&amount, 1, MPI_INT, 0, TAG, MPI_COMM_WORLD, &status);
        MPI_Recv(&carry, 1, MPI_INT, 0, TAG, MPI_COMM_WORLD, &status);
        myArray = (int *)malloc(amount*sizeof(int));
        partnerArray = (int *)malloc(amount*sizeof(int)); /* Buffer (partner) */
        MPI_Recv(myArray, amount, MPI_INT, 0, TAG, MPI_COMM_WORLD, &status);


        /* Experimental padding: every chunck has the same amount of items. */
        for (i=amount-carry; i<amount; i++)
        {
            myArray[i] = 0;
        }

        printf("\n");
        printf("Processo %d riceve amount=%d e up=%d\n", rank, amount-carry, up);
        if (verbose){
            printf("Mia porzione ---> ");
            for (i=0; i<amount; i++)
            {
                printf("%d\t", myArray[i]);
            }
            printf("\n");
        }
        MPI_Barrier(MPI_COMM_WORLD);
    }

    /* CORE */

    /* Local Quicksort */
    int result = quickSort(&myArray[0], amount); //this function is written within src/quicksort.c
    if (verbose){
        if (result == 1)
            printf("Quick Sort: FAIL \n");
        else
        {
            printf("\nLa mia porzione ordinata (processo %d)\n", rank);
            for(i=0; i<amount; i++)
            {
                printf("%d ",myArray[i]);
            }
            printf ("\n");
        }
    }

    int j;

    for (up=8;up<=amount*size;up=2*up)
    {
        for (j=up>>1;j>0;j=j>>1)
        {
            for (i=0;i<amount*size;i++)
            {
                int partner=i^j;                
                if ((partner)>i)
                {   
                    exchange(log,i,partner,i&up);
                }

            }
        }
    }

    /* END OF THE CORE */

    if (rank!=0)
    {   
        MPI_Send(myArray, amount, MPI_INT, 0, TAG, MPI_COMM_WORLD);
    }
    gettimeofday(&tim, NULL);
    double t2=tim.tv_sec+(tim.tv_usec/1000000.0);
    if (rank==0)
    {
        myArray = (int *)realloc(myArray,sizeof(int)*amount*size);
        for (i=1; i<size; i++)
            MPI_Recv(myArray+i*amount, amount, MPI_INT, i, TAG, MPI_COMM_WORLD, &status);
        printf("\nTempo trascorso %6f\n", t2-t1);
        fprintf(log,"\n\n----------> Array Iniziale <----------\n");
        printArray(log,array,count);
        fprintf(log,"\n\n----------> Array Finale <----------\n");
        printArray(log,myArray+(carry*(size-1)),count);
        /*printArray(log,myArray,newAmount*size);*/

    }    
    fprintf(log,"Numero di chunk scambiati: %d\n",countTransfer);
    fclose(log);
    MPI_Finalize();
    return 0;
}

void exchange(FILE *log, int i, int partner, int up)
{
    int rank_i = i/amount;
    int rank_partner = partner/amount;

    int offset_i = i%amount;
    int offset_partner = partner%amount;
    /*if (verbose)
        fprintf(log,"\nnewAmount = %d - Rank_i = %d - Rank_partner = %d - Offset_i = %d - Offset_partner = %d \n",amount,rank_i,rank_partner,offset_i,offset_partner);
    */

    if ((rank_i != rank) && (rank_partner != rank))
        return;

    if ((rank_i == rank) && (rank_partner == rank))
    {   
        if (((up==0) && (myArray[offset_i] > myArray[offset_partner])) || ((up!=0) && (myArray[offset_i] < myArray[offset_partner])))
        {
            int temp = myArray[offset_i];
            myArray[offset_i] = myArray[offset_partner];
            myArray[offset_partner] = temp;
        }
        return;
    }

    if (rank_i == rank && rank_partner != rank)
    {
        if (currentPartner != rank_partner)
        {
            MPI_Send(myArray, amount, MPI_INT, rank_partner, TAG, MPI_COMM_WORLD);
            MPI_Recv(partnerArray, amount, MPI_INT, rank_partner, TAG, MPI_COMM_WORLD, &status);
            currentPartner = rank_partner;
            countTransfer++;
        }
        if (((up==0) && (myArray[offset_i] > partnerArray[offset_partner])) || ((up!=0) && (myArray[offset_i] < partnerArray[offset_partner])))
            myArray[offset_i] = partnerArray[offset_partner];
        return;
    }

    if (rank_i != rank && rank_partner == rank)
    {
        if (currentPartner != rank_i)
        {
            MPI_Recv(partnerArray, amount, MPI_INT, rank_i, TAG, MPI_COMM_WORLD, &status);
            MPI_Send(myArray, amount, MPI_INT, rank_i, TAG, MPI_COMM_WORLD);
            currentPartner = rank_i;
            countTransfer++;
        }
        if (((up==0) && (partnerArray[offset_i] > myArray[offset_partner])) || ((up!=0) && (partnerArray[offset_i] < myArray[offset_partner])))
            myArray[offset_partner] = partnerArray[offset_i];
        return;
    }

}

这是 Make 文件:

CC = mpicc
OPTIMIZE = 
CFLAGS = $(DEFINES) $(OPTIMIZE)
LFLAGS = -lm                
PROGS = ./bs
PROGS_SRC = src/bs-util.c src/bs.c src/quicksort.c


all:
    $(CC) $(CFLAGS) $(LFLAGS) -o $(PROGS) $(PROGS_SRC)

帮助将不胜感激:)

引用文献:http://goo.gl/nXt4p

最佳答案

请记住,与快速排序 N log N(串行版本)相比,双调排序具有类似 N/P (log N)^2 的时间复杂度。这意味着对于 log N > P(P ~ 处理器数量),串行快速排序甚至应该击败双调排序(我不是在谈论与某些因素相乘,这取决于实现,也不是通信)。双调排序适用于真正的并行计算机(它在 GPU 上非常好),而不是像您可能拥有的由几台 PC 组成的网格。

关于c - 使用 MPI 进行双调排序,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/7433723/

相关文章:

ruby - 为什么我的 Minitest 测试没有并行运行?

bash - 同时执行多个shell脚本

c++ - parallel_for 函数导致内存泄漏(有时)

c - 为什么 %d 代表整数?

c - 使用winapi为当前进程设置硬件断点

c - 在 libxml 中选择使用 xPath 验证条件的第一个元素

c - 按位否定后右移的意外结果

python - 带有列表的字典 : Fastest way to get the key corresponding to the minimum value of the fourth list item?

javascript - 如何避免使用 2 个条件对 JavaScript 数组进行两次排序

java - 对 3 列的二维数组进行排序