python - asyncio 运行多个任务,同时保留顺序并处理错误

标签 python python-3.x python-asyncio

我的函数run_tasks(all_tasks, window_size),它采用 asyncio 的生成器任务并返回其值,同时:

  1. 同时运行 all_tasks 中的每个窗口(大小为 window_size)
  2. 保留返回结果的顺序(all_tasks[i] 结果是 results[i])
  3. 处理每次运行的异常

我当前的实现:

import asyncio
from itertools import islice


# run all tasks and return their results in the same order
# window is the max number of tasks that will run in parallel
def run_tasks(all_tasks, window_size=4):
    loop = asyncio.get_event_loop()

    while True:
        window_tasks = list(islice(all_tasks, window_size))
        if not window_tasks:
            break

        futures = asyncio.wait(window_tasks, loop=loop)
        finished, unfinished = loop.run_until_complete(futures)

        # sort finished tasks by their launch order.
        # removing this line makes returned tasks unordered
        finished = sorted(finished, key=lambda f: window_tasks.index(f._coro))

        for finished_task in finished:
            try:
                yield finished_task.result()
            except Exception as e:
                yield repr(e)

# Example Usage:

# a coroutine that sometime raises exception
async def sleepy(i):
    print(f'{i} started')
    await asyncio.sleep(10 - i)
    print(f'{i} finished')
    if i == 5:
        raise ValueError('5 is the worst')
    return i

# a generator of tasks
all_tasks = (sleepy(i) for i in range(10))

for result in list(run_tasks(all_tasks)):
    print(result)

问题

我的实现的问题是,如果不访问 f._coro 这是 asyncio.Task 对象的内部属性,我无法找到对任务进行排序的方法。

# removing this line makes returned tasks unordered
finished = sorted(finished, key=lambda f: window_tasks.index(f._coro))

我可以使用asyncio.gather(*tasks),但这不会处理错误。

我愿意接受有关如何在不访问 f._coro 的情况下为 run_tasks() 实现这三个属性的建议。

最佳答案

asyncio.gather can如果您指定关键字参数return_exceptions,则会返回错误。为了区分真正的异常和协程返回的异常对象,您可以使用 ensure_futurewindow_tasks 与任务包装起来:

futures = [asyncio.ensure_future(t, loop=loop) for t in window_tasks]
gathered =  asyncio.gather(*futures, loop=loop, return_exceptions=True)
loop.run_until_complete(gathered)

for fut in futures:
    try:
        yield fut.result()
    except Exception as e:
        yield repr(e)

关于python - asyncio 运行多个任务,同时保留顺序并处理错误,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/46013037/

相关文章:

python - 获取重复时间的日期时间对象列表

python-3.x - 克隆前和克隆后的 keras 模型表现不同

python-3.x - 未定义 DRF 请求来获取当前用户 ID

python - 如何将协程添加到正在运行的 asyncio 循环中?

python - sqs 的异步等待接收消息无法正常工作

python - 程序退出时消息中的异常被忽略

python - 打印不符合数据间隔设置

python - 为什么 pip 需求文件包含 "@file"而不是版本号?

python - 如何使用 lstm 执行多类多输出分类

window - 如何在 Python 3k 中获取窗口或全屏截图? (不含 PIL)