c++ - Boost MPI crashing on world.iprobe

Tags: c++ memory-management boost mpi

I'm working on a project where I'm using Boost MPI to build a primary/secondary (master/slave) system, in which the primary hands out work to the secondaries in a loop. About 30% of the time the program crashes with the following:

runner(72828,0x7fff903c6380) malloc: *** error for object 0x7fae75b07af8: incorrect checksum for freed object - object was probably modified after being freed.
*** set a breakpoint in malloc_error_break to debug
[Lappies:72828] *** Process received signal ***
[Lappies:72828] Signal: Segmentation fault: 11 (11)
[Lappies:72828] Signal code: Address not mapped (1)
[Lappies:72828] Failing at address: 0x8
[Lappies:72828] [ 0] 0   libsystem_platform.dylib            0x00007fff580a7f5a _sigtramp + 26
[Lappies:72828] [ 1] 0   libdyld.dylib                       0x00007fff57d99292 dyld_stub_binder + 282
[Lappies:72828] [ 2] 0   libmpi.40.dylib                     0x000000010be67339 ompi_comm_destruct + 32
[Lappies:72828] [ 3] 0   mca_pml_ob1.so                      0x000000010d2c5308 mca_pml_ob1_iprobe + 549
[Lappies:72828] [ 4] 0   libmpi.40.dylib                     0x000000010bea007f MPI_Iprobe + 284
[Lappies:72828] [ 5] 0   libboost_mpi-mt.dylib               0x000000010c005aae _ZNK5boost3mpi12communicator6iprobeEii + 62
[Lappies:72828] [ 6] [Lappies:72828] *** Process received signal ***
[Lappies:72828] Signal: Abort trap: 6 (6)
[Lappies:72828] Signal code:  (0)
[Lappies:72828] [ 0] 0   run_hi                              0x000000010b539155 _ZN9Secondary12recvMessagesEv + 69
0   libsystem_platform.dylib            0x00007fff580a7f5a _sigtramp + 26
[Lappies:72828] [ 1] [Lappies:72828] [ 7] 0   ???                                 0x000000000000ffff 0x0 + 65535
[Lappies:72828] [ 2] 0   run_hi                              0x000000010b526915 _ZN9Secondary3runEv + 53
[Lappies:72828] [ 8] 0   libsystem_c.dylib                   0x00007fff57e451ae abort + 127
[Lappies:72828] [ 3] 0   run_hi                              0x000000010b52ff43 _ZN7Primary9runWorkerEv + 35
[Lappies:72828] 0   libsystem_malloc.dylib              0x00007fff57f4ead4 szone_error + 596
[ 9] [Lappies:72828] [ 4] 0   run_hi                              0x000000010b532bb1 _ZNSt3__114__thread_proxyINS_5tupleIJNS_10unique_ptrINS_15__thread_structENS_14default_deleteIS3_EEEEPFvvEEEEEEPvSA_ + 497
[Lappies:72828] [10] 0   libsystem_malloc.dylib              0x00007fff57f44721 tiny_free_list_remove_ptr + 298
[Lappies:72828] [ 5] 0   libsystem_pthread.dylib             0x00007fff580b1661 _pthread_body + 340
[Lappies:72828] [11] 0   libsystem_pthread.dylib             0x00007fff580b150d _pthread_body + 0
[Lappies:72828] [12] 0   libsystem_pthread.dylib             0x00007fff580b0bf9 thread_start + 13
[Lappies:72828] *** End of error message ***
0   libsystem_malloc.dylib              0x00007fff57f59aca tiny_free_no_lock + 1450
[Lappies:72828] [ 6] 0   libsystem_malloc.dylib              0x00007fff57f5a256 free_tiny + 628
--------------------------------------------------------------------------
mpirun noticed that process rank 0 with PID 0 on node Lappies exited on     signal 11 (Segmentation fault: 11).
--------------------------------------------------------------------------

As you can see, it crashes on the Boost MPI iprobe. This sometimes happens in Secondary::recvMessages and sometimes in Secondary::checkQuit, with roughly equal frequency.

Is the problem in my code? Or is there a bug in Boost MPI, and if so, how can I work around it? Either way, I'm at a loss as to how to fix this.

Secondary.hpp

namespace mpi = boost::mpi;

class Secondary {
    mpi::communicator world;

    volatile bool quit;
    std::list<WorkItem*> queue;
    std::list<mpi::request> sends;
    int rank;
    void resolveSends(){
        if (!sends.empty()){
            sends.remove_if([](mpi::request req){ return req.test(); });
        }
    }
public:
    explicit Secondary(int rank) : rank(rank), quit(false) {}

    void dowork(){
        /// take everything from the queue and do the work
        if (!queue.empty()){ /// Do work
            WorkItem* pwi = queue.front();
            queue.pop_front();
            std::list<WorkItem*> l = pwi->work();
            for (auto& i : l){
                queue.push_back(i);
            }
            ReturnResult rr = ReturnResult(rank, queue.size());
            std::cout << "    Secondary " << rank << "  workleft=" << queue.size()
                    << "  isend (" << pwi->getId() << " ) " << rr.workerid << " : " << rr.remaining <<std::endl;

            sends.push_back(world.isend(0, TagType::WORK_STATUS, rr.remaining));
            checkQuit(); /// make sure we respond to quits
            delete pwi;
        }
        resolveSends();
    }

    void recvMessages() {
        while (world.iprobe(0, TagType::WORK)) {
            std::vector<WorkItem*> pwi;
            world.recv(0, TagType::WORK, pwi);
            std::cout << "    Secondary " << world.rank() << "  found (" << pwi.size() << " ) " << std::endl;
            for (auto& i : pwi){
                queue.push_back(i);
            }
        }
    }

    bool checkQuit() {
        if (world.iprobe(0, TagType::QUIT)) {
            std::cout << "    Secondary " << world.rank() << "  found QUIT  " << std::endl;
            world.recv(0, TagType::QUIT);
            quit = true;
            return true;
        }
        return false;
    }

    void run() {
        /// for some reason going through this loop will often cause iprobe crashes
        while (!quit) {
            recvMessages(); /// check for more workitems

            dowork(); /// do the work

            if (checkQuit()){ /// check for quit
                resolveSends();
                break;
            }
            /// Yield is insufficient
            std::this_thread::yield();
        }
    }
};

Primary.hpp

namespace mpi = boost::mpi;

class Primary {
    mpi::communicator world;

    JobHandler jh;
    std::list<mpi::request> sends;
    void resolveSends(){
        if (!sends.empty()) {
            sends.remove_if([](mpi::request req) { return req.test(); });
        }
    }
public:
    Primary() : jh(world.size()){}

    static void runWorker(){
        Secondary worker(0);
        worker.run();
    }

    void runJobs(){
        std::vector<int> workerIds(world.size());
        std::iota(workerIds.begin(), workerIds.end(), 0);
        const int nworkers = static_cast<int>(workerIds.size());

        std::thread workerThread = std::thread(runWorker);
        std::list<WorkItem*> allitems = jh.getAllItems();
        int sendto;
        while (true) {
            /// Send all of our items
            if (!allitems.empty()){
                std::vector<WorkItem*> v{ std::begin(allitems), std::end(allitems) };
                for (int i=0, sendto=1; i< nworkers;++i, ++sendto){
                    std::vector<WorkItem *> send = vecutil::split(v, nworkers, i);
                    std::cout << " >>> " << workerIds[sendto % nworkers] << " " << send.size() << " " << allitems.size() << std::endl;
                    sends.push_back(world.isend(workerIds[sendto % nworkers], TagType::WORK, send));
                    jh.sendingWorkTo(sendto, send.size());
                }
            }
            resolveSends();

            std::this_thread::yield();
            std::for_each(allitems.begin(), allitems.end(), vecutil::DeleteVector<WorkItem*>());
            allitems.clear();

            /// Check for done items
            for (auto& i : workerIds) {
                while (world.iprobe(i, TagType::WORK_STATUS)) {
                    int r;
                    mpi::request req = world.irecv(i, TagType::WORK_STATUS, r);
                    jh.workItemComplete(ReturnResult(i,r));
                    std::cout << jh << "                 <<< Secondary " <<i << "  remaining=" << r <<
                            "  complete=" << jh.isComplete() << std::endl;
                }
            }
            /// Check for complete
            if (jh.isComplete()){
                std::cout << "====\n >>> complete " << std::endl;
                break;
            }
            std::this_thread::yield();
        }

        std::cout << "======================  sending quit " << std::endl;
        mpi::request reqs[nworkers];
        int n=0;
        for (auto& i : workerIds) {
            reqs[n++] = world.isend(i, TagType::QUIT);
        }
        mpi::wait_all(reqs, reqs + nworkers);
        std::cout << "======================  gathering " << std::endl;
        workerThread.join();
        std::cout << "======================  quitting " << std::endl;
    }



    void addJob(Job* pjob) {
        jh.addJob(pjob);
    }
};

I can provide any additional code that's needed, but I believe this is the relevant part.

Best Answer

It looks like you are using the MPI library (which Boost.MPI wraps) from multiple threads. In that case you need to initialize the MPI library accordingly, and you need to make sure the underlying MPI library actually supports threading, and at which level. Above single there are three levels, called funneled, serialized, and multiple. You can then use, for example:

mpi::environment env(argc, argv, mpi::threading::multiple);

However, I could not find anything in the documentation about what happens in this case if the MPI library does not support MPI_THREAD_MULTIPLE.
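
One way to deal with that uncertainty is to query the threading level that was actually granted at startup and bail out if it is too low. A minimal sketch, assuming a Boost version that provides boost::mpi::environment::thread_level(); the main/argc/argv scaffolding is just the usual entry point, not code from the question:

#include <boost/mpi.hpp>
#include <cstdlib>
#include <iostream>

namespace mpi = boost::mpi;

int main(int argc, char* argv[]) {
    /// Request full multithreaded support from the MPI library.
    mpi::environment env(argc, argv, mpi::threading::multiple);

    /// The library may grant less than was requested; check what we got
    /// before letting several threads issue MPI calls concurrently.
    if (mpi::environment::thread_level() < mpi::threading::multiple) {
        std::cerr << "MPI_THREAD_MULTIPLE is not supported by this MPI library,"
                     " refusing to run\n";
        return EXIT_FAILURE;
    }

    mpi::communicator world;
    /// ... start the worker thread and run the primary loop as in the question ...
    return EXIT_SUCCESS;
}

If the library only grants serialized, an alternative is to funnel every MPI call through a single mutex so that no two threads ever call into MPI at the same time, but requiring multiple and failing fast is the simpler option for this design.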

Regarding "c++ - Boost MPI crashing on world.iprobe", we found a similar question on Stack Overflow: https://stackoverflow.com/questions/50104589/
