c++ - 跨多个程序实例的 ZeroMQ IPC

标签 c++ tcp ipc zeromq distributed-computing

我在程序的几个实例之间的 ZMQ 中的进程间通信存在一些问题

  • 我正在使用 Linux 操作系统
  • 我正在为 libzmq
  • 使用 zeromq/cppzmq,仅 header C++ 绑定(bind)

    如果我运行此应用程序的两个实例(例如在终端上),我为一个实例提供一个作为监听器的参数,然后为另一个提供一个作为发送者的参数。监听器永远不会收到消息。我试过 TCP 和 IPC 无济于事。
    #include <zmq.hpp>
    #include <string>
    #include <iostream>
    
    int ListenMessage();
    int SendMessage(std::string str);
    
    zmq::context_t global_zmq_context(1);
    
    int main(int argc, char* argv[] ) {
        std::string str = "Hello World";
        if (atoi(argv[1]) == 0) ListenMessage();
        else SendMessage(str);
    
        zmq_ctx_destroy(& global_zmq_context);
        return 0;
    }
    
    
    int SendMessage(std::string str) {
        assert(global_zmq_context);
        std::cout << "Sending \n";
        zmq::socket_t publisher(global_zmq_context, ZMQ_PUB);
        assert(publisher);
    
        int linger = 0;
        int rc = zmq_setsockopt(publisher, ZMQ_LINGER, &linger, sizeof(linger));
        assert(rc==0);
    
        rc = zmq_connect(publisher, "tcp://127.0.0.1:4506");
        if (rc == -1) {
            printf ("E: connect failed: %s\n", strerror (errno));
            return -1;
        }
    
        zmq::message_t message(static_cast<const void*> (str.data()), str.size());
        rc = publisher.send(message);
        if (rc == -1) {
            printf ("E: send failed: %s\n", strerror (errno));
            return -1;
        }
        return 0;
    }
    
    int ListenMessage() {
        assert(global_zmq_context);
        std::cout << "Listening \n";
        zmq::socket_t subscriber(global_zmq_context, ZMQ_SUB);
        assert(subscriber);
    
        int rc = zmq_setsockopt(subscriber, ZMQ_SUBSCRIBE, "", 0);
        assert(rc==0);
    
        int linger = 0;
        rc = zmq_setsockopt(subscriber, ZMQ_LINGER, &linger, sizeof(linger));
        assert(rc==0);
    
        rc = zmq_bind(subscriber, "tcp://127.0.0.1:4506");
        if (rc == -1) {
            printf ("E: bind failed: %s\n", strerror (errno));
            return -1;
        }
    
        std::vector<zmq::pollitem_t> p = {{subscriber, 0, ZMQ_POLLIN, 0}};
        while (true) {
            zmq::message_t rx_msg;
            // when timeout (the third argument here) is -1,
            // then block until ready to receive
            std::cout << "Still Listening before poll \n";
            zmq::poll(p.data(), 1, -1);
            std::cout << "Found an item \n"; // not reaching
            if (p[0].revents & ZMQ_POLLIN) {
                // received something on the first (only) socket
                subscriber.recv(&rx_msg);
                std::string rx_str;
                rx_str.assign(static_cast<char *>(rx_msg.data()), rx_msg.size());
                std::cout << "Received: " << rx_str << std::endl;
            }
        }
        return 0;
    }
    

    如果我使用两个线程运行程序的一个实例,则此代码将起作用
        std::thread t_sub(ListenMessage);
        sleep(1); // Slow joiner in ZMQ PUB/SUB pattern
        std::thread t_pub(SendMessage str);
        t_pub.join();
        t_sub.join();
    

    但我想知道为什么在运行程序的两个实例时上面的代码不起作用?

    谢谢你的帮助!

    最佳答案

    如果您从未使用过 ZeroMQ,不妨先看看 "ZeroMQ Principles in less than Five Seconds"在深入了解更多细节之前

    Q : wondering why when running two instances of the program the code above won't work?



    这个代码永远不会飞 - 它与 无关thread - 基于也不是 process 基于[CONCURENT]加工。

    它是由 的错误设计引起的。我 输入 电话 进程 中号沟通。

    ZeroMQ 可以为此提供任一支持的传输类:{ ipc:// | tipc:// | tcp:// | norm:// | pgm:// | epgm:// | vmci:// }再加上一个更智能的进程内通信,inproc://传输类为线程间通信做好了准备,其中无堆栈通信可能享有最低的延迟,这只是一个内存映射策略。

    选择基于 L3/L2 的网络堆栈我 输入- 电话 进程- 中号通信是可能的,但最“昂贵”的选择。

    核心错误:

    鉴于这种选择,任何单个进程(不是说一对进程)会碰撞尝试 .bind() 它的接入点 完全相同 TCP/IP- address:port#

    另一个错误:

    即使是为了启动一个单独的程序,两个生成的线程都试图 .bind() 它的私有(private)接入点,但没有人尝试 .connect()匹配的“对面”接入点。

    至少要成功 .bind() , 和
    至少一个必须成功 .connect() ,从而得到一个“ channel ”,这里的 PUB/SUB 原型(prototype)。

    去做:
  • 决定一个合适的,足够的运输级 (最好避免为 localhost/in-process IPC 操作完整的 L3/L2 堆栈而过度杀伤)
  • 重构 Address:port# 管理(用于 2+ 进程不会在 .bind() 上失败 - 相同(硬连线)address:port#
  • 始终检测并适当处理返回的 {PASS|FAIL} -s 来自 API 调用
  • 始终设置 LINGER 明确归零(你永远不知道)
  • 关于c++ - 跨多个程序实例的 ZeroMQ IPC,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/60120971/

    相关文章:

    c++ - 没有继承的友元 - 有什么选择?

    c++ - c++17 中的 STL 容器是否有三向比较

    c++ - 在 C++ 中编程快速排序时出现段错误

    tcp - 现实生活中TCP和UDP的例子有哪些?

    c++ - 使用 Boost::interprocess 在共享内存中映射 <int, void*>

    c++ - 如果尝试使用功能,则会收到警告 C6385

    ssl - 将 SSL 添加到微芯片通用 TCP 服务器应用程序

    go - 调用 tcp : lookup ip-x-x-xx. ec2.internal: 没有这样的主机

    c - 管道 "bad address"管道打开

    linux - 系统 V IPC msgrcv 与计时器 Howto