python - 当使用 asyncio.run 单独执行每个迭代时,嵌套的异步到同步生成器在第一次迭代时停止

标签 python python-asyncio

我正在开发一个旨在提供同步支持的异步库。为此,我使用 asgiref,但出于演示目的,我将运行具有相同问题的 asyncio

我想将一个返回异步生成器的异步函数包装到装饰器中,该装饰器会将这个结果转换为经典生成器。 我需要在不消耗源生成器的情况下执行此操作,因为在此过程中我可能会处理大量数据。因此,我需要一次生成一个项目,而同步是经典的 python 上下文。

此主题已在 another thread 中讨论过对于非嵌套异步生成器的情况,用户提示了我今天在 this comment 中遇到的相同问题

我已将我的问题总结为以下示例:

import asyncio


async def async_generator_source():
    yield 1
    yield 2
    yield 3


async def async_generator_wrapper():
    async for item in async_generator_source():
        yield f"source says: {item}"


def sync_generator_wrapper():
    async_generator = async_generator_wrapper()
    try:
        while True:
            print(asyncio.run(async_generator.__anext__()))
    except StopAsyncIteration:
        print("stop async iteration received")


sync_generator_wrapper()

我期望这个输出:

source says: 1
source says: 2
source says: 3
stop async iteration received

但是我有这个输出:

source says: 1
stop async iteration received

如何解决此问题?

请记住,async_generator_source 内容不应存储在列表中,因为在我的用例中它是一个非常大的数据集。

最佳答案

asyncio.run完成后将关闭异步生成器,类似于调用 shutdown_asyncgens事件循环的方法:

This function runs the passed coroutine, taking care of managing the asyncio event loop, finalizing asynchronous generators, and closing the threadpool.

因此,在您的情况下,您不能多次使用 asyncio.run 来继续执行同一个异步生成器。

您可以通过获取对 asyncio 事件循环的引用并调用其 run_until_complete 来使代码生成预期的输出。执行异步生成器每次迭代的方法。

import asyncio

async def async_generator_source():
    yield 1
    yield 2
    yield 3

async def async_generator_wrapper():
    async for item in async_generator_source():
        yield f"source says: {item}"

def sync_generator_wrapper():
    async_generator = async_generator_wrapper()
    loop = asyncio.get_event_loop()  # this line added
    try:
        while True:
            print(loop.run_until_complete(async_generator.__anext__()))  # this line changed
    except StopAsyncIteration:
        print("stop async iteration received")

sync_generator_wrapper()

关于python - 当使用 asyncio.run 单独执行每个迭代时,嵌套的异步到同步生成器在第一次迭代时停止,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/75914293/

相关文章:

python - 如何使用 asyncio 和 concurrent.futures.ProcessPoolExecutor 在 Python 中终止长时间运行的计算(CPU 绑定(bind)任务)?

python - 将具有不同数量整数的文本文件读取到多个列表中

python - 使用 read() 从 VideoCapture 对象获取图像

python - 使用 distutils 编译 :/usr/lib is added as a file to compile

amazon-web-services - 模块未找到错误: No module named 'aiohttp' in AWS Glue

python - 有没有办法在多线程中使用 asyncio.Queue?

python - beautifulsoup - 如何查找以特定属性开头的标签?

python - 使用 Python 将 stdout 和 stderr 重定向到同一文件

python-asyncio - 从在单独线程中运行的同步代码将项目放入 asyncio.Queue 的最简单方法

python - AIOHTTP - Application.make_handler(...) 已弃用 - 添加多处理