python - 单线程的 ZeroMQ 进程间通信丢失消息

标签 python unit-testing zeromq pyzmq

我目前正在探索测试我的 Zeromq 应用程序的可能性。我的印象是,我可以在同一个线程中拥有一个发布者/订阅者,让发布者发布而订阅者订阅它而不会丢失消息。然而,当我让发布者发送几条消息时,没有一条消息到达订阅者。

这是我使用的代码:

import zmq

def main():
    ctx = zmq.Context.instance()
    sender = ctx.socket(zmq.PUB)
    sender.setsockopt(zmq.HWM, 1000)
    sender.bind('tcp://*:10001')

    rcvr = ctx.socket(zmq.SUB)
    rcvr.setsockopt(zmq.HWM, 1000)
    rcvr.connect('tcp://127.0.0.1:10001')
    rcvr.setsockopt(zmq.SUBSCRIBE, "")

    for i in range(100):
        sender.send('%i' % i)

    while True:
        try:
            print rcvr.recv(zmq.NOBLOCK)
        except zmq.ZMQError:
            break


if __name__ == '__main__':
    main()

运行此程序时,我没有得到任何输出。

令我印象深刻的是,接收者在发送者发送之前就已连接,因此应该对这些消息进行排队。或者这是一个完全错误的假设,我应该使用 PUSH/PULL 来代替?

最佳答案

我认为这是慢加入问题的一个案例,如 ZeroMQ guide 中所述。 .

This "slow joiner" symptom hits enough people often enough that we're going to explain it in detail.

我认为主要问题是在订户套接字开始监听之前所有消息都已发送,并且消息飞过并被丢弃。在设置套接字和发送消息之间设置延迟是行不通的,因为最后一条消息在接收方开始监听之前已发送。

正如您所建议的,推/拉套接字会在内存中对作业进行排队。您可以像这样在单个进程中的套接字之间发送作业

# pushpull.py
import zmq

def main():
    ctx = zmq.Context()
    sender = ctx.socket(zmq.PUSH)
    sender.bind('tcp://*:10001')

    rcvr = ctx.socket(zmq.PULL)
    rcvr.connect('tcp://127.0.0.1:10001')

    for i in range(100):
        sender.send_unicode('%i' % i)

    while True:
        msg = rcvr.recv()
        print(msg)

if __name__ == '__main__':
    main()

或者,如果您想使用 pub/sub 套接字,我们需要两个进程以及套接字设置和消息发送之间的 time.sleep(1):

首先启动接收器

# rcvr.py
import zmq

def main():
    ctx = zmq.Context()
    rcvr = ctx.socket(zmq.SUB)
    rcvr.connect('tcp://127.0.0.1:10001')
    rcvr.setsockopt_string(zmq.SUBSCRIBE, "")

    while True:
        msg = rcvr.recv()
        print(msg)

if __name__ == '__main__':
    main()

然后是发件人,

# sender.py
import zmq
import time

def main():
    ctx = zmq.Context()
    sender = ctx.socket(zmq.PUB)
    sender.bind('tcp://*:10001')

    time.sleep(1)
    for i in range(100):
        sender.send_unicode('%i' % i)

if __name__ == "__main__":
    main()

接收:

b'0'
b'1'
b'2'
b'3' ...

我现在正在 Python 3.3 和 pyzmq 13.1.0 中工作,有很棒的 WinPython分布,因此 zmq 调用中的一些字符串处理有点不同,打印函数也是如此。 希望对您有所帮助。

关于python - 单线程的 ZeroMQ 进程间通信丢失消息,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/15945915/

相关文章:

python - 将 numpy 数组与 scipy odeint 一起使用

用于从 Linux 机器上的网络摄像头拍照的 Python api

php - 尝试在我的 Drupal 项目上运行 PhpUnit 测试时出现此错误

java - eclipse 第谷 : Testing plug-in's without using single test-bundles

c# - 为什么在使用 java 和 protobuf-net 的 Protocol Buffer 进行序列化时 byte[] 不同?

python - 如何匹配此模式和字符串中的最后一个空格?

python - 将文件转换为字符串,然后重建该字符串并再次使其成为文件

unit-testing - 在没有 Composer 的情况下使用 phpunit

windows - 如何在 Windows 上安装 zeromq4-haskell? (找不到 zmq 依赖项)

javascript - ZeroMQ (ZMQ) 在客户端提供 VivaGraph 图