python - 为什么 asyncio.Queue 无法按预期工作?

标签 python zeromq pyzmq python-asyncio

我正在编写简单的生产者/消费者程序。

import zmq

@asyncio.coroutine
def receive_data(future,s):
        print("begin to recv sth from.....socket"
        my_data = s.recv()
        future.set_result(my_data)

@asyncio.coroutine
def producer(loop,q,s):
        while True:
                future = asyncio.Future()
                yield from receive_data(future,s)
                data = str(future.result())
                yield from q.put(data)
@asyncio.coroutine
def consumer(loop,q):
       while True:
          a = yield from q.get()
          print("i am get..."+str(a)+"..."+str(type(a)))  
loop = asyncio.get_event_loop()

c = zmq.Context()
s = c.socket(zmq.REP)
s.bind('tcp://127.0.0.1:5515')

q = asyncio.Queue()
tasks=[asyncio.Task(producer(loop,q,s)),asyncio.Task(comsumer(loop,q))]
loop.run_until_complete(asyncio.gather(*tasks))
loop.close()
s.close()

看来消费者没有机会执行。

套接字每 500 毫秒接收一次数据,因此当 receive_data 函数中的 yield from 挂起生产者协程时,消费者协程将打印信息。

什么可以解释这个?

最佳答案

s.recv() 正在阻塞调用,因此 receive_data 挂起,直到新的 ZMQ 消息到达。

这会阻塞事件循环,消费者没有机会执行自己。

你可以传递zmq.NOBLOCK标志到.recv,如果没有可用数据给eventloop一个有机会迭代其他就绪任务。

或者只使用 aiozmq图书馆:)

关于python - 为什么 asyncio.Queue 无法按预期工作?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/27183148/

相关文章:

python - 在 nose 或 pytest 中收集以编程方式生成的测试套件的好方法

python - boolean 标识 == True vs 是 True

zeromq - 在 zeromq 中结合 pub/sub 和 req/rep

node.js - 从 zmq 套接字返回值

python - 将多个字典附加到列表并转储到 json

python - Tensorflow 模型适用于 Python 但不适用于 C++

java - 获取 Spark 的流窗口时间戳

sockets - 在 ZeroMQ 中获取 TCP 地址信息

python - 无法安装 PyZMP for Python -- 依赖项

python - 难道cProfile背叛了我?