python - ZeroMQ:设置 LINGER=0 没有按预期工作

标签 python multithreading message-queue zeromq pyzmq

我正在为 ZeroMQ 使用 Python 绑定(bind)。我的 libzmq 版本是 4.2.5,我的 pyzmq 版本是 17.1.2。

我试图让“生产者”向“消费者”传输大量数据。 “生产者”的代码是:

# producer.py
import zmq
import time
import os

ctx = zmq.Context()
sock = ctx.socket(zmq.PUB)
sock.bind('tcp://*:8000')

x = os.urandom(1000000000) # large amount of data, requires much time to transmit it
sock.send(x)
print('Done')

sock.setsockopt(zmq.LINGER, 0)
sock.close()

t1 = time.time()
ctx.term() # I expect this should return immediately
print(time.time() - t1)

“消费者”的代码是:

# consumer.py
import zmq

ctx = zmq.Context()
sock = ctx.socket(zmq.SUB)

sock.setsockopt_string(zmq.SUBSCRIBE, '')
sock.connect('tcp://localhost:8000')

data = sock.recv()

我希望 producer.py 中的 ctx.term() 应该立即返回,因为套接字的 LINGER 已经设置归零。但是当我运行这些代码时,ctx.term() 并没有按预期立即返回。相反,该函数需要数十秒才能返回,并且 consumer.py 已成功接收所有大数据。

我正在努力找出原因,希望有人能帮帮我。

最佳答案

Q : "ZeroMQ: set LINGER=0 does not work as expected"

ZeroMQ 集 LINGER=0恕我直言,是否按预期工作(如文档所述):

ZeroMQ 文档明确指出所有 zmq_setsockopt() calls ( wrapped, for a use in python, into a method .setsockopt() ) 生效,即修改 Socket -实例的行为。

旧版本的 ZeroMQ 文档(从 v2.x 开始在我的 ZeroMQ 包装的分布式系统项目中使用)对此更加明确:

Caution: All options, with the exception of ZMQ_SUBSCRIBE, ZMQ_UNSUBSCRIBE, ZMQ_LINGER, ZMQ_ROUTER_MANDATORY and ZMQ_XPUB_VERBOSE only take effect for subsequent socket bind/connects.

考虑到这一点,sock.setsockopt( LINGER, 0 )确实指示 Socket() -实例 sock 不要等待相应的 <aContextINSTANCE>.term()直到它完成所有尚未入队的消息的所有尝试都完全传播到队列头端并在那里处理到有线协议(protocol)并在其监督下成功发送或接受已在其网络范围内丢失通往邻近交易对手的方式。

然而,这并没有说明将对传输中的数据做什么,即 Context() -实例已经在移动。

据我所知worked extensively自 v2.x 以来使用 ZeroMQ,恕我直言,没有任何东西提醒我如何使用向公共(public) API 公开的 ZeroMQ 语义中断正在进行的消息传输,超出 LINGER -指示的行为,可以解释为:
“忽略任何已知的发送/接收仍在等待其进入队列”
然而,这并不会阻止将传输中的数据发送到线路上的进程。

ZeroMQ 特意以这种方式工作。

人们可能想阅读更多关于 ZeroMQ 内部的信息 here或者也许只是从轨道高的角度看一个一般的观点,如 "ZeroMQ Hierarchy in less than a Five Seconds" .


结语:仅供不得已时使用

如果确实最终需要有某种方法来阻止甚至这些传输中的消息流,请随时发布一个关于如何让事情以这种疯狂方式工作的新问题。

关于python - ZeroMQ:设置 LINGER=0 没有按预期工作,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/57259551/

相关文章:

python - 从 bash 命令行获取未扩展的参数

python - Airflow - 彩色记录

python - 带有 python 和 flask 的 RESTful API 请求和读取文本文件

python - 如何将包含键的列表和另一个列表作为值写入 Python 中每列的 CSV 文件

multithreading - 在 Actor 内部调用 Thread.sleep

node.js - 带有 Redis 消息队列的 NodeJS - 如何设置多个消费者(线程)

message-queue - ZeroMq 中发布-订阅和推拉模式的区别

从消息队列接收后无法使用shm_open

c - 如何优雅地让一个阻塞的pthread从主线程退出?

python - 如何在 python 中的线程之间传递异常