c++ - 官方ZeroMQ多线程示例的修改版本崩溃

标签 c++ multithreading zeromq

我是zmq和cppzmq的新手。尝试运行官方指南中的多线程示例时:http://zguide.zeromq.org/cpp:mtserver

我的设定


macOS Mojave,Xcode 10.3
通过自制软件libzmq 4.3.2
cppzmq GitHub头


我遇到了一些问题。

问题1

在指南中运行源代码时,它将永久挂起,而不会显示任何标准输出。

这是直接从指南中复制的代码。

/*
    Multithreaded Hello World server in C
*/

#include <pthread.h>
#include <unistd.h>
#include <cassert>
#include <string>
#include <iostream>
#include <zmq.hpp>

void *worker_routine (void *arg)
{
    zmq::context_t *context = (zmq::context_t *) arg;

    zmq::socket_t socket (*context, ZMQ_REP);
    socket.connect ("inproc://workers");

    while (true) {
        //  Wait for next request from client
        zmq::message_t request;
        socket.recv (&request);
        std::cout << "Received request: [" << (char*) request.data() << "]" << std::endl;

        //  Do some 'work'
        sleep (1);

        //  Send reply back to client
        zmq::message_t reply (6);
        memcpy ((void *) reply.data (), "World", 6);
        socket.send (reply);
    }
    return (NULL);
}

int main ()
{
    //  Prepare our context and sockets
    zmq::context_t context (1);
    zmq::socket_t clients (context, ZMQ_ROUTER);
    clients.bind ("tcp://*:5555");
    zmq::socket_t workers (context, ZMQ_DEALER);
    workers.bind ("inproc://workers");

    //  Launch pool of worker threads
    for (int thread_nbr = 0; thread_nbr != 5; thread_nbr++) {
        pthread_t worker;
        pthread_create (&worker, NULL, worker_routine, (void *) &context);
    }
    //  Connect work threads to client threads via a queue
    zmq::proxy (static_cast<void*>(clients),
                static_cast<void*>(workers),
                nullptr);
    return 0;
}


我在worker的while循环中放置一个断点后,它很快崩溃了。

问题2

注意到编译器提示我替换不推荐使用的API调用,所以我修改了上面的示例代码以使警告消失。

/*
 Multithreaded Hello World server in C
 */

#include <pthread.h>
#include <unistd.h>
#include <cassert>
#include <string>
#include <iostream>
#include <cstdio>
#include <zmq.hpp>

void *worker_routine (void *arg)
{
    zmq::context_t *context = (zmq::context_t *) arg;

    zmq::socket_t socket (*context, ZMQ_REP);
    socket.connect ("inproc://workers");

    while (true) {
        //  Wait for next request from client
        std::array<char, 1024> buf{'\0'};
        zmq::mutable_buffer request(buf.data(), buf.size());
        socket.recv(request, zmq::recv_flags::dontwait);
        std::cout << "Received request: [" << (char*) request.data() << "]" << std::endl;

        //  Do some 'work'
        sleep (1);

        //  Send reply back to client
        zmq::message_t reply (6);
        memcpy ((void *) reply.data (), "World", 6);
        try {
            socket.send (reply, zmq::send_flags::dontwait);
        }
        catch (zmq::error_t& e) {
            printf("ERROR: %X\n", e.num());
        }
    }
    return (NULL);
}

int main ()
{
    //  Prepare our context and sockets
    zmq::context_t context (1);
    zmq::socket_t clients (context, ZMQ_ROUTER);
    clients.bind ("tcp://*:5555");  // who i talk to.
    zmq::socket_t workers (context, ZMQ_DEALER);
    workers.bind ("inproc://workers");

    //  Launch pool of worker threads
    for (int thread_nbr = 0; thread_nbr != 5; thread_nbr++) {
        pthread_t worker;
        pthread_create (&worker, NULL, worker_routine, (void *) &context);
    }
    //  Connect work threads to client threads via a queue
    zmq::proxy (clients, workers);
    return 0;
}



我不假装对原始的损坏示例进行字面翻译,但是我是在努力使事情进行编译和运行而没有明显的内存错误。

这段代码不断为我提供来自try-catch块的错误编号9523DFB(十六进制的156384763)。我在官方文档中找不到错误号的定义,但是从this question那里得到了错误号的定义,它是本机ZeroMQ错误EFSM:

The zmq_send() operation cannot be performed on this socket at the moment due to the socket not being in the appropriate state. This error may occur with socket types that switch between several states, such as ZMQ_REP.


如果有人能指出我做错了什么,我将不胜感激。

更新

我尝试根据@ user3666197的建议进行轮询。但是程序仍然挂起。插入任何断点都会使程序崩溃,从而难以调试。

这是新的工作人员代码

void *worker_routine (void *arg)
{
    zmq::context_t *context = (zmq::context_t *) arg;

    zmq::socket_t socket (*context, ZMQ_REP);
    socket.connect ("inproc://workers");

    zmq::pollitem_t items[1] = { { socket, 0, ZMQ_POLLIN, 0 } };

    while (true) {
        if(zmq::poll(items, 1, -1) < 1) {
            printf("Terminating worker\n");
            break;
        }

        //  Wait for next request from client
        std::array<char, 1024> buf{'\0'};
        socket.recv(zmq::buffer(buf), zmq::recv_flags::none);
        std::cout << "Received request: [" << (char*) buf.data() << "]" << std::endl;

        //  Do some 'work'
        sleep (1);

        //  Send reply back to client
        zmq::message_t reply (6);
        memcpy ((void *) reply.data (), "World", 6);
        try {
            socket.send (reply, zmq::send_flags::dontwait);
        }
        catch (zmq::error_t& e) {
            printf("ERROR: %s\n", e.what());
        }
    }
    return (NULL);
}

最佳答案

欢迎来到零禅的领域

怀疑1:由于进入分布式有限状态自动机的错误定向状态,代码直接跳入了无法解决的活动锁:

自从我提倡不阻塞。recv() -s以来,上面的代码只是通过使用以下步骤来自杀:

socket.recv( request, zmq::recv_flags::dontwait ); // socket being == ZMQ_REP

杀死了其他任何未来生活的机会,但非常错误的The zmq_send() operation cannot be performed on this socket at the moment due to the socket not being in the appropriate state.

仅当先前的.send()版本已传递真实消息时,才可能进入.recv()状态。



最佳下一步:

查看代码,在进入.recv()之前可以使用.send()的阻止形式,或者最好使用{尝试进入.poll( { 0 | timeout }, ZMQ_POLLIN )并继续做其他事情之前,先尝试使用.recv()的非阻塞}形式,如果还没有收到任何东西(为避免自杀式自杀使dFSA陷入可解决的冲突中, stdout/stderr,第二个间隔为printf(" ERROR: %X\n", e.num() );



错误处理:

更好地使用const char *zmq_strerror ( int errnum );int zmq_errno (void);



问题1:

与问题2根本原因中的自杀::dontwait标志相反,问题2根本原因是,此处第一个.recv()的阻塞形式将所有工作线程移动到不确定的长且可能无限长的状态,等待状态,因为.recv()块继续进行任何操作,直到到达真实消息为止(从MCVE看来,它永远不会出现),因此您的线程池仍在整个池范围内阻塞的等待状态,直到任何消息到达,什么都不会发生。



更新REQ/REP的工作方式:

REQ/REP可扩展通信模式原型的工作方式就像一对分散的人一样-一个,让我们称她的玛丽,问(Mary .send() -s REQ),而另一个,比如说REP的鲍勃在听可能无限期地长时间阻塞.recv()(或采取适当的措施,使用.poll()有序地并定期检查,如果Mary问过一些问题,然后继续做自己的业余爱好或园艺,则),而鲍勃的结局消息后,鲍勃可以去.send()给玛丽回覆(不是以前,因为他不知道玛丽在何时以及不久的将来会(或不会)问什么))并且玛丽很公平,不问她下一个< cc>向Bob提出问题,但要在Bob回复(REQ.send())并且Mary收到Bob的信息(REP.send())之后-这是公平且更加对称的,这比真实生活在一个屋檐下的真实人物所展现出的公平: o)

代码?

该代码不是可复制的MCVE。 REQ.recv()创建了五个Bobs(挂起等待来自Mary的呼叫,在main()传输级别上的某个地方),但是Mary从来没有打过电话,还是她吗?玛丽尝试这样做的迹象不明显,她(他们可能是一个N:M玛丽成群:5成群的关系)的(甚至是一个动态的)社区) s)处理来自5个鲍勃之一的REP-ly。

坚持不懈,ZeroMQ花了我一些时间抓挠自己的头,但是在我认真地学习零禅之后的几年里,这仍然是天堂花园永恒的有益尝试。没有localhost串行代码IDE将永远无法“调试”分布式系统(除非已建立分布式检查器基础结构,分布式系统监视器/跟踪器/调试器的适当体系结构是分布式消息传递/信号传输层的另一层)在调试的分布式消息传递/信号系统的顶部-因此不要期望琐碎的localhost串行代码IDE会提供它。

如果仍然有疑问,请找出潜在的麻烦制造者-将inproc://替换为inproc://,并且如果玩具不能与tcp://配合使用(可以通过有线方式跟踪消息),则不会使用tcp://内存区域技巧。

关于c++ - 官方ZeroMQ多线程示例的修改版本崩溃,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/57920141/

相关文章:

c++ - 作业/任务消耗队列,自删除项目的有效案例?

c++ - 在cpp中调整 vector 大小

multithreading - 是否可以使用线程转储找到线程的创建时间?

java - 使用线程实时更新 JTextArea

c# - 为什么 Mono 上的 NetMQ DealerSocket 不向 Debian Wheezy 上的服务器发送消息,但在 Windows 上却发送消息?

c++ - crypto_box_easy 和 crypto_box_open_easy 的奇怪行为。不用私钥解密?

c++ - 使用类成员作为 WNDPROC/DLGPROC 有或没有全局

java - 从 ByteBuffer/Netty ByteBuff 并行/多线程读取

c# - 主从通信的IPC机制

go - 基于nanomsg的内部服务器