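My function run_tasks(all_tasks, window_size) takes a generator of asyncio tasks and returns their values, while:
- running each window (of size window_size) of all_tasks concurrently
- preserving the order of the returned results (the result of all_tasks[i] is results[i])
- handling the exceptions of each run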
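The windowing requirement amounts to cutting the input into consecutive chunks of window_size items. Below is a minimal sketch with itertools.islice; `windows` is a hypothetical helper name. Calling iter() first matters: islice over a generator resumes where the previous slice stopped, but islice over a list restarts at index 0 on every call, so the `while True` loop in the implementation that follows would never end if all_tasks were a list.

```python
from itertools import islice
from typing import Iterable, Iterator, List, TypeVar

T = TypeVar("T")

def windows(items: Iterable[T], size: int) -> Iterator[List[T]]:
    """Hypothetical helper: yield consecutive chunks of at most `size` items."""
    it = iter(items)  # one shared iterator, so each islice continues where the last stopped
    while True:
        chunk = list(islice(it, size))
        if not chunk:  # input exhausted
            return
        yield chunk

print(list(windows(range(10), 4)))
```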
My current implementation:
```python
import asyncio
from itertools import islice

# run all tasks and return their results in the same order
# window is the max number of tasks that will run in parallel
def run_tasks(all_tasks, window_size=4):
    loop = asyncio.get_event_loop()
    while True:
        window_tasks = list(islice(all_tasks, window_size))
        if not window_tasks:
            break
        # note: the loop= argument of asyncio.wait() was removed in Python 3.10,
        # and passing bare coroutines to it is an error since Python 3.11
        futures = asyncio.wait(window_tasks, loop=loop)
        finished, unfinished = loop.run_until_complete(futures)
        # sort finished tasks by their launch order.
        # removing this line makes returned tasks unordered
        finished = sorted(finished, key=lambda f: window_tasks.index(f._coro))
        for finished_task in finished:
            try:
                yield finished_task.result()
            except Exception as e:
                yield repr(e)

# Example Usage:
# a coroutine that sometimes raises an exception
async def sleepy(i):
    print(f'{i} started')
    await asyncio.sleep(10 - i)
    print(f'{i} finished')
    if i == 5:
        raise ValueError('5 is the worst')
    return i

# a generator of tasks
all_tasks = (sleepy(i) for i in range(10))
for result in list(run_tasks(all_tasks)):
    print(result)
```
Problem
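The problem with my implementation is that I can't find a way to order the tasks without accessing f._coro, which is an internal attribute of the asyncio.Task object: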
```python
# removing this line makes returned tasks unordered
finished = sorted(finished, key=lambda f: window_tasks.index(f._coro))
```
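I could use asyncio.gather(*tasks), but it doesn't handle errors: by default, the first exception raised by any task propagates out of gather().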
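To illustrate the difference, here is a small self-contained sketch (the coroutines `ok` and `bad` are made up for the example): without return_exceptions, awaiting gather() raises the first exception; with return_exceptions=True, the exception object takes its slot in the result list, in argument order.

```python
import asyncio

async def ok(i):
    await asyncio.sleep(0)
    return i

async def bad():
    await asyncio.sleep(0)
    raise ValueError("boom")

async def main():
    # Default behaviour: the first exception propagates out of the await.
    try:
        await asyncio.gather(ok(1), bad(), ok(3))
    except ValueError as e:
        print("gather raised:", repr(e))
    # return_exceptions=True: exceptions are returned in place, in argument order.
    results = await asyncio.gather(ok(1), bad(), ok(3), return_exceptions=True)
    print(results)
    return results

results = asyncio.run(main())
```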
I'm open to suggestions on how to implement these three properties for run_tasks() without accessing f._coro.
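For reference, the ordering problem can also be solved while keeping asyncio.wait(), without touching the coroutine at all: if you wrap the coroutines in Tasks yourself, asyncio.wait() returns those same Task objects in its (unordered) sets, so a dict built from your own list can serve as the sort key. This is a minimal sketch that is not from the original thread; `run_window` is a hypothetical helper and `sleepy` is a shortened version of the example coroutine.

```python
import asyncio

async def run_window(coros):
    """Hypothetical helper: run one window with asyncio.wait(), ordered without Task._coro."""
    # Wrap the coroutines ourselves so we hold the Task objects in launch order.
    tasks = [asyncio.ensure_future(c) for c in coros]
    order = {task: i for i, task in enumerate(tasks)}  # Task -> launch position
    done, _pending = await asyncio.wait(tasks)  # same Task objects, as unordered sets
    results = []
    for task in sorted(done, key=order.__getitem__):
        try:
            results.append(task.result())
        except Exception as e:
            results.append(repr(e))
    return results

async def sleepy(i):
    await asyncio.sleep(0.01 * (3 - i))  # later coroutines finish first
    if i == 1:
        raise ValueError("1 is the worst")
    return i

print(asyncio.run(run_window([sleepy(i) for i in range(3)])))
```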
Best answer
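asyncio.gather can return errors instead of raising them if you pass the keyword argument return_exceptions=True. To distinguish real exceptions from exception objects that a coroutine merely returns, you can wrap window_tasks in tasks with ensure_future and read each result from those tasks: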
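```python
# inside run_tasks(), in place of the asyncio.wait() / sorted() lines;
# gather() takes its loop from the futures (its loop= argument was removed in Python 3.10)
futures = [asyncio.ensure_future(t, loop=loop) for t in window_tasks]
gathered = asyncio.gather(*futures, return_exceptions=True)
loop.run_until_complete(gathered)
for fut in futures:  # iterate in launch order, not completion order
    try:
        yield fut.result()  # re-raises only if the coroutine actually raised
    except Exception as e:
        yield repr(e)
```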
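A self-contained sketch of the answer's approach for current Python (3.7+), assuming one fresh event loop per window via asyncio.run() instead of get_event_loop()/run_until_complete(). It also shows the distinction the answer mentions: a coroutine that raises comes back as repr(e), while one that returns an exception object yields that object unchanged. The coroutines `raises` and `returns_exception` are illustrative only.

```python
import asyncio
from itertools import islice

def run_tasks(all_tasks, window_size=4):
    """Sketch of the answer's gather(return_exceptions=True) approach using asyncio.run()."""
    it = iter(all_tasks)  # also works if all_tasks is a list, not only a generator
    while True:
        window = list(islice(it, window_size))
        if not window:
            break

        async def run_window(coros=window):
            futures = [asyncio.ensure_future(c) for c in coros]
            # With return_exceptions=True a failing coroutine does not abort the gather.
            await asyncio.gather(*futures, return_exceptions=True)
            return futures

        for fut in asyncio.run(run_window()):  # futures are in launch order
            try:
                yield fut.result()  # raises only if the coroutine itself raised
            except Exception as e:
                yield repr(e)

async def raises():
    raise ValueError("raised")

async def returns_exception():
    return ValueError("returned")  # an ordinary return value that happens to be an exception

results = list(run_tasks([raises(), returns_exception()]))
print(results)
```

Because asyncio.run() refuses to start while another event loop is running in the same thread, this generator has to be consumed from synchronous code, as in the question's example.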
Source: the Stack Overflow question "python - asyncio run multiple tasks while preserving order and handling errors": https://stackoverflow.com/questions/46013037/