c - 在用 C 编写的 MPI 应用程序中,使用 pthread_create 来创建线程来调用函数 MPI_Finalize

标签 c multithreading mpi finalize

首先,我明确地说,我是法国人,而且我的英语不太好。

我正在开发 MPI 应用程序,遇到一些问题,希望有人可以帮助我。

正如标题所述,我尝试用一个单独的线程来监听终止请求:当需要终止应用程序时,由该线程调用 MPI_Finalize 函数。

但是,我的应用程序未正确完成。 更准确地说,我收到以下消息:

[XPS-2720:27441] * 处理接收到的信号 *

[XPS-2720:27441] 信号:段错误 (11)

[XPS-2720:27441] 信号代码:地址未映射 (1)

[XPS-2720:27441] 地址失败:0x7f14077a3b6d

[XPS-2720:27440] * 处理接收到的信号 *

[XPS-2720:27440] 信号:段错误 (11)

[XPS-2720:27440] 信号代码:地址未映射 (1)

[XPS-2720:27440] 地址失败:0x7fb11d07bb6d

<hr/>

mpirun 注意到节点 lagniez-XPS-2720 上 PID 为 27440 的进程排名 1 在信号 11(段错误)时退出。

我的从属代码是:

#include "mpi.h"
#include <stdio.h>
#include <stdlib.h>
#include <signal.h>
#include <unistd.h>
#include <sys/types.h>
#include <pthread.h>
#include <cassert>


#define send_data_tag 1664
#define send_kill_tag 666

/*
 * Thread entry point: blocks until the master sends a message tagged
 * send_kill_tag on the parent intercommunicator, then finalizes MPI and
 * terminates the whole process.
 *
 * NOTE(review): the MPI standard's thread-interaction rules require that
 * MPI_Finalize() be called by the thread that initialized MPI (here, the
 * initial thread running main).  Calling it from this spawned pthread while
 * the main thread may still be inside an MPI call is the likely cause of
 * the reported segfault; the shutdown logic should be moved to the main
 * thread.
 *
 * @param intercomm pointer to the parent MPI_Comm owned by main; it must
 *                  remain valid for the lifetime of this thread.
 * @return never returns normally (calls exit(0)).
 */
void *finilizeMPICom(void *intercomm)
{ 
  printf("the finilizeMPICom was called\n");
  
  MPI_Comm parentcomm = * ((MPI_Comm *) intercomm);
  MPI_Status status;
  int res;

  // Block here until the master broadcasts the kill message.
  // sleep(10);
  MPI_Recv(&res, 1, MPI_INT, 0, send_kill_tag, parentcomm, &status);

  int rank;
  MPI_Comm_rank(MPI_COMM_WORLD, &rank);
  printf("we receive something %d -- %d\n", rank, res);
  
  // Finalize MPI and kill the whole process, including the main thread.
  MPI_Finalize();
  exit(0);
}// finilizeMPICom


/*
 * Slave entry point.  Initializes MPI with full thread support, spawns a
 * listener thread (finilizeMPICom) that waits for the master's kill
 * message, then runs the request/answer protocol with the master over the
 * parent intercommunicator.
 *
 * NOTE(review): the MPI standard requires MPI_Finalize() to be called by
 * the thread that initialized MPI.  Here the listener thread calls it,
 * which is the likely cause of the reported segfault; the shutdown logic
 * should eventually be moved into this (main) thread.
 */
int main( int argc, char *argv[])
{ 
  int rank, rc;

  int provided, claimed;
  // Request MPI_THREAD_MULTIPLE: both this thread and the listener thread
  // issue MPI calls concurrently.
  rc = MPI_Init_thread(0, 0, MPI_THREAD_MULTIPLE, &provided);
  MPI_Query_thread( &claimed );
  
  // FIX: compare against the named constant instead of the magic number 3
  // (the numeric values of the thread-level constants are not mandated by
  // the MPI standard).
  if (rc != MPI_SUCCESS || provided != MPI_THREAD_MULTIPLE)
    {
      printf ("Error starting MPI program. Terminating.\n");
      MPI_Abort(MPI_COMM_WORLD, rc);
    }
  
  MPI_Comm_rank(MPI_COMM_WORLD,&rank);

  // Intercommunicator to the master that spawned us.  It must stay alive
  // for the whole program: the listener thread keeps a pointer to it.
  MPI_Comm parentcomm;
  MPI_Comm_get_parent(&parentcomm);

  /* create a second thread to listen when we have to kill the program */
  pthread_t properlyKill;
  if(pthread_create(&properlyKill, NULL, finilizeMPICom, (void *) &parentcomm))
    {     
      fprintf(stderr, "Error creating thread\n");
      return 0;
    }
  
  // We must have been spawned by a master process.
  assert(parentcomm != MPI_COMM_NULL);

  MPI_Status status;

  // Protocol step 1: receive the working mode from the master.
  int mode;
  MPI_Recv( &mode, 1, MPI_INT, 0, send_data_tag, parentcomm, &status);
  printf("c The solver works in the mode %d\n", mode);

  printf("I sent a message %d\n", rank);

  // if(rank != 1) sleep(100);
  
  // Protocol step 2: report back, then wait for the master's instruction.
  int res = 1;
  MPI_Send(&res, 1, MPI_INT, 0, send_data_tag, parentcomm);  
  printf("we want to listen for somethiing %d\n", rank);  
  
  int rescc = 1;
  MPI_Recv(&rescc, 1, MPI_INT, 0, send_data_tag, parentcomm, &status);
  printf("I received the message %d %d\n", rescc, rank);
  
  // 1000 == "print your solution"; answer with 1001 == "job finished".
  if(rescc == 1000)
    {
      printf("~~~~~~~~>>> I print the solution %d\n", rank);
      int res3 = 1001;
      MPI_Send(&res3, 1, MPI_INT, 0, send_data_tag, parentcomm);
    }
  else printf("I do not understand %d\n", rank);

  printf("I wait the thread to kill the programm %d\n", rank);
  // BUG FIX: the original passed (void**)&res where res is an int — on
  // 64-bit platforms pthread_join writes a full void* (8 bytes) through a
  // 4-byte object, corrupting the stack.  The thread's return value is
  // never used, so pass NULL.
  pthread_join(properlyKill, NULL);
  return 0;
}

对于我的主人:

/**
   Master entry point: parse the solver configuration file, initialize MPI
   and run the requested resolution method.
 */
int main(int argc, char **argv)
{  
  Parser *p = new Parser("slave.xml");

  MPI_Init(&argc, &argv);
  if(p->method == "concurrent")
    {
      ConcurrentManager cc(p->instance, p->solvers);
      cc.run();
    }
  else
    {
      cerr << "c The only available methods are: concurrent, eps (Embarrassingly Parallel Search) or tree" << endl;
      // BUG FIX: the original called exit(1) directly, leaking the Parser
      // and terminating an MPI program without MPI_Finalize (erroneous
      // after a successful MPI_Init).
      delete(p);
      MPI_Finalize();
      exit(1);
    }

  delete(p);
  MPI_Finalize();
  exit(0);
}// main


/**
   Create a concurrent manager (means init the data structures to run
   the solvers).
   
   @param[in] _instance, the benchmark path
   @param[in] _solvers, the set of solvers that will be ran
 */
ConcurrentManager::ConcurrentManager(string _instance, vector<Solver> &_solvers) :
  instance(_instance), solvers(_solvers)
{
  cout << "c\nc Concurrent manager called" << endl;
  
  nbSolvers = _solvers.size();
  np = new int[nbSolvers];
  cmds = new char*[nbSolvers];
  arrayOfArgs = new char **[nbSolvers];
  infos = new MPI_Info[nbSolvers];

  for(int i = 0 ; i<nbSolvers ; i++)
    {
      np[i] = solvers[i].npernode;

      // Deep-copy the executable path into a mutable C string (the
      // MPI_Comm_spawn_multiple API takes char* arrays).
      cmds[i] = new char[(solvers[i].executablePath).size() + 1];
      strcpy(cmds[i], (solvers[i].executablePath).c_str());      

      // Build a NULL-terminated argv-style option array for solver i.
      arrayOfArgs[i] = new char *[(solvers[i].options).size() + 1];
      for(unsigned int j = 0 ; j<(solvers[i].options).size() ; j++)
        {
          arrayOfArgs[i][j] = new char[(solvers[i].options[j]).size() + 1];
          strcpy(arrayOfArgs[i][j], (solvers[i].options[j]).c_str());          
        }
      arrayOfArgs[i][(solvers[i].options).size()] = NULL;

      MPI_Info_create(&infos[i]);

      // BUG FIX: the original declared `char hostname[hostname.size()]`
      // (a non-standard VLA) and then strcpy'd size()+1 bytes into it —
      // the terminating NUL overflowed the buffer by one byte.  Copy the
      // string including its NUL terminator into a correctly-sized buffer.
      vector<char> hostname(solvers[i].hostname.c_str(),
                            solvers[i].hostname.c_str() + solvers[i].hostname.size() + 1);
      MPI_Info_set(infos[i], "host", &hostname[0]);
    }

  // Updated later, once the solvers have actually been spawned (see run()).
  sizeComm = 0;
}// constructor


/**
   Wait that at least one process finish and return the code
   SOLUTION_FOUND.  The winning solver is asked to print its solution,
   then every solver is told to stop at once.

   @param[in] intercomm, the communicator
 */
void ConcurrentManager::waitForSolution(MPI_Comm &intercomm)
{
  // FIX: use std::vector instead of C99 VLAs — VLAs are not standard C++
  // and sizeComm is a runtime value.
  vector<MPI_Status> arrayStatus(sizeComm);
  vector<MPI_Request> request(sizeComm);
  vector<int> val(sizeComm);
  MPI_Status status;
  int flag;

  // Post one non-blocking receive per spawned solver.
  for(int i = 0 ; i<sizeComm ; i++)
    MPI_Irecv(&val[i], 1, MPI_INT, i, TAG_MSG, intercomm, &request[i]);

  bool solutionFound = false;
  while(!solutionFound)    // busy-poll the pending receives
    {
      for(int i = 0 ; i<sizeComm ; i++)
        {
          MPI_Test(&request[i], &flag, &arrayStatus[i]);
          if(flag) 
            {
              printf("--------------------->    %d received %d\n", i , val[i]);
              if(val[i] == SOLUTION_FOUND)
                {
                  int msg = PRINT_SOLUTION;
                  MPI_Send(&msg, 1, MPI_INT, i, TAG_MSG, intercomm); // ask to print the solution

                  int msgJobFinished;
                  MPI_Recv(&msgJobFinished, 1, MPI_INT, i, TAG_MSG, intercomm, &status);  // wait the answer
                  assert(msgJobFinished == JOB_FINISHED);

                  cout << "I am going to kill everybody" << endl;
                  
                  // BUG FIX: the original filled msgKill[i] (not msgKill[j])
                  // inside the j-loop, so only one slot was ever written.
                  // MPI_Send copies the buffer before returning, so a single
                  // int reused for every destination is sufficient.
                  int msgKill = STOP_AT_ONCE;
                  for(int j = 0 ; j<sizeComm ; j++)
                    MPI_Send(&msgKill, 1, MPI_INT, j, TAG_KILL, intercomm);

                  solutionFound = true;
                  break;
                } else
                {
                  printf("restart the communication for %d\n", i);
                  // Re-arm the receive for this solver and keep polling.
                  MPI_Irecv(&val[i], 1, MPI_INT, i, TAG_MSG, intercomm, &request[i]);
                }
            }
        }      
    }
}// waitForSolution


/**
   Run the solver.
 */
void ConcurrentManager::run()
{
  MPI_Comm intercomm;
  int errcodes[solvers.size()];

  MPI_Comm_spawn_multiple(nbSolvers, cmds, arrayOfArgs, np, infos, 0, MPI_COMM_WORLD, &intercomm, errcodes);
  
  MPI_Comm_remote_size(intercomm, &sizeComm);
  cout << "c Solvers are now running: " << sizeComm << endl;

  int msg = CONCU_MODE;
  for(int i = 0 ; i<sizeComm ; i++) MPI_Send(&msg, 1, MPI_INT, i, TAG_MSG, intercomm); // init the working mode
  
  waitForSolution(intercomm);
}// run

我知道我写了很多代码:(

但是,我不知道问题出在哪里。

请帮助我:)

最诚挚的问候。

最佳答案

MPI 标准中关于 MPI 与线程交互的文档(The MPI documentation for how MPI interacts with threads)要求对 MPI_Finalize() 的调用由主线程执行——即与初始化 MPI 的线程相同的线程。就您而言,这也恰好是您进程的初始线程。

为了满足 MPI 的要求,您可以重新组织应用程序,以便初始线程等待终止信号,然后关闭 MPI。它当前所做的其他工作需要移至不同的线程。

关于c - 在用 C 编写的 MPI 应用程序中,使用 pthread_create 来创建线程来调用函数 MPI_Finalize,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/35877627/

相关文章:

mpi - 当远程机器死机时,MPI 管理器无法通过 MPI_Irecv 调用检测到它

c - MPi 多重广播 BIGNUM

c - 质数元素的合并排序?

c - C应用程序中的内存分配

c - Linux 上的线程程序(Posix Thread)

java - 动态创建Consumer

c - MPI 异步发送和接收未按预期工作

java - 将值发送到 android ndk 中的 C 并获取示例(if)响应

java - 哪里可以学习最新的标准化 Java

ruby-on-rails - 在 ruby​​ 中通过并行处理有序插入数据