我正在开发一个旨在提供同步支持的异步库。为此,我使用 asgiref
,但出于演示目的,我将运行具有相同问题的 asyncio
。
我想将一个返回异步生成器的异步函数包装到装饰器中,该装饰器会将这个结果转换为经典生成器。 我需要在不消耗源生成器的情况下执行此操作,因为在此过程中我可能会处理大量数据。因此,我需要一次生成一个项目,而同步是经典的 python 上下文。
此主题已在 another thread 中讨论过对于非嵌套异步生成器的情况,用户提示了我今天在 this comment 中遇到的相同问题
我已将我的问题总结为以下示例:
import asyncio
async def async_generator_source():
yield 1
yield 2
yield 3
async def async_generator_wrapper():
async for item in async_generator_source():
yield f"source says: {item}"
def sync_generator_wrapper():
async_generator = async_generator_wrapper()
try:
while True:
print(asyncio.run(async_generator.__anext__()))
except StopAsyncIteration:
print("stop async iteration received")
sync_generator_wrapper()
我期望这个输出:
source says: 1
source says: 2
source says: 3
stop async iteration received
但是我有这个输出:
source says: 1
stop async iteration received
如何解决此问题?
请记住,async_generator_source
内容不应存储在列表中,因为在我的用例中它是一个非常大的数据集。
最佳答案
asyncio.run
完成后将关闭异步生成器,类似于调用 shutdown_asyncgens
事件循环的方法:
This function runs the passed coroutine, taking care of managing the asyncio event loop, finalizing asynchronous generators, and closing the threadpool.
因此,在您的情况下,您不能多次使用 asyncio.run
来继续执行同一个异步生成器。
您可以通过获取对 asyncio 事件循环的引用并调用其 run_until_complete
来使代码生成预期的输出。执行异步生成器每次迭代的方法。
import asyncio
async def async_generator_source():
yield 1
yield 2
yield 3
async def async_generator_wrapper():
async for item in async_generator_source():
yield f"source says: {item}"
def sync_generator_wrapper():
async_generator = async_generator_wrapper()
loop = asyncio.get_event_loop() # this line added
try:
while True:
print(loop.run_until_complete(async_generator.__anext__())) # this line changed
except StopAsyncIteration:
print("stop async iteration received")
sync_generator_wrapper()
关于python - 当使用 asyncio.run 单独执行每个迭代时,嵌套的异步到同步生成器在第一次迭代时停止,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/75914293/