具有异步的 Python 事件处理程序(非阻塞 while 循环)

标签 python python-3.x python-asyncio

import queue

qq = queue.Queue()
qq.put('hi')

class MyApp():

    def __init__(self, q):
        self._queue = q

    def _process_item(self, item):
        print(f'Processing this item: {item}')

    def get_item(self):
        try:
            item = self._queue.get_nowait()
            self._process_item(item)
        except queue.Empty:
            pass

    async def listen_for_orders(self):  
        '''
        Asynchronously check the orders queue for new incoming orders
        '''
        while True:
            self.get_item()
            await asyncio.sleep(0)      

a = MyApp(qq)

loop = asyncio.get_event_loop()

loop.run_until_complete(a.listen_for_orders())

使用 Python 3.6。

我正在尝试编写一个事件处理程序,不断监听 queue 中的消息,并处理它们(在本例中打印它们)。但它必须是异步 - 我需要能够在终端 (IPython) 中运行它并手动将内容提供给 queue(至少在最初用于测试)。

此代码不起作用 - 它会永远阻塞。

如何让它永远运行,但在 while 循环的每次迭代后返回控制权?

谢谢。

边注: 为了使事件循环与 IPython(7.2 版)一起工作,我使用了 this来自 ib_insync 库的代码,我使用这个库来解决上面示例中的实际问题。

最佳答案

您需要将队列设为 asyncio.Queue,并以线程安全的方式向队列中添加内容。例如:

qq = asyncio.Queue()

class MyApp():
    def __init__(self, q):
        self._queue = q

    def _process_item(self, item):
        print(f'Processing this item: {item}')

    async def get_item(self):
        item = await self._queue.get()
        self._process_item(item)

    async def listen_for_orders(self):  
        '''
        Asynchronously check the orders queue for new incoming orders
        '''
        while True:
            await self.get_item()

a = MyApp(qq)

loop = asyncio.get_event_loop()

loop.run_until_complete(a.listen_for_orders())

您的其他线程必须像这样将内容放入队列中:

loop.call_soon_threadsafe(qq.put_nowait, <item>)

call_soon_threadsafe 将确保正确锁定,并确保在新队列项准备就绪时唤醒事件循环。

关于具有异步的 Python 事件处理程序(非阻塞 while 循环),我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/55151635/

相关文章:

python - 在 linux 上升级 python3 中使用的 SQLite3 版本?

python - 如何将 Asyncio 与 while 循环一起使用

python - 将嵌入式 python asyncio 集成到 boost::asio 事件循环中

python-3.x - Pandas系列垂直合并

Python:创建大小为 n^2 的元组的时间和空间复杂度

python - 在 python 中使用 SOM 进行聚类

python - 带掩码的 numpy 赋值

python - 带有串行设备的 asyncio python 占用 100% CPU

python - 在多线程 Python 应用程序中最小化 MySQL 连接开销的正确方法是什么?

python - 在类实例和类定义上调用类装饰器有什么区别?