c++ - ZeroMQ SUB 从不接收消息

标签 c++ zeromq

我在使用 ZeroMQ 中的 PUB/SUB 时遇到问题。

连接完所有内容后,发布者发布所有消息(套接字的发送消息返回 true )但是 SUB 永远不会收到它们并永远阻塞在 .recv() 函数。

这是我使用的代码:

void startPublisher()
{
    zmq::context_t zmq_context(1);
    zmq::socket_t zmq_socket(zmq_context, ZMQ_PUB);
    zmq_socket.bind("tcp://127.0.0.1:58951");

    zmq::message_t msg(3);
    memcpy(msg.data(), "abc", 3);

    for(int i = 0; i < 10; i++)
        zmq_socket.send(msg); // <-- always true
}

void startSubscriber()
{
    zmq::context_t zmq_context(1);
    zmq::socket_t zmq_socket(zmq_context, ZMQ_SUB);

    zmq_socket.connect("tcp://127.0.0.1:58951");
    zmq_socket.setsockopt(ZMQ_SUBSCRIBE, "", 0); // allow all messages

    zmq::message_t msg(3);
    zmq_socket.recv(&msg); // <-- blocks forever (message never received?)
}

请注意,我在两个不同的线程中运行这两个函数,首先启动 SUB 线程,等待一段时间,然后启动发布者线程(也尝试过其他方式,发布者在一个无限循环,但没有用)。

我在这里做错了什么?

最佳答案

根据您的示例,以下代码适用于我。 问题在于 PUB/SUB 模式是一个慢连接器,这意味着您需要在绑定(bind) PUB 套接字之后和发送任何消息之前等待一段时间。

#include <thread>
#include <zmq.hpp>
#include <iostream>
#include <unistd.h>
void startPublisher()
{
    zmq::context_t zmq_context(1);
    zmq::socket_t zmq_socket(zmq_context, ZMQ_PUB);
    zmq_socket.bind("tcp://127.0.0.1:58951");
    usleep(100000); // Sending message too fast after connexion will result in dropped message
    zmq::message_t msg(3);
    for(int i = 0; i < 10; i++) {
        memcpy(msg.data(), "abc", 3);
        zmq_socket.send(msg); // <-- always true
        msg.rebuild(3);
        usleep(1); // Temporisation between message; not necessary
    }
}
volatile bool run = false;
void startSubscriber()
{
    zmq::context_t zmq_context(1);
    zmq::socket_t zmq_socket(zmq_context, ZMQ_SUB);
    zmq_socket.connect("tcp://127.0.0.1:58951");
    std::string TOPIC = "";
    zmq_socket.setsockopt(ZMQ_SUBSCRIBE, TOPIC.c_str(), TOPIC.length()); // allow all messages
    zmq_socket.setsockopt(ZMQ_RCVTIMEO, 1000); // Timeout to get out of the while loop
    while(run) {
        zmq::message_t msg;
        int rc = zmq_socket.recv(&msg);  // Works fine
        if(rc) // Do no print trace when recv return from timeout
            std::cout << std::string(static_cast<char*>(msg.data()), msg.size()) << std::endl;
    }
}
int main() {
    run = true;
    std::thread t_sub(startSubscriber);
    sleep(1); // Slow joiner in ZMQ PUB/SUB pattern
    std::thread t_pub(startPublisher);
    t_pub.join();
    sleep(1);
    run = false;
    t_sub.join();
}

关于c++ - ZeroMQ SUB 从不接收消息,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/45842514/

相关文章:

c++ - friend struct继承类时无法访问

c++ - 如何在 Linux GPIO 中使用 boost::asio

python - 无法安装 PyZMP for Python -- 依赖项

c++ - cv::Mat 中的错误地址

c++ - Boost:反序列化通过 ZeroMQ pull socket 传递的自定义 C++ 对象

python - 在 zmq 订阅者中自动重新连接

java - IntelliJ 想法 : Show process id

c++ - [C++][std::sort] 它如何在 2D 容器上工作?

c++ - FLTK:覆盖 FL_box 构造函数。 C++

python - pyzmq 发布者可以从类实例进行操作吗?