python - 关闭无限异步生成器

标签 python python-asyncio

可重现的错误

我试图在在线 REPL 中重现错误 here .但是,它与我的真实代码(我在其中执行 async for response in position_stream() 而不是 REPL 中的 for position in count())并不完全相同(因此行为)。

关于我的实际实现的更多细节

我在某个地方定义了一个这样的协程:

async def position(self):
    request = telemetry_pb2.SubscribePositionRequest()
    position_stream = self._stub.SubscribePosition(request)

    try:
        async for response in position_stream:
            yield Position.translate_from_rpc(response)
    finally:
        position_stream.cancel()

其中 position_stream 是无限的(或者可能非常持久)。我从这样的示例代码中使用它:

async def print_altitude():
    async for position in drone.telemetry.position():
        print(f"Altitude: {position.relative_altitude_m}")

print_altitude()在循环中运行:

asyncio.ensure_future(print_altitude())
asyncio.get_event_loop().run_forever()

效果很好。现在,在某个时候,我想关闭来自调用者的流。我以为我可以运行 asyncio.ensure_future(loop.shutdown_asyncgens())等我的finally关闭上面得到调用,但它没有发生。

相反,我收到有关未检索异常的警告:

Task exception was never retrieved
future: <Task finished coro=<print_altitude() done, defined at [...]

这是为什么,我怎样才能让我所有的异步生成器真正关闭(并运行它们的 finally 子句)?

最佳答案

首先,如果您停止 一个循环,您的协程都没有机会正常关闭。调用 close 基本上意味着不可逆转地破坏循环。

如果你不关心那些正在运行的任务会发生什么,你可以简单地取消它们,这也会停止异步生成器:

import asyncio
from contextlib import suppress


async def position_stream():
    while True:
        await asyncio.sleep(1)
        yield 0

async def print_position():
    async for position in position_stream():
        print(f'position: {position}')

async def cleanup_awaiter():
    await asyncio.sleep(3)
    print('cleanup!')

if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    try:
        asyncio.ensure_future(print_position())
        asyncio.ensure_future(print_position())
        loop.run_until_complete(cleanup_awaiter())
        # get all running tasks:
        tasks = asyncio.gather(*asyncio.Task.all_tasks())
        # schedule throwing CancelledError into the them:
        tasks.cancel()
        # allow them to process the exception and be cancelled:
        with suppress(asyncio.CancelledError):
            loop.run_until_complete(tasks)
    finally:
        print('closing loop')
        loop.close()

关于python - 关闭无限异步生成器,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/53509935/

相关文章:

python - numpy ndarray 的子类不能按预期工作

python - 计算 DataFrame 中每一列的第一个非缺失值

python - 寻找使用 pysftp 加载/使用公共(public) ssh key 的示例

python - 如何使用 asyncio 进行基本文件 IO

python - gqlQuery 返回对象,想要键列表

python - 从基于生成器的协程转换为 native 协程

python - python asyncio 中的协议(protocol)工厂有什么要求?

python - 用于组合异步迭代器的映射、过滤器和 itertools

python - 来自 asyncio.gather 的任务不能同时工作

python - 如何让 itemgetter 从列表变量中获取输入?