c++ - 使用进程内套接字的 ZeroMQ PubSub 永远挂起

标签 c++ multithreading zeromq communication

我正在调整一个 tcp PubSub 示例以将 inproc 与多线程一起使用。它最终会永远挂起。

我的设置

  • macOS Mojave,Xcode 10.3
  • zmq 4.3.2

重现问题的源代码:

#include <string.h>
#include <stdio.h>
#include <unistd.h>
#include <thread>
#include "zmq.h"

void hello_pubsub_inproc() {
    void* context = zmq_ctx_new();
    void* publisher = zmq_socket(context, ZMQ_PUB);
    printf("Starting server...\n");
    int pub_conn = zmq_bind(publisher, "inproc://*:4040");

    void* subscriber = zmq_socket(context, ZMQ_SUB);
    printf("Collecting stock information from the server.\n");
    int sub_conn = zmq_connect(subscriber, "inproc://localhost:4040");
    sub_conn = zmq_setsockopt(subscriber, ZMQ_SUBSCRIBE, 0, 0);

    std::thread t_pub = std::thread([&]{
        const char* companies[2] = {"Company1", "Company2"};
        int count = 0;
        for(;;) {
            int which_company = count % 2;
            int index = (int)strlen(companies[0]);
            char update[12];
            snprintf(update, sizeof update, "%s",
                     companies[which_company]);
            zmq_msg_t message;
            zmq_msg_init_size(&message, index);
            memcpy(zmq_msg_data(&message), update, index);
            zmq_msg_send(&message, publisher, 0);
            zmq_msg_close(&message);
            count++;
        }
    });

    std::thread t_sub = std::thread([&]{
        int i;
        for(i = 0; i < 10; i++) {
            zmq_msg_t reply;
            zmq_msg_init(&reply);
            zmq_msg_recv(&reply, subscriber, 0);
            int length = (int)zmq_msg_size(&reply);
            char* value = (char*)malloc(length);
            memcpy(value, zmq_msg_data(&reply), length);
            zmq_msg_close(&reply);
            printf("%s\n", value);
            free(value);
        }
    });

    t_pub.join();

    // Give publisher time to set up.
    sleep(1);

    t_sub.join();

    zmq_close(subscriber);
    zmq_close(publisher);
    zmq_ctx_destroy(context);
}

int main (int argc, char const *argv[]) {
    hello_pubsub_inproc();
    return 0;
}

结果

Starting server...
Collecting stock information from the server.

我也试过在加入线程之前添加这个无济于事:

zmq_proxy(publisher, subscriber, NULL);

解决方法:将inproc 替换为tcp 可立即修复它。但是 inproc 不应该针对进程内用例吗?

快速研究告诉我,这不可能是 bindconnect 的顺序,因为该问题已在我的 zmq 版本中修复。

下面的示例以某种方式告诉我我没有缺少共享上下文问题,因为它没有使用:

ZeroMQ Subscribers not receiving message from Publisher over an inproc: transport class

我从指南中的 Signaling Between Threads (PAIR Sockets) 部分读到

You can use PUB for the sender and SUB for the receiver. This will correctly deliver your messages exactly as you sent them and PUB does not distribute as PUSH or DEALER do. However, you need to configure the subscriber with an empty subscription, which is annoying.

空订阅是什么意思?

我哪里做错了?

最佳答案

You can use PUB for the sender and SUB for the receiver. This will correctly deliver your messages exactly as you sent them and PUB does not distribute as PUSH or DEALER do. However, you need to configure the subscriber with an empty subscription, which is annoying.


Q : What does it mean by an empty subscription?

这意味着设置(配置)订阅,驱动主题列表消息传递过滤,使用空订阅字符串。

Q : Where am I doing wrong?

这里:

// sub_conn = zmq_setsockopt(subscriber, ZMQ_SUBSCRIBE, 0, 0);   // Wrong
   sub_conn = zmq_setsockopt(subscriber, ZMQ_SUBSCRIBE, "",0);  //  Empty string

这里也有疑问,关于使用正确的语法和命名规则:

// int pub_conn = zmq_bind(publisher, "inproc://*:4040");
   int pub_conn = zmq_bind(publisher, "inproc://<aStringWithNameMax256Chars>");

作为 inproc:// transport-class 不使用任何类型的外部堆栈,而是将 AccessPoint 的 I/O(s) 映射到 1+ 个内存位置(一个不需要 transport-class 的无堆栈 I/O 线程)。

鉴于此,没有像“<address>:<port#>”这样的(此处缺失的)协议(protocol)被解释,因此类似字符串的文本被原样用于识别消息数据将去往哪个内存位置进入。

因此,“ inproc://*:4040 ”不会展开,而是“按字面意思”用作命名的 inproc://传输类 I/O 内存位置标识为 [*:4040] (接下来,询问 .connect() - .connect( "inproc://localhost:4040" ) 的方法将而且必须这样做,在词法上会错过准备好的内存位置: ["*:4040"] 因为字符串不匹配

所以这应该不会.connect() - 错误处理可能是沉默的,因为从版本 +4.x 开始就没有必要首先遵守历史要求 .bind() (为 inproc:// 创建一个“已知的”命名内存位置),然后才能调用 .connect()让它与“已经存在的”命名内存位置交叉连接,因此 v4.0+ 很可能不会在调用和创建不同的 .bind( "inproc://*:4040" ) 时引发任何错误。 landing-zone 和 next asking a non-matching .connect( "inproc://localhost:4040" ) (在已经存在的命名内存位置中没有“先前准备好的”着陆区。

关于c++ - 使用进程内套接字的 ZeroMQ PubSub 永远挂起,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/57933312/

相关文章:

java - import org.zeromq 无法解析,我该怎么办?

c++ - 为什么这个 '1234' 的代码在 C++ 中编译?

c++ - 从另一个取消 boost 线程

php - websocket报错413,如何处理?

java - Selector.select 没有按预期阻塞

C++ 每 x 秒调用一个函数

c# - 为什么 Mono 上的 NetMQ DealerSocket 不向 Debian Wheezy 上的服务器发送消息,但在 Windows 上却发送消息?

c++ - 比较两个map::iterators:为什么需要std::pair的拷贝构造函数?

c++ - 如何在 std::map 中使用结构作为键?

c++ - 这种递减的顺序是否调用了未定义的行为?