c++ - ZeroMQ 推/拉

标签 c++ zeromq publish-subscribe

zmq 的某些部分未以可预测的方式运行。

我正在使用 VS2013 和 zmq 3.2.4。为了不在我的 pubsub 框架中“丢失”消息 [旁白:我认为这是一个设计缺陷。我应该能够首先启动我的订阅者,然后是发布者,我应该会收到所有消息] 我必须将发布者与订阅者同步到 la durapub/durasub等。我正在使用 zeromq 指南中的 durasub.cpp 和 durapub.cpp 示例。 如果我按原样使用示例,系统将完美运行。

如果我现在在 durasub.cpp 中的 ZMQ_PUSH 周围添加范围括号

{
    zmq::socket_t sync (context, ZMQ_PUSH);
    sync.connect(syncstr.c_str());
    s_send (sync, "sync");
}

系统停止工作。匹配的“ZMQ_PULL”信号永远不会到达 durapub.cpp 中的应用层。

我已经通过 C++ 包装器检查了 zmq_close 的返回值,一切正常。就 ZMQ 而言,它已将消息传递到端点。 希望我做了一些明显愚蠢的事情?

还有更多。添加

std::this_thread::sleep_for(std::chrono::milliseconds(1));

允许系统(即发布/订阅)重新开始工作。所以这显然是一个竞争条件,大概是在收割者线程中,因为它破坏了套接字。

更多的挖掘。我认为 LIBZMQ-179 也提到了这个问题。


EDIT#2 2014-08-13 03:00 [UTC+0000]

Publisher.cpp:

#include <zmq.hpp>
#include <zhelpers.hpp>
#include <string>
int main (int argc, char *argv[]) 
{
    zmq::context_t context(1);
    std::string bind_point("tcp://*:5555"); 
    std::string sync_bind("tcp://*:5554"); 
    zmq::socket_t sync(context, ZMQ_PULL);
    sync.bind(sync_bind.c_str());

    //  We send updates via this socket
    zmq::socket_t publisher(context, ZMQ_PUB);
    publisher.bind(bind_point.c_str());

    //  Wait for synchronization request
    std::string tmp = s_recv (sync);        
    std::cout << "Recieved: " << tmp << std::endl;          

    int numbytessent = s_send (publisher, "END");       
    std::cout << numbytessent << "bytes sent" << std::endl;
 }

订阅者.cpp

#include <zmq.hpp>
#include <zhelpers.hpp>
#include <string>

int main (int argc, char *argv[])
{   
    std::string connectstr("tcp://127.0.0.1:5555"); 
    std::string syncstr("tcp://127.0.0.1:5554");    

    zmq::context_t context(1);

    zmq::socket_t subscriber (context, ZMQ_SUB);
    subscriber.setsockopt(ZMQ_SUBSCRIBE, "", 0);
    subscriber.connect(connectstr.c_str());

#if ENABLE_PROBLEM
    {
#endif ENABLE_PROBLEM
        zmq::socket_t sync (context, ZMQ_PUSH);
        sync.connect(syncstr.c_str());
        s_send (sync, "sync");
#if ENABLE_PROBLEM
    }
#endif ENABLE_PROBLEM

    while (1) 
    {
        std::cout << "Receiving..." << std::endl;
        std::string s = s_recv (subscriber);
        std::cout << s << std::endl; 
        if (s == "END")
        {
            break;
        }
    }  
}
  1. 将每个 cpp 编译成它自己的 exe。
  2. 启动两个 exe(启动顺序无关紧要)

如果定义了ENABLE_PROBLEM:

  • 发布者:(空提示)
  • 订阅者:“正在接收...” 然后您必须终止这两个进程,因为它们已挂起...

如果未定义ENABLE_PROBLEM:

  • 发布者:“收到:同步” “已发送 3 个字节”
  • 订阅者:“正在接收...” '结束'

最佳答案

EDIT#1 2014-08-11:原始帖子已更改,没有留下可见的修订

目标是什么?

恕我直言,仅从上述三个 SLOC 中分离目标并模拟任何通过/失败测试以验证目标是非常困难的。

那么让我们一步步开始吧。

那里使用了哪些 ZMQ 原语?

待定

编辑#1 后:ZMQ_PUSH + ZMQ_PULL + ( 隐藏ZMQ_PUB + ZMQ_SUB ... 下次宁愿发布 ProblemDOMAIN-context-complete 源,最好用 self-测试用例输出相似:

...
// <code>-debug-isolation-framing ------------------------------------------------
std::cout << "---[Pre-test]: sync.connect(syncstr.c_str()) argument" << std::endl;    
std::cout <<                              syncstr.c_str()            << std::endl;
std::cout << "---[Use/exec]: "                                       << std::endl;
sync.connect(                             syncstr.c_str());
// <code>-debug-isolation-framing ------------------------------------------------
...

)

部署了什么 ZMQ-context create/terminate life-cycle-policy?

待定

post-EDIT#1:n.b.:ZMQ_LINGER 相当影响资源的 .close(),这可能在 ZMQ_Context 之前发生终止出现。 (并且可能会阻塞......这会伤害......)

关于“ZMQ_LINGER 何时真正重要?”的注释

一旦 Context 即将终止,而发送队列尚未为空并且正在处理对 zmq_close() 的尝试,此参数就会生效.

在大多数体系结构中(......在低延迟/高性能中更多,其中微秒和纳秒计数......)(共享/受限)资源设置/处置操作出现的原因有很多 在系统生命周期的最开始,。不用多说为什么,想象一下与所有设置/丢弃操作直接相关的开销,这些开销在近乎真实的常规操作流程中根本不可能发生(重复发生的次数越少......)时间系统设计。

因此,让系统进程进入最后的“整理”阶段(就在退出之前)

设置 ZMQ_LINGER == 0 只是忽略仍在<sender>> 队列中的任何内容,并允许提示 zmq_close () + zmq_term()

类似地,ZMQ_LINGER == -1 将仍在<发件人> 队列中的任何内容放入[ 具有最大值(value)],整个系统必须无限期等待,在(希望任何)<接收者> 在允许任何 zmq_close() + zmq_term() 发生之前检索并“消费”所有排队的消息...这可能会很长并且完全不受您的控制...

最后,ZMQ_LINGER > 0 作为一种折衷方案,如果某些<接收器> 等待定义的 [msec]-s 数量来并检索排队的消息。然而,在给定的 TimeDOMAIN 里程碑上,系统会继续执行 zmq_close() + zmq_term() 以优雅干净地释放所有保留资源并根据系统设计时间退出约束条件。

关于c++ - ZeroMQ 推/拉,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/25195242/

相关文章:

java - 为 jeromq 运行 mvn 包时出错 : "Bind Cannot assign requested address: connect"

javascript - strophe.js PEP 处理程序未正确附加

c++ - 不可变树的高效随机更新

c++ - LZ4:压缩图像格式的压缩

c++ - 问题解析字符串

c++ - 如何加载纹理Opengl?

c++ - zeromq pub sub 上丢失的消息

node.js - ZeroMQ 推/拉 : how to manage the queue when no pull clients?

Python:线程管理其他线程通知的事件

node.js - MQTT 到卡夫卡。如何避免重复