我正在为 ZeroMQ 使用 Python 绑定(bind)。我的 libzmq
版本是 4.2.5,我的 pyzmq
版本是 17.1.2。
我试图让“生产者”向“消费者”传输大量数据。 “生产者”的代码是:
# producer.py
import zmq
import time
import os
ctx = zmq.Context()
sock = ctx.socket(zmq.PUB)
sock.bind('tcp://*:8000')
x = os.urandom(1000000000) # large amount of data, requires much time to transmit it
sock.send(x)
print('Done')
sock.setsockopt(zmq.LINGER, 0)
sock.close()
t1 = time.time()
ctx.term() # I expect this should return immediately
print(time.time() - t1)
“消费者”的代码是:
# consumer.py
import zmq
ctx = zmq.Context()
sock = ctx.socket(zmq.SUB)
sock.setsockopt_string(zmq.SUBSCRIBE, '')
sock.connect('tcp://localhost:8000')
data = sock.recv()
我希望 producer.py
中的 ctx.term()
应该立即返回,因为套接字的 LINGER
已经设置归零。但是当我运行这些代码时,ctx.term()
并没有按预期立即返回。相反,该函数需要数十秒才能返回,并且 consumer.py
已成功接收所有大数据。
我正在努力找出原因,希望有人能帮帮我。
最佳答案
Q : "ZeroMQ: set
LINGER=0
does not work as expected"
ZeroMQ 集 LINGER=0
恕我直言,是否按预期工作(如文档所述):
ZeroMQ 文档明确指出所有 zmq_setsockopt()
calls ( wrapped, for a use in python, into a method .setsockopt()
) 生效,即修改 Socket
-实例的行为。
旧版本的 ZeroMQ 文档(从 v2.x 开始在我的 ZeroMQ 包装的分布式系统项目中使用)对此更加明确:
Caution: All options, with the exception of
ZMQ_SUBSCRIBE
,ZMQ_UNSUBSCRIBE
,ZMQ_LINGER
,ZMQ_ROUTER_MANDATORY
andZMQ_XPUB_VERBOSE
only take effect for subsequent socketbind
/connects
.
考虑到这一点,sock.setsockopt( LINGER, 0 )
确实指示 Socket()
-实例 sock
不要等待相应的 <aContextINSTANCE>.term()
直到它完成所有尚未入队的消息的所有尝试都完全传播到队列头端并在那里处理到有线协议(protocol)并在其监督下成功发送或接受已在其网络范围内丢失通往邻近交易对手的方式。
然而,这并没有说明将对传输中的数据做什么,即 Context()
-实例已经在移动。
据我所知worked extensively自 v2.x 以来使用 ZeroMQ,恕我直言,没有任何东西提醒我如何使用向公共(public) API 公开的 ZeroMQ 语义中断正在进行的消息传输,超出 LINGER
-指示的行为,可以解释为:
“忽略任何已知的发送/接收仍在等待其进入队列”,
然而,这并不会阻止将传输中的数据发送到线路上的进程。
ZeroMQ 特意以这种方式工作。
人们可能想阅读更多关于 ZeroMQ 内部的信息 here或者也许只是从轨道高的角度看一个一般的观点,如 "ZeroMQ Hierarchy in less than a Five Seconds" .
结语:仅供不得已时使用
如果确实最终需要有某种方法来阻止甚至这些传输中的消息流,请随时发布一个关于如何让事情以这种疯狂方式工作的新问题。
关于python - ZeroMQ:设置 LINGER=0 没有按预期工作,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/57259551/