c++ - ZeroMQ不同速度的订户看到相同的消息

标签 c++ zeromq

我在c++中使用zmq 2.2(我知道较旧的版本)来创建具有多个连接的订阅服务器的发布服务器,这些订阅服务器以不同的速度读取消息。根据我对文档的理解以及Peter Hintjens的答案here,每个订阅者都有自己的队列,而发布者每个连接的订阅者都有一个队列。这似乎表明每个订户都独立于其他订户从发布者接收消息。

但是,在快速订阅者和慢速订阅者下面的代码段中,它们会收到相似的消息或完全相同的消息(即使我增加了在 A 点的睡眠时间并更改了 B 点的 ZMQ_HWM ,也会发生这种情况)。

有人可以阐明为什么会这样吗?

#include <zmq.hpp>
#include <unistd.h>
#include <iostream>
#include <vector>
#include <future>
using socket_t = zmq::socket_t;
using context_t = zmq::context_t;
using msg_t = zmq::message_t;
using namespace std;

vector<int> slow_consumer(int64_t hwm, int to_read)
{
    vector<int> v;
    context_t context{1};
    socket_t socket(context, ZMQ_SUB);
    socket.setsockopt(ZMQ_HWM, &hwm, sizeof(hwm));
    socket.setsockopt(ZMQ_SUBSCRIBE, "", 0);
    socket.connect("tcp://localhost:5554");
    msg_t msg;
    sleep(3);  // 3 seconds
    for (int i = 0; i < to_read; i++)
    {
        socket.recv(&msg);
        usleep(10000);  // 10 miliseconds ___________________________POINT A
        v.emplace_back(*reinterpret_cast<int*>(msg.data()));
    }
    return v;
}
vector<int> fast_consumer(int64_t hwm, int to_read)
{
    vector<int> v;
    context_t context{1};
    socket_t socket(context, ZMQ_SUB);
    socket.setsockopt(ZMQ_HWM, &hwm, sizeof(hwm));
    socket.setsockopt(ZMQ_SUBSCRIBE, "", 0);
    socket.connect("tcp://localhost:5554");
    msg_t msg;
    for (int i = 0; i < to_read; i++)
    {
        socket.recv(&msg);
        v.emplace_back(*reinterpret_cast<int*>(msg.data()));
    }
    return v;
}
void publisher(int64_t hwm)
{
    context_t context{1};
    socket_t socket(context, ZMQ_PUB);
    socket.setsockopt(ZMQ_HWM, &hwm, sizeof(hwm));
    socket.bind("tcp://*:5554");
    int count = 0;
    while (true) {
        msg_t msg(sizeof(count));
        memcpy(msg.data(), &count, sizeof(count));
        socket.send(msg);
        count++;
    }
}

int main() 
{
    int64_t hwm = 1;  // __________________________________________POINT B
    int to_read = 20;
    auto fast = async(launch::async, fast_consumer, hwm, to_read);
    auto slow = async(launch::async, slow_consumer, hwm, to_read);
    hwm = 1;  // Don't queue anything on the publisher
    thread pub(publisher, hwm);
    auto slow_v = slow.get();
    auto fast_v = fast.get();

    cout << "fast    slow" << endl;
    for (int i = 0; i < fast_v.size(); i ++)
    {
        cout << fast_v[i] << "   " << slow_v[i] << endl;
    }
    exit(0);
}

编译:g++ -o mixed mixed_speed_consumers.cpp -g -lzmq -lpthread通过
GCC 6.3

样本输出:
fast    slow
 25988   305855
 52522   454312
 79197   477807
106365   502594
132793   528551
159236   554519
184486   581419
209208   606411
234483   629298
256122   651159
281188   675031
305855   701533  // Messages on the fast subscriber starting here line up with messages on the slow subscriber
454312   727817
477807   754154
502594   778654
528551   804137
554519   830677
581419   854959
606411   878841
629298   902601

最佳答案

each subscriber has its own queue



是的,它确实 ...

这来自 PUB -side .Context() -instance的设计属性,在该属性中进行发送队列管理(稍后会对此进行更多介绍)。

您可以在[ ZeroMQ hierarchy in less than a five seconds ]部分中简短阅读有关主要概念性技巧的内容。

This would seem to indicate that each subscriber receives messages from the publisher independent of other subscribers.



是的,它确实 ...

各个“专用”队列之间没有交互。此处重要的是 ZMQ_HWM ,其作用是“阻止程序”语义的副作用。

在此设置中,简约的 ZMQ_HWM 可以保护/阻止任何新条目插入 PUB -侧“私有(private)”-发送队列(大小不超过 ZMQ_HWM == 1 的深度),直到成功进行远程操作-清空(由“远程” SUB -侧Context() -s自主异步的“内部”传输相关主动性,取决于是否可能(重新)加载该 SUB -侧“私有(private)”-接收队列(大小,再者,根据 ZMQ_HWM == 1 而言,没有更深的意思

换句话说, PUB.send() -s的有效载荷将被有效地丢弃,直到远程 *_SUB.recv() -s从其“远程”-Context() -instance的接收队列中卸载“阻塞”有效载荷(大小,为根据 ZMQ_HWM == 1 的规定,最多只能存储一个有效载荷。

以这种方式, PUB.send() -er 在( secret 阻止)测试期间接收了大约 ~ 902601 ()上的上的()发射的不仅仅是20消息

在调用SUB -method时,所有这些 == to_read 消息都只是在 902581+ -旁边被 PUB 扔掉了。

它实际上如何在内部工作? Context()内部的简化 View

给定上面的模拟示例,随着.send() -ed对等节点的出现和消失, Context() -管理的队列的增长/收缩,但是在ZeroMQ API v2.2中同时具有TX和RX端。相同的高水位线天花板。如所记录的,尝试对超出此限制的任何内容进行Context()的尝试将被丢弃。
TIME                   _____________________________
v                     [                             ]
v                     [                             ]
v                     [                             ]
v                     [                             ]
v                     PUB.setsockopt(  ZMQ_HWM, 1 );]
v                     PUB.send()-s     [        |   ]
v                        :             [        +-----------------QUEUE-length ( a storage depth ) is but one single message
v    _________________   :             [             
v   [                 ]  :             [Context()-managed pool-of-QUEUE(s)
v   [                 ]  :             [
v   [                 ]  :             [          ___________________
v   [                 ]  :             [         [                   ]
v   FAST_SUB.connect()---:------------>[?]       [                   ]
v   FAST_SUB.recv()-s    :             [?]       [                   ]
v           :            :             [?]       [                   ]
v           :            :             [?][?]<---SLOW_SUB.connect()  ]
v           :            :             [?][?]    SLOW_SUB.recv()-s   ]
v           :            .send(1)----->[1][1]            :
|       1 <-.recv()--------------------[?][1]            :
|           :                          [?][1]            :
|           :            .send(2)----->[2][1]            :
|       2 <-.recv()--------------------[?][1]            :
|           :                          [?][1]            :
|           :            .send(3)----->[3][1]            :
|       3 <-.recv()--------------------[?][?]------------.recv()-> 1
|           :                          [?][?]            :
|           :            .send(4)----->[4][4]            :
|       4 <-.recv()--------------------[?][4]            :
|           :                          [?][4]            :
|           :            .send(5)----->[5][4]            :
|       5 <-.recv()--------------------[?][4]            :
|           :                          [?][4]            :
|           :            .send(6)----->[6][4]            :
|       6 <-.recv()--------------------[?][4]            :
|           :                          [?][4]            :
|           :            .send(7)----->[7][4]            :
|       7 <-.recv()--------------------[?][4]            :
|           :                          [?][4]            :
|           :            .send(8)----->[8][4]            :
|       8 <-.recv()--------------------[?][4]            :
|           :                          [?][4]            :
|           :            .send(9)----->[9][4]            :
|       9 <-.recv()--------------------[?][?]------------.recv()-> 4
|           :                          [?][?]            :
|           :            .send(A)----->[A][A]            :
|       A <-.recv()--------------------[?][A]
|           :                          [?][A]
|           :            .send(B)----->[B][A]
|       B <-.recv()--------------------[?][A]
v           :                          [  [
v           :                          [
v           :
v

"Messages on the fast subscriber starting here line up with messages on the slow subscriber"



不,这不会发生。没有“排队”,而是持续时间的巧合,其中fast-.connect()尚未使它成为20x .send() -s,而在阻塞SUB之后,缓慢的(-ed)-.recv()最终得到了。

最初的“差距”只是SUB阶段的影响,其中较慢的 sleep( 3 ) 不会尝试接收任何内容
main(){
|  
| async(launch::async,fast|_fast____________|
| async(launch::async,slow|     .setsockopt |_slow____________|
| ...                     |     .setsockopt |     .setsockopt |
| ...                     |     .connect    |     .setsockopt |
| thread                  |      ~~~~~~?    |     .connect    |
| |_pub___________________|      ~~~~~~?    |      ~~~~~~?    |
| |    .setsockopt        |      ~~~~~~?    |      ~~~~~~?    |
| |    .bind              |      ~~~~~~?    |      ~~~~~~?    |
| |     ~~~~~~?           |      ~~~~~~?    |      ~~~~~~?    |
| |     ~~~~~~=RTO        |      ~~~~~~?    |      ~~~~~~?    |
| |    .send()-s  1,2,..99|      ~~~~~~?    |      ~~~~~~?    |
| |    .send()-s  23456,..|      ~~~~~~=RTO |      ~~~~~~=RTO |
| |    .send()-s  25988,..|  25988 --> v[ 0]|     :  slow still sleep( 3 )-s before going to .recv() it's first message
| |    .send()-s  52522,..|  52522 --> v[ 1]|     :  slow still sleep( 3 )-s before going to .recv() it's first message
| |    .send()-s  79197,..|  79197 --> v[ 2]|     :  slow still sleep( 3 )-s before going to .recv() it's first message
| |    .send()-s 106365,..| 106365 --> v[ 3]|     :  slow still sleep( 3 )-s before going to .recv() it's first message
| |    .send()-s 132793,..| 132793 --> v[ 4]|     :  slow still sleep( 3 )-s before going to .recv() it's first message
| |    .send()-s 159236,..| 159236 --> v[ 5]|     :  slow still sleep( 3 )-s before going to .recv() it's first message
| |    .send()-s 184486,..| 184486 --> v[ 6]|     :  slow still sleep( 3 )-s before going to .recv() it's first message
| |    .send()-s 209208,..| 209208 --> v[ 7]|     :  slow still sleep( 3 )-s before going to .recv() it's first message
| |    .send()-s 234483,..| 234483 --> v[ 8]|     :  slow still sleep( 3 )-s before going to .recv() it's first message
| |    .send()-s 256122,..| 256122 --> v[ 9]|     :  slow still sleep( 3 )-s before going to .recv() it's first message
| |    .send()-s 281188,..| 281188 --> v[10]|     :  slow still sleep( 3 )-s before going to .recv() it's first message
| |    .send()-s 305855,..| 305855 --> v[11]| 305855 --> v[ 0]|// Messages on the fast subscriber starting here line up with messages on the slow subscriber
| |    .send()-s 454312,..| 454312 --> v[12]| 454312 --> v[ 1]|
| |    .send()-s 477807,..| 477807 --> v[13]| 477807 --> v[ 2]|
| |    .send()-s 502594,..| 502594 --> v[14]| 502594 --> v[ 3]|
| |    .send()-s 528551,..| 528551 --> v[15]| 528551 --> v[ 4]|
| |    .send()-s 554519,..| 554519 --> v[16]| 554519 --> v[ 5]|
| |    .send()-s 581419,..| 581419 --> v[17]| 581419 --> v[ 6]|
| |    .send()-s 606411,..| 606411 --> v[18]| 606411 --> v[ 7]|
| |    .send()-s 629298,..| 629298 --> v[19]| 629298 --> v[ 8]|
| |    .send()-s 651159,..|                 | 651159 --> v[ 9]|
| |    .send()-s 675031,..|     return v    | 675031 --> v[10]|
| |    .send()-s 701533,..|_________________| 701533 --> v[11]|
| |    .send()-s 727817,..|                 | 727817 --> v[12]|
| |    .send()-s 754154,..|                 | 754154 --> v[13]|
| |    .send()-s 778654,..|                 | 778654 --> v[14]|
| |    .send()-s 804137,..|                 | 804137 --> v[15]|
| |    .send()-s 830677,..|                 | 830677 --> v[16]|
| |    .send()-s 854959,..|                 | 854959 --> v[17]|
| |    .send()-s 878841,..|                 | 878841 --> v[18]|
| |    .send()-s 902601,..|                 | 902601 --> v[19]|
| |    .send()-s 912345,..|                 |                 |
| |    .send()-s 923456,..|                 |     return v    |
| |    .send()-s 934567,..|                 |_________________|
| |    .send()-s 945678,..|
| |    .send()-s 956789,..|
| |    .send()-s 967890,..|
| |    .send()-s 978901,..|
| |    .send()-s 989012,..|
| |    .send()-s 990123,..|
| |    .send()-s ad inf,..|                    

尽管 sleep( 3 ) -边代码强制性地尽可能快地调用SUB -s,但它是本地PUB -instance所保留的空间并不只是一个这样的消息要接受的,所有其他消息都被静默丢弃,无论何时进入队列独奏位置被占用。

每当 .send() 标记恢复为零时,内部机制便允许下一个其他Context()将消息的实际内容(有效负载)传递到队列存储,并且随后跟随HWM == 1 -s的所有后续尝试再次开始由于 .send() 绑定(bind)逻辑而被静默丢弃。

关于c++ - ZeroMQ不同速度的订户看到相同的消息,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/50047062/

相关文章:

c++ - Netbeans C++ 在本地主机上构建需要很长时间

c++ - 创建网络驱动程序

c++ - 为什么模板只能在头文件中实现?

C++ 循环不断评估 if 语句,使其速度太慢

node.js - 如果仅使用一次 socket.send(),则 zmq 订阅者无法订阅已发布的消息

c++ - 使用虚拟析构函数,我是否需要为每个子类显式声明一个虚拟析构函数?

c++ - 在 C++ 中读取通过 ZMQ 发送的 Flatbuffers 对象会引发未处理的异常

laravel - 使用 websockets 时的最佳实践?

node.js - NODE_MODULE_VERSION 46.这个版本的Node.js需要NODE_MODULE_VERSION 64.请尝试重新编译或重新安装

c++ - ZeroMQ 3.2.5 在 mailbox_t::recv 中的访问冲突