c++ - 使用MPI取数据临时使用然后返回结果

标签 c++ mpi

我有一个起始位置数组,用于为程序中的每个婴儿节点读取一个文件。我正在尝试让头节点发送位置以开始将文件读取到每个节点,然后让它们发回结果。
困难在于文件的行数与节点数并不是一一对应的(行数无法正好平均分给各节点),因此各节点必须被反复使用。为此,我尝试用 for 循环来进行发送和接收:头节点发送的消息数等于文件的行数,而每个婴儿节点接收的消息数等于文件行数除以婴儿节点的数量。

简而言之,这段代码对我不起作用,我真的不知道该怎么办。

// Excerpt from main(): rank 0 hands out one start value per entry and the
// other ranks are meant to receive them. The excerpt starts and ends in the
// middle of the enclosing function, so its braces are not balanced here.
if(qNum == 1){  // query number 1 was selected
    if(firstSource == 1){ // the selected source is number 1
        if(my_rank == 0){ // this process is the head node (rank 0)
            int startVal = 0; // start value sent to a baby node
            int z = 1; // destination rank of the next send
            for(int i = 1; i <= enronInfo[0]; i++){  // one send per entry counted in enronInfo[0]
                if(z == world_size){ // wrap back to the first baby node once z passes the last rank
                    z = 1;
                }
                startVal = getFseekVal(i, firstSource); // look up the start value for entry i
                MPI_Send(&startVal, 1, MPI_INT, z, i, MPI_COMM_WORLD); // send startVal to rank z, tagged with i
                // NOTE(review): z is never incremented in this loop, so every
                // message is addressed to rank 1.
            }
            MPI_Barrier(MPI_COMM_WORLD); // author's note: not sure whether this helps
            // NOTE(review): this check sits inside the `my_rank == 0` branch
            // opened above, so its body can never execute.
            if(my_rank != 0){ // intended for the baby nodes
                int startVal; // start value received from the head node
                for(int i = 0; i<=babyLoopSize; i++){ // runs babyLoopSize + 1 times
                    MPI_Recv(&startVal, 1, MPI_INT, 0, MPI_ANY_TAG, MPI_COMM_WORLD, MPI_STATUS_IGNORE); // receive one startVal from rank 0, any tag
                }
            }
            MPI_Barrier(MPI_COMM_WORLD); // author's note: not sure whether this helps
        }
    }







#include <fstream>
#include <iostream>
#include <string>
#include <mpi.h>
#include <stdio.h>
#include <stdlib.h>

using namespace std;
// Per-dataset tables of docstart values. They are filled by getEnronDS,
// getNipsDS, getKosDS and getNytDS and read by getFseekVal / getCurrentDS.
int enronDSarr[39859], nipsDSarr[1499], kosDSarr[3429], nytDSarr[299999]; // one table per data set
// Paths of the vocab.* files (one per data set).
string enronV = "/home/mcconnel/BagOfWords/vocab.enron.txt";
string nipsV = "/home/mcconnel/BagOfWords/vocab.nips.txt";
string kosV = "/home/mcconnel/BagOfWords/vocab.kos.txt";
string nytV = "/home/mcconnel/BagOfWords/vocab.nytimes.txt";
// Paths of the docword.* files; importFiles() passes them to getInfoDW().
string enronDW = "/home/mcconnel/BagOfWords/docword.enron.txt";
string nipsDW = "/home/mcconnel/BagOfWords/docword.nips.txt";
string kosDW = "/home/mcconnel/BagOfWords/docword.kos.txt";             // all paths are absolute and hard-coded
string nytDW = "/home/mcconnel/BagOfWords/docword.nytimes.txt";
// Paths of the docstart.* files; importFiles() passes them to the get*DS() loaders.
string enronDS = "/home/mcconnel/BagOfWords/docstart.enron.txt";
string nipsDS = "/home/mcconnel/BagOfWords/docstart.nips.txt";
string kosDS = "/home/mcconnel/BagOfWords/docstart.kos.txt";
string nytDS = "/home/mcconnel/BagOfWords/docstart.nytimes.txt";
int enronInfo[3], nipsInfo[3], kosInfo[3], nytInfo[3];                  // First three integers of each docword file, filled by importFiles()
// Answers collected by getUserResponse(): selected sources, threshold and
// query number. wordLength is not referenced by the code visible in this file.
int firstSource, secondSource, numTimes, qNum, wordLength;
// Word typed by the user for queries 3 and 4.
string enteredWord;
// Fixed-size buffer that main() broadcasts with MPI_Bcast.
// NOTE(review): no visible code copies enteredWord into this buffer.
char enteredWordChar[50];



/**
 * Looks up a word in a whitespace-separated text file.
 *
 * @param fileLocation  path of the file to scan
 * @param input         word to search for (exact, case-sensitive comparison)
 * @return 1-based position of the first token equal to `input`, or -1 when
 *         the file cannot be opened or the word does not occur in it
 */
int word2int(std::string fileLocation, std::string input){
        std::ifstream file(fileLocation.c_str());
        std::string word;
        int position = 1;
        // Test the extraction itself: checking good() before reading would
        // process one bogus token after the last real one.
        while(file >> word){
                if(word == input){
                        return position;
                }
                position++;
        }
        // Not found (or unreadable file): report it explicitly instead of
        // returning an uninitialized value.
        return -1;
}

/**
 * Returns the stored docstart value of a document.
 *
 * @param docNumber     1-based document number (entry docNumber - 1 is read)
 * @param sourceNumber  1 selects enronDSarr, 2 nipsDSarr, 3 kosDSarr; every
 *                      other value selects nytDSarr
 * @return the table entry for that document; no range check is performed
 */
int getFseekVal(int docNumber, int sourceNumber){
        const int slot = docNumber - 1;
        switch(sourceNumber){
                case 1:
                        return enronDSarr[slot];
                case 2:
                        return nipsDSarr[slot];
                case 3:
                        return kosDSarr[slot];
                default:
                        return nytDSarr[slot];
        }
}


/**
 * Returns the token found at a given position of a whitespace-separated
 * text file (the inverse lookup of word2int).
 *
 * @param fileLocation  path of the file to scan
 * @param wordInt       1-based position of the wanted token
 * @return the token at that position, or an empty string when the file
 *         cannot be opened or has fewer than `wordInt` tokens
 */
std::string int2word(std::string fileLocation, int wordInt){
        std::ifstream file(fileLocation.c_str());
        std::string word;
        int position = 1;
        // Loop on the extraction result so a failed read is never processed.
        while(file >> word){
                if(position == wordInt){
                        return word;
                }
                position++;
        }
        // `return 0;` would build a std::string from a null pointer, which is
        // undefined behavior; an empty string is the safe "not found" value.
        return std::string();
}


/**
 * Reads one integer from the beginning of a docword file.
 *
 * importFiles() calls this with pos 0, 1 and 2 to fetch the three leading
 * integers of each file; any other non-negative position works as well.
 *
 * @param fileLocation  path of the file to read
 * @param pos           0-based index of the wanted integer
 * @return the integer at that index, or -1 when the file cannot be opened,
 *         `pos` is negative, or the file holds fewer than pos + 1 integers
 */
int getInfoDW(std::string fileLocation, int pos){
        if(pos < 0){
                return -1;
        }
        std::ifstream file(fileLocation.c_str());
        int value;
        // Stop as soon as an extraction fails so a stale value is never used.
        for(int index = 0; file >> value; index++){
                if(index == pos){
                        return value;
                }
        }
        // Explicit failure value instead of an uninitialized local.
        return -1;
}

/**
 * Shared loader behind getEnronDS/getNipsDS/getKosDS/getNytDS.
 *
 * Reads up to `tokenCount` whitespace-separated tokens from the file and
 * stores every token at an odd position (1, 3, 5, ...) into `dest`, converted
 * with atoi(), in the order encountered.
 *
 * @param fileLocation  path of the docstart file
 * @param dest          table that receives the converted values
 * @param capacity      number of elements `dest` can hold
 * @param tokenCount    maximum number of tokens to read from the file
 * @return number of values stored, or -1 when the file cannot be opened
 */
static int loadDocStarts(const std::string &fileLocation, int *dest, int capacity, int tokenCount){
    FILE *input = fopen(fileLocation.c_str(), "r");
    if(input == NULL){
        fprintf(stderr, "Could not open %s\n", fileLocation.c_str());
        return -1;
    }
    char token[25];
    int stored = 0;
    for(int i = 0; i < tokenCount && stored < capacity; i++){
        // %24s keeps a long token from overflowing the 25-byte buffer; stop
        // at end of file instead of re-converting the previous token.
        if(fscanf(input, "%24s", token) != 1){
            break;
        }
        if(i % 2 == 1){
            dest[stored] = atoi(token);
            stored++;
        }
    }
    fclose(input);
    return stored;
}

/**
 * Fills enronDSarr from the Enron docstart file (first 39861 tokens).
 * @return number of values stored, or -1 when the file cannot be opened
 */
int getEnronDS(std::string fileLocation){
    return loadDocStarts(fileLocation, enronDSarr,
                         static_cast<int>(sizeof(enronDSarr) / sizeof(enronDSarr[0])), 39861);
}


/**
 * Fills nipsDSarr from the NIPS docstart file (first 1500 tokens).
 * @return number of values stored, or -1 when the file cannot be opened
 */
int getNipsDS(std::string fileLocation){
    return loadDocStarts(fileLocation, nipsDSarr,
                         static_cast<int>(sizeof(nipsDSarr) / sizeof(nipsDSarr[0])), 1500);
}

/**
 * Fills kosDSarr from the KOS docstart file (first 3430 tokens).
 * @return number of values stored, or -1 when the file cannot be opened
 */
int getKosDS(std::string fileLocation){
    return loadDocStarts(fileLocation, kosDSarr,
                         static_cast<int>(sizeof(kosDSarr) / sizeof(kosDSarr[0])), 3430);
}

/**
 * Fills nytDSarr from the NYT docstart file (first 300000 tokens).
 * @return number of values stored, or -1 when the file cannot be opened
 */
int getNytDS(std::string fileLocation){
    return loadDocStarts(fileLocation, nytDSarr,
                         static_cast<int>(sizeof(nytDSarr) / sizeof(nytDSarr[0])), 300000);
}

int getCurrentDS(int fileNumber, int documentNum){              //Will be used to return docstart byte value at document location
        if(fileNumber == 0){
                return enronDSarr[documentNum - 1];
        }
        if(fileNumber == 1){
                return enronDSarr[documentNum - 1];
        }
        if(fileNumber == 2){
                return enronDSarr[documentNum - 1];
        }
        if(fileNumber == 3){
                return enronDSarr[documentNum - 1];
        }
        else{
                printf("Something definitely went wrong");
        }
}

/**
 * Prints the data-set menu and reads the user's choice from standard input.
 *
 * @return the integer typed by the user; the value is not validated
 */
int getSourceNumber(){
        fputs("Select a wordbag:\n 1. Enron \n 2. NIPS \n 3. KOS\n 4. NYT\n", stdout);
        int choice;
        std::cin >> choice;
        return choice;
}
/**
 * Interactive prompt: asks which query to run and then collects the inputs
 * that query needs.
 *
 * Writes the answers into the file-level variables qNum, firstSource,
 * secondSource, numTimes and enteredWord.
 *
 * @return the selected query number (1-4), or -1 when standard input ends
 *         before a valid number was entered (qNum is then left at 0)
 */
int getUserResponse(){
        while(true){
                printf("Choose a query(1-4) and press enter:\n");
                printf("1. What percent of documents in X use any one word more than ____ times?\n");
                printf("2. What words in X are used more than ____ times in any document?\n");
                printf("3. In which data set does the word ____ appear most frequently?\n");
                printf("4. Does ____ appear more frequently in X or Y?\n");
                if(!(std::cin >> qNum)){
                        if(std::cin.eof()){
                                // No more input: give up instead of prompting forever.
                                qNum = 0;
                                return -1;
                        }
                        // Non-numeric input leaves the stream in a failed state;
                        // reset it and drop the offending token before retrying.
                        std::cin.clear();
                        std::string discarded;
                        std::cin >> discarded;
                        printf("Invalid Response, Please Try Again \n");
                        continue;
                }
                if(qNum < 5 && qNum > 0){
                        printf("%d", qNum);
                        break;
                }
                printf("Invalid Response, Please Try Again \n");
        }

        if(qNum == 1 || qNum == 2){
                // Queries 1 and 2 need one data set and a usage threshold.
                firstSource = getSourceNumber();
                printf("and how many times?\n");
                std::cin >> numTimes;
        }
        else if(qNum == 3){
                printf("What word would you like to use?\n");
                std::cin >> enteredWord;
        }
        else{
                // Query 4 compares one word across two data sets.
                printf("What word would you like to use?\n");
                std::cin >> enteredWord;
                printf("Select your first source...\n");
                firstSource = getSourceNumber();
                printf("Select your second source...\n");
                secondSource = getSourceNumber();
        }
        // The function is declared int, so it must return a value on every path.
        return qNum;
}

void importFiles(){
        getEnronDS(enronDS);
        getNipsDS(nipsDS);
        getKosDS(kosDS);                                                        // Functions to read in arrays for each docstart fil
        getNytDS(enronDS);
        for(int a = 0; a <= 2; a++){
                 enronInfo[a] = getInfoDW(enronDW, a);
                 nipsInfo[a] = getInfoDW(nipsDW, a);
                 kosInfo[a] = getInfoDW(kosDW, a);
                 nytInfo[a] = getInfoDW(nytDW, a);
        }
}




int main(int argc, char** argv){
        int world_size, my_rank, numDocs,fseekVal, babyLoopSize, babyLoopSize2, babyLoopSize3, babyLoopSize4;
        MPI_Init(NULL,NULL);
        MPI_Comm_size(MPI_COMM_WORLD, &world_size);
        MPI_Comm_rank(MPI_COMM_WORLD, &my_rank);
        cout << " my rank is " << my_rank << "\n";
        importFiles();
        cout << " my rank is " << my_rank << " I know that enron's doc size is " << enronInfo[0] << "\n";
        if(my_rank == 0){
                getUserResponse();
        }
        MPI_Barrier(MPI_COMM_WORLD);
        babyLoopSize = enronInfo[0] / (world_size - 1);
        babyLoopSize2 = nipsInfo[0] / (world_size - 1);
        babyLoopSize3 = kosInfo[0] / (world_size - 1);
        babyLoopSize4 = nytInfo[0] / (world_size - 1);
        //cout << " my rank is " << my_rank << " I know that enron's doc size is " << enronInfo[0] << " and that my babyLoopSize for enron is " << babyLoopSize << "\n";
        //cout << " my rank is " << my_rank << " I know that qNum is  " << qNum << "\n";
        MPI_Bcast(&qNum, 1, MPI_INT, 0, MPI_COMM_WORLD);
        MPI_Bcast(&firstSource, 2, MPI_INT, 0, MPI_COMM_WORLD);
        MPI_Bcast(&secondSource,3, MPI_INT, 0, MPI_COMM_WORLD);
        MPI_Bcast(&enteredWordChar,4, MPI_CHAR, 0, MPI_COMM_WORLD);


        if(qNum == 1){
                if(firstSource == 1){
                        if(my_rank == 0){
                                int startVal = 0;
                                int z = 1;
                                for(int i = 1; i <= enronInfo[0]; i++){
                                        if(z == world_size){
                                                z = 1;
                                        }
                                        startVal = getFseekVal(i, firstSource);
                                        MPI_Send(&startVal, 1, MPI_INT, z, i, MPI_COMM_WORLD);
                                }
                        if(my_rank != 0){
                                int startVal;
                                for(int i = 0; i<=babyLoopSize; i++){
                                        MPI_Recv(&startVal, 1, MPI_INT, 0, MPI_ANY_TAG, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
                                }
                        }
                        MPI_Barrier(MPI_COMM_WORLD);
                }
        }




        MPI_Finalize();

}

最佳答案

MPI_Send 通常是一个阻塞命令,参见 here :

This routine may block until the message is received by the destination process.

这意味着每次您在主进程上运行 MPI_Send 时,相应的从进程都必须先运行 MPI_Recv,主进程才能继续执行。通过加入第一个 MPI_Barrier 行,您是在让每个从进程等到主进程发送完所有数据之后才开始接收数据;但是由于 MPI_Send 会阻塞,主进程永远不会从第一次 MPI_Send 调用中返回。

对于您的问题,我建议先将其分解成更小的部分。编写代码来回答每个子问题:

// Determine how many workers we have
int nWorkers = ...;

// Determine where in the file each worker should start.
// Storing every location in one array lets us hand them out
// with a single MPI command later.
int aStartingLocs[nWorkers] = {...};

// Distribute starting locations to each worker
int nMyStart;
MPI_Scatter(aStartingLocs, 1, MPI_INT,   //< Things to send
            &nMyStart, 1, MPI_INT,       //< Received value
            ...);

// Our starting location is stored in nMyStart
// TODO: Use starting location to compute results
double dResult = ...;

// We can use MPI_Gather to send the values back to master
double aResults[nWorkers];
MPI_Gather(&dResult, 1, MPI_DOUBLE,    //< What we are sending
           aResults, 1, MPI_DOUBLE,    //< Where to store the result
           ...);

// Now we can use the results however we choose
if (bIAmMaster)
{
   // TODO: Use the results
}

以上只是一个大纲(半伪代码/注释),但希望您能够将其用作指南。

我有一段时间没有使用 MPI 了,所以您一定要检查一下 MPI_Scatter 和 MPI_Gather 的语法。可以查看在 Google 上搜到的第一个链接。

如果您需要分散的项目数量不能被工作进程(worker)的数量整除,您可以使用 MPI_Scatterv 和 MPI_Gatherv,请参阅 man page。

如果您愿意,可以尝试实现您自己的 MPI_Scatter(v) 版本,我记得 HPC 作业就是这样做的。但是,大多数情况下,仅使用库函数可能更容易/更好。

关于c++ - 使用MPI取数据临时使用然后返回结果,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/34172821/

相关文章:

c++ - 如何在 QModelViews 中结合模态对话框编辑器和就地小部件编辑器?

c# - 如何从 C# 中的 com 对象返回数组(double[])?

c++ - SDL_HWSURFACE 和 SDL_SWSURFACE 在速度和性能方面有什么区别?

c++ - Cmake:检查系统是否包含目录

c++ - QWebView可以从Qt资源文件加载* .js并运行它们吗?

c++ - 如何使用 boost_mpi 发送字符串类型?

c++ - 从接收方的角度来看,我如何确保已收到使用 MPI_Isend 发送的所有消息?

linux - 受限设备上的 MPI 是 Linux 上进程间通信的良好选择吗?

c - 了解二维 MPI 网格中流程的维度、坐标和顺序

c++ - 让奴隶在MPI中互相等待