Python ZeroMQ PUSH/PULL logic: setting a high water mark for a low-end puller without losing any messages

Tags: python zeromq distributed-system pyzmq low-latency

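I use simple one-to-one PUSH/PULL worker/server Python code to send and receive messages.

The worker uses a PUSH socket to send messages to the PULL server. The server's processing unit is less powerful than the worker's, so when too many messages are sent, the server's RAM usage grows until the system kills everything.

I tried setting the receiver's high water mark as follows: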

import zmq

# ZMQ_CTXT, listen_addr, port_start and port_end are defined elsewhere in the original program
worker_sock_in = ZMQ_CTXT.socket(zmq.PULL)
worker_sock_in.setsockopt(zmq.LINGER, 1000)
worker_sock_in.setsockopt(zmq.RCVTIMEO, 1000)  # detects if the link is broken
worker_sock_in.setsockopt(zmq.RCVHWM, 1000)
worker_sock_in_port = worker_sock_in.bind_to_random_port(listen_addr, port_start, port_end)

The worker uses the following code to create its socket and send messages:

sock_dest = ZMQ_CTXT.socket(zmq.PUSH)
sock_dest.setsockopt(zmq.LINGER, 1000)
sock_dest.setsockopt(zmq.SNDTIMEO, 1000)  # detects if the link is broken
sock_dest.setsockopt(zmq.SNDHWM, 0)  # never block on sending msg (0 means no limit)
sock_dest.connect(sock_dest_address)
# sends a msg (inside the worker class, where the socket is stored as self.sock_dest)
self.sock_dest.send(msg, zmq.NOBLOCK)

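This seems to solve the problem, but my guess is that the overflowing messages are simply dropped by the server, which is not acceptable in my case.

I based my development on this thread, but I am not sure I understand the additional-information part of the answer.

So the question is: what is the real behavior when the HWM is reached on non-blocking PUSH/PULL ZeroMQ sockets, and is there a way to have a push/pull infrastructure that guarantees all sent messages will be received by the PULL socket without inflating its memory or blocking the sender?

Best answer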

Q : is there a way to have a push pull infrastructure that guaranties all sent messages will be received by the pull socket without inflating its memory or blocking the sender?

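Is there a way? Yes, there is:

ZeroMQ's built-in zero-warranty delivery (a message is either delivered as a 1:1 bit-for-bit copy of the original or not delivered at all) needs to be extended. Either add an application-level protocol that re-sends messages until their delivery is confirmed, or move the infrastructure onto a transport with guaranteed delivery that meets these above-standard requirements, such as the norm:// transport-level extension. Since PUSH/PULL is not yet ready to operate (RTO) over norm://, the second option also means changing the paradigm to the PUB/SUB or XPUB/XSUB Scalable Formal Communication Pattern archetypes.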

A new transport option is available in libzmq. The "norm_engine.hpp" and "norm_engine.cpp" files provide an implementation of a NACK-Oriented Reliable Multicast (NORM) transport protocol option for ZeroMQ. NORM is an IETF open standards protocol specified in RFC 5740 and supporting documents. The Naval Research Laboratory (NRL) provides an open source reference implementation that is hosted at http://www.nrl.navy.mil/itd/ncs/products/norm.

NORM supports reliable data delivery over IP multicast but also supports unicast (point-to-point) data transfers. NORM operates on top of the User Datagram Protocol (UDP) and supports reliability via a NACK-based Automated Repeat Request (ARQ) that uses packet erasure coding for very efficient group communication. NORM also provides for automated TCP-friendly congestion control and mechanisms for support end-to-end flow control. The NRL NORM implementation can also be configured to provide basic UDP-like best effort transport service (with no receiver feedback) and this can be enhanced by adding some amount application-settable proactive forward error correction (FEC) packets to the transmission. I.e., by default NORM only sends 'reactive' FEC repair packets in response to NACKs but can also be configured to proactively send added repair packets for a level of reliability without any feedback from the receivers. In addition to its TCP-friendly congestion control, NORM can also be configured for fixed-rate operation and the NRL implementation supports some additional automated congestion control options suitable for use in bit error prone wireless communication environments. While its reliable ARQ operation is principally NACK-based (negative acknowledgement when packet loss is detected), it also supports optional positive acknowledgment (ACK) from receivers that can be used for delivery confirmation and explicit flow control.
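The application-level option mentioned above (re-send until delivery is confirmed) could look roughly like the sketch below. It is only an illustration under several assumptions: the function name `run_ack_demo`, the endpoint names, the sequence-number framing, and the second PUSH/PULL pair used as an acknowledgement channel are all made up for this example, and message loss is simulated by having the receiver ignore the first copy of a few messages. Sender and receiver run in one thread purely to keep the example short.

```python
import zmq


def run_ack_demo(n_msgs=20, drop_first_copy_of=(3, 7)):
    """Re-send every message until the receiver has acknowledged it."""
    ctx = zmq.Context()
    # Data channel (sender -> receiver); endpoint names are hypothetical.
    data_in = ctx.socket(zmq.PULL)
    data_in.bind("inproc://ack-demo-data")
    data_out = ctx.socket(zmq.PUSH)
    data_out.connect("inproc://ack-demo-data")
    # Acknowledgement channel (receiver -> sender), an assumption of this sketch.
    ack_in = ctx.socket(zmq.PULL)
    ack_in.bind("inproc://ack-demo-acks")
    ack_out = ctx.socket(zmq.PUSH)
    ack_out.connect("inproc://ack-demo-acks")

    pending = {seq: b"payload-%d" % seq for seq in range(n_msgs)}  # not yet acked
    delivered = {}               # receiver side, keyed by sequence number
    to_drop = set(drop_first_copy_of)
    rounds = 0
    while pending and rounds < 10:
        rounds += 1
        # Sender: (re)send everything that has not been acknowledged yet.
        for seq, payload in pending.items():
            data_out.send_multipart([str(seq).encode(), payload])
        # Receiver: drain the data channel and acknowledge what it accepted.
        while data_in.poll(50):
            seq_b, payload = data_in.recv_multipart()
            seq = int(seq_b.decode())
            if seq in to_drop:
                to_drop.discard(seq)   # simulated loss of the first copy
                continue
            delivered[seq] = payload   # duplicates simply overwrite the same key
            ack_out.send(seq_b)
        # Sender: forget every message whose acknowledgement has arrived.
        while ack_in.poll(50):
            pending.pop(int(ack_in.recv().decode()), None)

    ctx.destroy(linger=0)
    return delivered, rounds
```

Keying the receiver's store by sequence number makes re-delivery harmless, which is what allows the sender to repeat a message whenever it is in doubt.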

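There are two ways to deal with the inflating memory requirement. One is to control the .send()-er explicitly so that it does not flood the resources (RAM) of the sending-side Context() instance, i.e. to stay within the limited resources, which prevents flooding and dropped messages from happening at all. The other is to have enough RAM and properly configured Context() instance(s) to let all the data flow through.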
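One possible way to control the sender explicitly is credit-based flow control: the sender may only have a fixed number of unprocessed messages outstanding, and the receiver hands back one credit for each message it has finished processing. The sketch below is a minimal single-threaded illustration, not a definitive implementation; `run_credit_demo`, the endpoint names, and the separate PUSH/PULL pair used as a credit channel are assumptions made for this example.

```python
import zmq


def run_credit_demo(n_msgs=50, window=8):
    """Send n_msgs while never having more than `window` unprocessed messages."""
    ctx = zmq.Context()
    # Data channel (sender -> receiver); endpoint names are hypothetical.
    data_in = ctx.socket(zmq.PULL)
    data_in.bind("inproc://credit-demo-data")
    data_out = ctx.socket(zmq.PUSH)
    data_out.connect("inproc://credit-demo-data")
    # Credit channel (receiver -> sender), an assumption of this sketch.
    credit_in = ctx.socket(zmq.PULL)
    credit_in.bind("inproc://credit-demo-credit")
    credit_out = ctx.socket(zmq.PUSH)
    credit_out.connect("inproc://credit-demo-credit")

    credits = window      # how many more messages the sender may send right now
    next_seq = 0
    received = []
    max_in_flight = 0
    while len(received) < n_msgs:
        # Sender: spend the available credits, then stop sending.
        while credits > 0 and next_seq < n_msgs:
            data_out.send(str(next_seq).encode())
            next_seq += 1
            credits -= 1
        max_in_flight = max(max_in_flight, next_seq - len(received))
        # Receiver: process one message, then return one credit.
        if data_in.poll(100):
            received.append(int(data_in.recv().decode()))
            credit_out.send(b"")
        # Sender: collect any credits that have come back.
        while credit_in.poll(0):
            credit_in.recv()
            credits += 1

    ctx.destroy(linger=0)
    return received, max_in_flight
```

With this scheme the receiver's queue is bounded by `window` regardless of how fast the sender can produce. The backlog does not disappear, though: the sender has to hold it or slow down its producer.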

Q : what are the real behavior of HWM reached on noblock push/pull zeromq sockets?

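First, let us demystify this. The ZMQ_NOBLOCK flag tells the local Context() to return from the .send() method to the caller immediately, i.e. without blocking the calling code. If the message can be queued, its payload is handed to the local ZeroMQ Context() instance for further processing (a classic non-blocking code design). If it cannot be queued, for example because the high water mark has been reached, the call fails with EAGAIN (zmq.Again in pyzmq) instead of blocking.

By contrast, ZMQ_SNDHWM tells the Context() instance which threshold to set for this socket. For the .send()-er in the PUSH/PULL case described: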

The high water mark is a hard limit on the maximum number of outstanding messages ØMQ shall queue in memory for any single peer that the specified socket is communicating with. A value of zero means no limit.

If this limit has been reached the socket shall enter an exceptional state and depending on the socket type, ØMQ shall take appropriate action such as blocking or dropping sent messages. Refer to the individual socket descriptions in zmq_socket(3) for details on the exact action taken for each socket type.

ØMQ does not guarantee that the socket will accept as many as ZMQ_SNDHWM messages, and the actual limit may be as much as 60-70% lower depending on the flow of messages on the socket.

Using ZMQ_XPUB_NODROP as well may help in the norm:// transport-class use case.

Also note that, by default, the API documentation for ZMQ_PUSH sockets confirms:

When a ZMQ_PUSH socket enters the mute state due to having reached the high water mark for all downstream nodes, or if there are no downstream nodes at all, then any zmq_send(3) operations on the socket shall block until the mute state ends or at least one downstream node becomes available for sending; messages are not discarded.
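The behavior described in this passage can be observed with a small experiment. The sketch below assumes a finite HWM on both sides and uses the inproc transport so that it runs without a network; the function name `fill_until_hwm` and the endpoint name are hypothetical. A non-blocking send on a PUSH socket that has reached its high water mark raises `zmq.Again`, and everything that was accepted before that point is still delivered.

```python
import zmq


def fill_until_hwm(hwm=5, limit=1000):
    """Send without blocking until the PUSH socket refuses, then drain the PULL side."""
    ctx = zmq.Context()
    pull = ctx.socket(zmq.PULL)
    pull.setsockopt(zmq.RCVHWM, hwm)      # options must be set before bind/connect
    pull.bind("inproc://hwm-demo")        # hypothetical endpoint name
    push = ctx.socket(zmq.PUSH)
    push.setsockopt(zmq.SNDHWM, hwm)      # finite limit, unlike SNDHWM = 0
    push.connect("inproc://hwm-demo")

    accepted = 0
    try:
        for i in range(limit):
            push.send(b"msg-%d" % i, zmq.NOBLOCK)
            accepted += 1
    except zmq.Again:
        # HWM reached: the message was NOT queued and NOT dropped silently.
        # The caller still owns it and decides whether to retry, wait or buffer.
        pass

    received = 0
    while pull.poll(100):                 # drain everything that was accepted
        pull.recv()
        received += 1

    ctx.destroy(linger=0)
    return accepted, received
```

The consequence for the code in the question is that with `SNDHWM` set to 0 the PUSH side queues without limit, so the sender never sees `zmq.Again` and memory can grow on the sending side instead.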

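For the suspected under-performer (the PULL side), one may also check the O/S-side buffer size with the .getsockopt( ZMQ_RCVBUF ) method and, if needed, resize it with an appropriate, sufficiently large .setsockopt( ZMQ_RCVBUF ):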

The ZMQ_RCVBUF option shall set the underlying kernel receive buffer size for the socket to the specified size in bytes. A value of -1 means leave the OS default unchanged. For details refer to your operating system documentation for the SO_RCVBUF socket option.
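In pyzmq, reading and resizing this option might look like the sketch below. The function name `configure_rcvbuf` and the 4 MiB figure are arbitrary choices for illustration. The value read back is what ZeroMQ will request for sockets it opens afterwards; the operating system may clamp the effective size (on Linux, for instance, via `net.core.rmem_max`).

```python
import zmq


def configure_rcvbuf(size_bytes=4 * 1024 * 1024):
    """Return the RCVBUF option before and after requesting a larger buffer."""
    ctx = zmq.Context()
    pull = ctx.socket(zmq.PULL)
    before = pull.getsockopt(zmq.RCVBUF)     # library default: OS default left unchanged
    pull.setsockopt(zmq.RCVBUF, size_bytes)  # set before bind()/connect() to take effect
    after = pull.getsockopt(zmq.RCVBUF)
    pull.close(linger=0)
    ctx.term()
    return before, after
```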

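If none of the above helps, one can use the zmq_socket_monitor service to inject a self-diagnosing meta-plane into the ZeroMQ infrastructure and gain full control over situations that normally happen outside the application code's reach (reflecting internal API states and transitions as needed).

The decision is yours.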
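pyzmq exposes the socket monitor through `Socket.get_monitor_socket()` and the helper `zmq.utils.monitor.recv_monitor_message()`. The sketch below is a minimal illustration: the function name `first_monitor_event` is hypothetical, and a loopback TCP endpoint with a wildcard port is used on the assumption that one can be bound on the local machine. It attaches a monitor to a PULL socket and reads the first event produced by `bind()`.

```python
import zmq
from zmq.utils.monitor import recv_monitor_message


def first_monitor_event(timeout_ms=2000):
    """Attach a monitor to a PULL socket and return the first event it reports."""
    ctx = zmq.Context()
    pull = ctx.socket(zmq.PULL)
    monitor = pull.get_monitor_socket()   # PAIR socket that delivers the events
    pull.bind("tcp://127.0.0.1:*")        # wildcard port chosen by the OS
    event = None
    if monitor.poll(timeout_ms):
        # dict with the keys 'event', 'value' and 'endpoint'
        event = recv_monitor_message(monitor)
    pull.disable_monitor()
    monitor.close()
    pull.close(linger=0)
    ctx.destroy(linger=0)
    return event
```

A long-running service would normally read the monitor socket in its own thread and react to events such as `zmq.EVENT_ACCEPTED` or `zmq.EVENT_DISCONNECTED`.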

Source: a similar question on Stack Overflow about Python ZeroMQ PUSH/PULL logic and setting a high water mark for a low-end puller without losing any messages: https://stackoverflow.com/questions/59456201/
