import queue
qq = queue.Queue()
qq.put('hi')
class MyApp():
def __init__(self, q):
self._queue = q
def _process_item(self, item):
print(f'Processing this item: {item}')
def get_item(self):
try:
item = self._queue.get_nowait()
self._process_item(item)
except queue.Empty:
pass
async def listen_for_orders(self):
'''
Asynchronously check the orders queue for new incoming orders
'''
while True:
self.get_item()
await asyncio.sleep(0)
a = MyApp(qq)
loop = asyncio.get_event_loop()
loop.run_until_complete(a.listen_for_orders())
使用 Python 3.6。
我正在尝试编写一个事件处理程序,不断监听 queue
中的消息,并处理它们(在本例中打印它们)。但它必须是异步 - 我需要能够在终端 (IPython) 中运行它并手动将内容提供给 queue
(至少在最初用于测试)。
此代码不起作用 - 它会永远阻塞。
如何让它永远运行,但在 while
循环的每次迭代后返回控制权?
谢谢。
边注:
为了使事件循环与 IPython(7.2 版)一起工作,我使用了 this来自 ib_insync
库的代码,我使用这个库来解决上面示例中的实际问题。
最佳答案
您需要将队列设为 asyncio.Queue
,并以线程安全的方式向队列中添加内容。例如:
qq = asyncio.Queue()
class MyApp():
def __init__(self, q):
self._queue = q
def _process_item(self, item):
print(f'Processing this item: {item}')
async def get_item(self):
item = await self._queue.get()
self._process_item(item)
async def listen_for_orders(self):
'''
Asynchronously check the orders queue for new incoming orders
'''
while True:
await self.get_item()
a = MyApp(qq)
loop = asyncio.get_event_loop()
loop.run_until_complete(a.listen_for_orders())
您的其他线程必须像这样将内容放入队列中:
loop.call_soon_threadsafe(qq.put_nowait, <item>)
call_soon_threadsafe
将确保正确锁定,并确保在新队列项准备就绪时唤醒事件循环。
关于具有异步的 Python 事件处理程序(非阻塞 while 循环),我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/55151635/