zeromq - zeromq 中的懒惰发布/订阅,仅获取最后一条消息

标签 zeromq lazy-evaluation datareader publisher subscriber

我正在尝试从示例 wuclient/wuserver 在 zeromq 上实现一个惰性订阅者。
客户端比服务器慢得多,因此它必须只获取服务器最后发送的消息。

到目前为止,我发现这样做的唯一方法是连接/断开客户端,但当然每个连接都会产生不必要的成本,大约 3 毫秒:

服务器.cxx

int main () {
    //  Prepare our context and publisher
    zmq::context_t context (1);
    zmq::socket_t publisher (context, ZMQ_PUB);
    publisher.bind("tcp://*:5556");
    int counter = 0;
    while (1) {
        counter++;

        //  Send message to all subscribers
        zmq::message_t message(20);
        snprintf ((char *) message.data(), 20 ,
                  "%d", counter);
        publisher.send(message);
        std::cout     << counter <<  std::endl;
        usleep(100000);
      }
      return 0;
    }

客户端.cxx
int main (int argc, char *argv[])
{
  zmq::context_t context (1);
  zmq::socket_t subscriber (context, ZMQ_SUB);
  while(1){

    zmq::message_t update;
    int counter;

    subscriber.connect("tcp://localhost:5556"); // This call take some milliseconds
    subscriber.setsockopt(ZMQ_SUBSCRIBE, "", 0); 
    subscriber.recv(&update);
    subscriber.disconnect("tcp://localhost:5556");

    std::istringstream iss(static_cast<char*>(update.data()));
    iss >> counter;

    std::cout     << counter <<  std::endl;
    usleep(1000000);
  }
  return 0;
}

服务器输出:
1
2
3
4
5
6
7
8
9
...

客户端输出:
4
14
24
...

我试过在没有 co/deco 的情况下使用高水位标记来做到这一点,但它不起作用。
即使使用这种代码,只有当缓冲区达到至少数百条消息时,才会开始丢弃帧。 :
int high_water_mark = 1;
socket.setsockopt(ZMQ_RCVHWM, &high_water_mark, sizeof(high_water_mark) );
socket.setsockopt(ZMQ_SNDHWM, &high_water_mark, sizeof(high_water_mark) );

还有this post在密切相关的 zeromq-dev 中,但提供了解决方案(使用另一个线程来选择最后一条消息是 Not Acceptable ,我无法通过网络传输大量消息,之后将不再使用。

最佳答案

解决方法是使用ZMQ_CONFLATE像这样(仅适用于非多部分消息):

客户端.cxx

#include <zmq.hpp>
#include <iostream>
#include <sstream>
#include <unistd.h>

int main (int argc, char *argv[])
{
  zmq::context_t context (1);

  zmq::socket_t subscriber (context, ZMQ_SUB);

  int conflate = 1;
  subscriber.setsockopt(ZMQ_CONFLATE, &conflate, sizeof(conflate) );
  subscriber.connect("tcp://localhost:5556");
  subscriber.setsockopt(ZMQ_SUBSCRIBE, "", 0); 

  while(1){

    zmq::message_t update;
    int counter;

    subscriber.recv(&update);

    std::istringstream iss(static_cast<char*>(update.data()));
    iss >> counter;

    std::cout     << counter <<  std::endl;
    usleep(1000000);

  }
  return 0;
}

关于zeromq - zeromq 中的懒惰发布/订阅,仅获取最后一条消息,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/26379365/

相关文章:

entity-framework - Entity Framework ,已经有一个与此连接关联的打开的 DataReader 必须先关闭

c++ - ZMQ 经销商 - 路由器通讯

Python zeromq——单个订阅者的多个发布者?

performance - Corosync、ZeroMQ 和 Spread 的 CPG 在消息传递方面如何比较?

c++ - 使用 C++1 1's ' auto' 会降低性能甚至破坏代码吗?

c# - 使用 C# 和存储过程从 SQL Server 检索 VarChar(MAX)

multithreading - ZeroMQ:如何处理 ZeroMQ 节点中与消息无关的异步事件?

haskell - 如何在 Haskell 中强制评估?

Haskell:使用 Curl 延迟下载

c# - 在 C# 中创建通用 MySQL SELECT 语句