说明:(简体)
代码:(简体)
import asyncio
async def download(data):
filename = "*" if data in ["b", "c"] else data # simulated failure
with open(filename, "w") as f:
f.write(data)
async def coro(data_list):
coroutines = [download(data) for data in data_list]
for coroutine in asyncio.as_completed(coroutines):
await coroutine
async def main():
task1 = asyncio.create_task(coro(["a", "b", "c"]))
task2 = asyncio.create_task(coro(["d", "e", "f"]))
results = await asyncio.gather(task1, task2, return_exceptions=True)
for _ in results:
pass
asyncio.run(main())
输出:(简化)Task exception was never retrieved
future: <Task finished coro=<download() done, defined at D:/myscript.py:2> exception=OSError(22, 'Invalid argument')>
Traceback (most recent call last):
File "D:/myscript.py", line 4, in download
with open(filename, "w") as f:
OSError: [Errno 22] Invalid argument: '*'
最佳答案
如果你想收集异常而不是引发它们,你可以使用 asyncio.gather(return_exceptions=True)
在 coro
以及。例如:
import asyncio
async def download(data):
if data in ['b', 'c']:
1/0 # simulate error
return 42 # success
async def coro(data_list):
coroutines = [download(data) for data in data_list]
return await asyncio.gather(*coroutines, return_exceptions=True)
async def main():
task1 = asyncio.create_task(coro(["a", "b", "c"]))
task2 = asyncio.create_task(coro(["d", "e", "f"]))
return await asyncio.gather(task1, task2, return_exceptions=True)
print(asyncio.run(main()))
这将打印:[[42, ZeroDivisionError('division by zero'), ZeroDivisionError('division by zero')], [42, 42, 42]]
关于Python - asyncio - 从未检索到任务异常,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/65147823/