我正在编写简单的生产者/消费者程序。
import zmq
@asyncio.coroutine
def receive_data(future,s):
print("begin to recv sth from.....socket"
my_data = s.recv()
future.set_result(my_data)
@asyncio.coroutine
def producer(loop,q,s):
while True:
future = asyncio.Future()
yield from receive_data(future,s)
data = str(future.result())
yield from q.put(data)
@asyncio.coroutine
def consumer(loop,q):
while True:
a = yield from q.get()
print("i am get..."+str(a)+"..."+str(type(a)))
loop = asyncio.get_event_loop()
c = zmq.Context()
s = c.socket(zmq.REP)
s.bind('tcp://127.0.0.1:5515')
q = asyncio.Queue()
tasks=[asyncio.Task(producer(loop,q,s)),asyncio.Task(comsumer(loop,q))]
loop.run_until_complete(asyncio.gather(*tasks))
loop.close()
s.close()
看来消费者没有机会执行。
套接字每 500 毫秒接收一次数据,因此当 receive_data 函数中的 yield from
挂起生产者协程时,消费者协程将打印信息。
什么可以解释这个?
最佳答案
s.recv()
正在阻塞调用,因此 receive_data
挂起,直到新的 ZMQ 消息到达。
这会阻塞事件循环,消费者没有机会执行自己。
你可以传递zmq.NOBLOCK
标志到.recv
,如果没有可用数据给eventloop一个有机会迭代其他就绪任务。
或者只使用 aiozmq图书馆:)
关于python - 为什么 asyncio.Queue 无法按预期工作?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/27183148/