c - ZMQ多部分冲洗器:是否可以通过ZeroMQ接收通过多部分消息接收的多部分总数,而无需全部读取?

标签 c networking zeromq distributed-system low-latency

我正在使用多部分消息传递在C中使用ZeroMQ实现一个简单的REQ-REP模式。我的大部分信息都有严格的4个部分(来回),只有少数例外。要强制执行此规则,我需要确定接收的多部分消息的部分总数。知道它是否<=4很容易。这是我的接收器功能:

#define BUFMAX  64 // Maximum size of text buffers
#define BUFRCV  63 // Maximum reception size of text buffers (reserve 1 space to add a terminal '\0')

char mpartstr[4][BUFMAX];


int recv_multi(void *socket,int *aremore)

// Receive upto the first 4 parts of a multipart message into mpartstr[][].
// Returns the number of parts read (upto 4) or <0 if there is an error.
// Returns -1 if there is an error with a zmq function.
// It sets aremore=1 if there are still more parts to read after the fourth
// part (or aremore=0 if not).
{
 int len,rc,rcvmore,pdx,wrongpard=0;
 size_t rcvmore_size = sizeof(rcvmore);

 pdx=0;
 len=zmq_recv(socket, mpartstr[pdx], BUFRCV, 0);
 if(len==-1) return len;

 mpartstr[pdx][len]='\0';
 rc=zmq_getsockopt(socket,ZMQ_RCVMORE,&rcvmore,&rcvmore_size); if(rc) return -1;

 pdx++;
 if(rcvmore==0){*aremore=0; return pdx;}

 while(rcvmore){
   len=zmq_recv (socket, mpartstr[pdx], BUFRCV, 0); if(len==-1) return len; mpartstr[pdx][len]='\0';
   rc=zmq_getsockopt(socket,ZMQ_RCVMORE,&rcvmore,&rcvmore_size); if(rc) return -1; 
   pdx++;
   if(pdx==4) break;
 }

 *aremore=rcvmore;
 return pdx;
}

一切都好但是现在在我的main()函数中,我通过查看aremore的值来检查是否有更多的部分。在我不期望更多的情况下,我将向发送者发送一条错误消息,但是我发现如果我不读取一条多部分消息的所有部分,ZeroMQ就不喜欢它(它在我下次调用zmq_recv()函数时读取这个旧的多部分消息的其余部分,即使在我发送了一条消息并期望得到一个新的干净的多部分响应之后)。
所以我真正需要的是一种“flush”函数来清除包含4个以上部分的消息的其余部分。到目前为止,我唯一要做的就是像这样一个丑陋的任意暴力耗尽函数(aremore的值从1开始-它是由前一个函数设置的):
int recv_exhaust(void *socket,int *aremore)

// Receive the remainder of a multipart message and discard the contents.
// Use this to clean out a multi-part 'inbox' from a wrongly sent message.
// Returns 0 on success
// Returns -1 on zmq function failure
// Returns -2 on failure to exhaust even after 1000 parts.
{                                                          
 int len,rc,rcvmore,pdx;
 size_t rcvmore_size = sizeof(rcvmore);

 pdx=1;
 rcvmore=*aremore;

 while(rcvmore){
   len=zmq_recv(socket, mpartstr[0], BUFRCV, 0); if(len==-1) return len;
   rc=zmq_getsockopt(socket,ZMQ_RCVMORE,&rcvmore,&rcvmore_size); if(rc) return -1; 
   pdx++;
   if(pdx>1000) return -2;
 }

 return 0;
}

如果没有专用的“flusher”API,那么至少我可以摆脱任意1000条消息的限制,如果我能够提前知道一条给定的多部分消息有多少部分(总共)的话。肯定ZeroMQ知道这一点,因为多部分消息作为一个整体块发送。有人能告诉我怎么找到这些信息吗?或者有合适的“flusher”函数/方法吗?(对于标准C请不要C++/C等)。提前谢谢。

最佳答案

问:有人能告诉我如何找到这些信息吗?
对。
问:有没有合适的“flusher”函数/方法?
是和否:
直到4.3.1版之前,ZeroMQv2.x还没有对“flusher”的显式API调用
ZeroMQ设计提供的低延迟智能消息传递的优点和功能是建立在精心设计的Zero理念之上的:总是喜欢性能而不是舒适——正如Zero-copy、Zero-warranty和其他范例所表明的那样。
天真(我承受了很多痛苦,把这件事琐碎到诉诸于使用一个原始的阻塞recv()…)”flusher“必须一直进行,直到ZMQ_RCVMORE不再标记多帧最后一条消息之外的任何部分(或zmq_msg_more() == 0不符合相同的要求)。尽管如此,所有这些操作都只是指针处理,没有数据从RAM中“移动/复制/读取”,只有指针被分配,因此它确实既快又高效:

int    more;
size_t more_size = sizeof ( more );
do {
      zmq_msg_t part;                       /* Create an empty ØMQ message to hold the message part */
      int rc = zmq_msg_init (&part);           assert (rc == 0 && "MSG_INIT failed" );
      rc = zmq_msg_recv (&part, socket, 0); /* Block until a message is available to be received from socket */
                                               assert (rc != -1 && "MSG_RECV failed" );
                                            /* Determine if more message parts are to follow */
      rc = zmq_getsockopt (socket, ZMQ_RCVMORE, &more, &more_size);
                                               assert (rc == 0 && "GETSOCKOPT failed" );
      zmq_msg_close (&part);
      } while (more);

鉴于RFC-23/ZMTP记录的属性,只有少数(有线遥测编码)保证:
1)发送/传递所有消息:
原子的(要么是完全相同的所有帧的无错误二进制文件,要么完全没有)
最多一次(每个相关同行)
整齐
2)多部分信息额外获得内部(带内)遥测“通知”状态:
有点标记的状态{ 1: more-frames-follow| 0: no-more-frames }
有位标记的大小类型{ 0: 8b-direct-octet | 1: 64b-"network-endian"-coded }
尺寸建议{ 0~255: direct-size | 0~2^63-1: 64b-"network-endian"-coded-size }
记录的zmq_recv()API在这方面同样相当明确:
多部分消息
ØMQ消息由1个或多个消息部分组成。每个消息部分本身都是独立的zmq_msg_t。ØMQ确保消息的原子传递:对等方要么接收消息的所有消息部分,要么根本不接收任何消息部分。除可用内存外,消息部分的总数不受限制。
处理多部分消息的应用程序必须在调用ZMQ_msg_recv()后使用ZMQ_RCVMORE ZMQ_getsockopt(3)选项来确定是否还有其他部分要接收。
不管第一次读取时看起来有多难看,内存中最糟糕的情况是在一个由多个部分组成的消息帧中存储大量的小消息。
当然,最终“摆脱它们”的时间不是零,但是紧凑高效的内部ZMTP遥测和低延迟流处理的好处是更重要的目标(并且已经实现)。
如果有疑问,首先确定最坏情况:
a)“产生”约1E9多部分消息帧,传输零大小的有效载荷(没有数据,但所有消息帧)
b)“设置”尽可能简单的“拓扑”PUSH/PULL
c)“选择”您选择的传输类{ inproc:// | ipc:// | tipc:// | ... | vmci:// }-最佳无堆栈inproc://(我将以此开始压力测试)
d)秒表这种盲目的机械零快捷键“flusher”,介于aReferencePoint-S:和azmq_poll( ZMQ_POLLIN )之间(当ReferencePoint-E:已返回任何可读取内容时)以及a[S]之间(当来自多部分消息的最后一个部分被盲人“flusher”马戏团循环时)。
结果解释:
花费在[E]之间的纳秒,确实是将“替罪羊”变成一个明知故犯的“flusher”循环马戏团的最坏情况的证据。在现实世界的用例中,可能会有更多的时间来做同样的事情。
然而,不可忘记的是,在以超低延迟、高(几乎是线性的)可伸缩性为重点的消息/信令框架中,发送这种{明知如此大的{格式错误的}多帧野兽的责任是处理此问题的任何操作风险的根本原因。
正是零度禅宗的艺术,才使得这一切得以发生。多亏了皮耶特·辛特詹斯和他的团队,马丁·斯泰克带领的球队,我们都非常感谢他们能够在更进一步的工作中继承他们的传统。

关于c - ZMQ多部分冲洗器:是否可以通过ZeroMQ接收通过多部分消息接收的多部分总数,而无需全部读取?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/58119387/

相关文章:

c - 如何用C语言编写GMM(高斯混合模型)?

docker - 为什么只有 `expose` 配置的服务能够与互联网通信?

asp.net - SQL Server 2005 网络 IO 等待时间(ASYNC_NETWORK_IO 等待类型)问题

python - 为什么我的 C++ ZeroMQ 订阅者收不到任何数据?

sockets - ZeroMQ:我们可以 .bind() 100+ 个套接字到同一个端点吗?

c - c中的多级继承

c - 64 位机器上的 32 位标准 C 库

c++ - 什么时候以及为什么要在代码中使用位域?

networking - 网站无法 ping 通,但可以通过 Web 浏览器打开

python - 多处理与 gevent