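I've seen asyncio.gather vs asyncio.wait, but I'm not sure it addresses this particular problem. What I want to do is wrap the asyncio.gather() coroutine in asyncio.wait_for(), with a timeout argument. I also need to satisfy these conditions:
- return_exceptions=True (from asyncio.gather()): rather than propagating exceptions to the task that awaits gather(), I want to include the exception instances in the results.
- Order: keep the property of asyncio.gather() that the results come back in the same order as the inputs. (Or, alternatively, map the output back to the input.) asyncio.wait_for() does not meet this criterion, and I'm not sure of the ideal way to achieve it.
The timeout applies to the entire asyncio.gather() over the list of awaitables: if an awaitable is caught by the timeout or raises an exception, either case should simply place an exception instance in the result list.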
Consider this setup:
>>> import asyncio
>>> import random
>>> from time import perf_counter
>>> from typing import Iterable
>>> from pprint import pprint
>>>
>>> async def coro(i, threshold=0.4):
...     await asyncio.sleep(i)
...     if i > threshold:
...         # For illustration's sake - some coroutines may raise,
...         # and we want to accommodate that and just test for exception
...         # instances in the results of asyncio.gather(return_exceptions=True)
...         raise Exception("i too high")
...     return i
...
>>> async def main(n, it: Iterable):
...     res = await asyncio.gather(
...         *(coro(i) for i in it),
...         return_exceptions=True
...     )
...     return res
...
>>>
>>> random.seed(444)
>>> n = 10
>>> it = [random.random() for _ in range(n)]
>>> start = perf_counter()
>>> res = asyncio.run(main(n, it=it))
>>> elapsed = perf_counter() - start
>>> print(f"Done main({n}) in {elapsed:0.2f} seconds")  # Expectation: ~1 second
Done main(10) in 0.86 seconds
>>> pprint(dict(zip(it, res)))
{0.01323751590501987: 0.01323751590501987,
0.07422124156714727: 0.07422124156714727,
0.3088946587429545: 0.3088946587429545,
0.3113884366691503: 0.3113884366691503,
0.4419557492849159: Exception('i too high'),
0.4844375347808497: Exception('i too high'),
0.5796792804615848: Exception('i too high'),
0.6338658027451068: Exception('i too high'),
0.7426396870165088: Exception('i too high'),
0.8614799253779063: Exception('i too high')}
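With n = 10, the program above takes roughly as long as its slowest coroutine (here max(it) ≈ 0.86 seconds), plus some overhead from the async runtime. (random.random() is uniformly distributed over [0, 1).)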
Suppose I want to impose a timeout on the entire operation (that is, on the coroutine main()):
timeout = 0.5
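For comparison, here is a minimal sketch of the naive approach the question starts from, assuming the same toy coro() (repeated so the snippet runs on its own); naive_main is a hypothetical name. asyncio.wait_for() does enforce the overall deadline, but when it expires the whole gather is cancelled and asyncio.TimeoutError is raised, so even the results of the coroutines that had already finished are lost:

```python
import asyncio


async def coro(i, threshold=0.4):
    # Same toy coroutine as in the question: sleep, then raise if i is too high.
    await asyncio.sleep(i)
    if i > threshold:
        raise Exception("i too high")
    return i


async def naive_main(it, timeout):
    # gather() keeps input order and, with return_exceptions=True,
    # puts exception instances into the result list.
    gathered = asyncio.gather(*(coro(i) for i in it), return_exceptions=True)
    try:
        # wait_for() cancels the whole gather once the deadline passes.
        return await asyncio.wait_for(gathered, timeout=timeout)
    except asyncio.TimeoutError:
        # All partial results are discarded; only the timeout is reported.
        return None


if __name__ == "__main__":
    print(asyncio.run(naive_main([0.01, 0.05, 0.6], timeout=0.2)))  # None
```

With a generous timeout (say 1.0 for the same inputs) the call returns the ordered gather() list instead, which is why this approach only fails in exactly the case the question cares about.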
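Now, I could use asyncio.wait(), but the problem is that its results are set objects, so the ordered-return-value property of asyncio.gather() is definitely not guaranteed: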
>>> async def main(n, it, timeout) -> tuple:
...     tasks = [asyncio.create_task(coro(i)) for i in it]
...     done, pending = await asyncio.wait(tasks, timeout=timeout)
...     return done, pending
...
>>> timeout = 0.5
>>> random.seed(444)
>>> it = [random.random() for _ in range(n)]
>>> start = perf_counter()
>>> done, pending = asyncio.run(main(n, it=it, timeout=timeout))
>>> for i in pending:
...     i.cancel()
>>> elapsed = perf_counter() - start
>>> print(f"Done main({n}) in {elapsed:0.2f} seconds")
Done main(10) in 0.50 seconds
>>> done
{<Task finished coro=<coro() done, defined at <stdin>:1> exception=Exception('i too high')>, <Task finished coro=<coro() done, defined at <stdin>:1> exception=Exception('i too high')>, <Task finished coro=<coro() done, defined at <stdin>:1> result=0.3088946587429545>, <Task finished coro=<coro() done, defined at <stdin>:1> result=0.3113884366691503>, <Task finished coro=<coro() done, defined at <stdin>:1> result=0.01323751590501987>, <Task finished coro=<coro() done, defined at <stdin>:1> result=0.07422124156714727>}
>>> pprint(done)
{<Task finished coro=<coro() done, defined at <stdin>:1> exception=Exception('i too high')>,
<Task finished coro=<coro() done, defined at <stdin>:1> result=0.3113884366691503>,
<Task finished coro=<coro() done, defined at <stdin>:1> result=0.07422124156714727>,
<Task finished coro=<coro() done, defined at <stdin>:1> exception=Exception('i too high')>,
<Task finished coro=<coro() done, defined at <stdin>:1> result=0.01323751590501987>,
<Task finished coro=<coro() done, defined at <stdin>:1> result=0.3088946587429545>}
>>> pprint(pending)
{<Task cancelled coro=<coro() done, defined at <stdin>:1>>,
<Task cancelled coro=<coro() done, defined at <stdin>:1>>,
<Task cancelled coro=<coro() done, defined at <stdin>:1>>,
<Task cancelled coro=<coro() done, defined at <stdin>:1>>}
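As shown above, the problem is that I can't seem to map the task instances back to the inputs in the iterable. Their task IDs are effectively lost inside the function scope, in tasks = [asyncio.create_task(coro(i)) for i in it]. Is there a Pythonic way, or a use of the asyncio API, to mimic the behavior of asyncio.gather()?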
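One way to map the output back to the input, sketched under the assumption that the inputs are hashable and distinct (they become dictionary keys): keep your own dictionary from each Task to the input that created it. Task objects are hashable, so every member of the done and pending sets can be looked up there. wait_and_map and task_to_input are hypothetical names, and reporting timed-out inputs as CancelledError instances is a choice made for this sketch:

```python
import asyncio


async def coro(i, threshold=0.4):
    await asyncio.sleep(i)
    if i > threshold:
        raise Exception("i too high")
    return i


async def wait_and_map(it, timeout):
    # Remember which input produced which task (Task objects are hashable).
    # Duplicate inputs would collide as keys in the returned dict.
    task_to_input = {asyncio.create_task(coro(i)): i for i in it}
    done, pending = await asyncio.wait(list(task_to_input), timeout=timeout)

    # asyncio.wait() does not cancel unfinished tasks on timeout, so do it
    # explicitly and let the cancellations complete.
    for t in pending:
        t.cancel()
    await asyncio.gather(*pending, return_exceptions=True)

    outcome = {}
    for t in done:
        # exception() returns None on success, else the raised exception.
        exc = t.exception()
        outcome[task_to_input[t]] = exc if exc is not None else t.result()
    for t in pending:
        outcome[task_to_input[t]] = asyncio.CancelledError()
    return outcome
```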
Best answer
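Take a look at the underlying _wait() coroutine: it works on the very Task objects in the list you pass it, and the states of those objects change as the tasks run. This means that, within the scope of main(), the Task objects in tasks from tasks = [asyncio.create_task(coro(i)) for i in it] will have changed state by the time await asyncio.wait(tasks, timeout=timeout) returns. Instead of returning the (done, pending) tuple, one workaround is to return tasks itself, which preserves the order of the input it. wait()/_wait() merely partitions the tasks into done and pending subsets; in this case we can discard those subsets and use the whole tasks list, whose elements have been altered.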
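A small check of this reasoning, assuming a simplified coro() that just sleeps and returns its argument: the done and pending sets contain exactly the objects in tasks, and the list itself keeps the input order even though the tasks finish in a different order. Because asyncio.wait() does not cancel unfinished tasks on timeout, this sketch cancels them explicitly instead of relying on asyncio.run() to clean up:

```python
import asyncio


async def coro(i):
    # Simplified stand-in for the question's coro(): no exceptions.
    await asyncio.sleep(i)
    return i


async def main(it, timeout):
    tasks = [asyncio.create_task(coro(i)) for i in it]
    done, pending = await asyncio.wait(tasks, timeout=timeout)
    # wait() only partitions the very Task objects it was given;
    # the list `tasks` is untouched and still in input order.
    assert done | pending == set(tasks)
    for t in pending:
        t.cancel()  # wait() leaves stragglers running on timeout
    await asyncio.gather(*pending, return_exceptions=True)
    return tasks
```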
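In this case, there are three possible task states:
- The task returned a valid result (from coro()) without raising an exception, and finished within the timeout. Its .cancelled() will be False, and it has a valid .result() that is not an exception instance.
- The task hit the timeout before it had a chance to return a result or raise an exception. Once it has been cancelled, it will show .cancelled() as True, and its .exception() will raise CancelledError. (asyncio.wait() does not cancel such tasks itself; in the illustration below, asyncio.run() cancels the leftover tasks when it shuts down.)
- The task was given enough time to complete, and coro() raised an exception. It will show .cancelled() as False, and its .exception() will return that exception instance (while .result() re-raises it).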
(All of this is laid out in asyncio/futures.py.)
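The three states can be turned into a gather()-style result with a helper like the following sketch; outcome and demo are hypothetical names. Since Task is a subclass of Future, plain futures from loop.create_future() expose the same .cancelled()/.exception()/.result() methods, which lets the sketch produce each state directly without sleeping:

```python
import asyncio


def outcome(fut):
    """Map a finished or cancelled task/future to a gather()-style result."""
    if fut.cancelled():
        # State 2: timed out and cancelled; .exception() would raise here.
        return asyncio.CancelledError()
    exc = fut.exception()
    if exc is not None:
        # State 3: finished by raising; .exception() returns the instance.
        return exc
    # State 1: finished normally with a valid result.
    return fut.result()


async def demo():
    loop = asyncio.get_running_loop()
    ok, cancelled, failed = (loop.create_future() for _ in range(3))
    ok.set_result(0.3)
    cancelled.cancel()
    failed.set_exception(Exception("i too high"))
    return [outcome(f) for f in (ok, cancelled, failed)]
```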
Illustration:
>>> # imports/other code snippets - see question
>>> async def main(n, it, timeout) -> list:
...     tasks = [asyncio.create_task(coro(i)) for i in it]
...     await asyncio.wait(tasks, timeout=timeout)
...     return tasks  # *not* (done, pending)
>>> timeout = 0.5
>>> random.seed(444)
>>> n = 10
>>> it = [random.random() for _ in range(n)]
>>> start = perf_counter()
>>> tasks = asyncio.run(main(n, it=it, timeout=timeout))
>>> elapsed = perf_counter() - start
>>> print(f"Done main({n}) in {elapsed:0.2f} seconds")
Done main(10) in 0.50 seconds
>>> pprint(tasks)
[<Task finished coro=<coro() done, defined at <stdin>:1> result=0.3088946587429545>,
<Task finished coro=<coro() done, defined at <stdin>:1> result=0.01323751590501987>,
<Task finished coro=<coro() done, defined at <stdin>:1> exception=Exception('i too high')>,
<Task cancelled coro=<coro() done, defined at <stdin>:1>>,
<Task cancelled coro=<coro() done, defined at <stdin>:1>>,
<Task cancelled coro=<coro() done, defined at <stdin>:1>>,
<Task finished coro=<coro() done, defined at <stdin>:1> exception=Exception('i too high')>,
<Task finished coro=<coro() done, defined at <stdin>:1> result=0.3113884366691503>,
<Task finished coro=<coro() done, defined at <stdin>:1> result=0.07422124156714727>,
<Task cancelled coro=<coro() done, defined at <stdin>:1>>]
Now apply the logic above to build res, which keeps the order of the input:
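>>> # On Python 3.8+, CancelledError derives from BaseException rather
>>> # than Exception, so it has to be caught explicitly.
>>> res = []
>>> for t in tasks:
...     try:
...         r = t.result()
...     except (Exception, asyncio.CancelledError) as e:
...         res.append(e)
...     else:
...         res.append(r)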
>>> pprint(res)
[0.3088946587429545,
0.01323751590501987,
Exception('i too high'),
CancelledError(),
CancelledError(),
CancelledError(),
Exception('i too high'),
0.3113884366691503,
0.07422124156714727,
CancelledError()]
>>> dict(zip(it, res))
{0.3088946587429545: 0.3088946587429545,
0.01323751590501987: 0.01323751590501987,
0.4844375347808497: Exception('i too high'),
0.8614799253779063: concurrent.futures._base.CancelledError(),
0.7426396870165088: concurrent.futures._base.CancelledError(),
0.6338658027451068: concurrent.futures._base.CancelledError(),
0.4419557492849159: Exception('i too high'),
0.3113884366691503: 0.3113884366691503,
0.07422124156714727: 0.07422124156714727,
0.5796792804615848: concurrent.futures._base.CancelledError()}
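Putting the pieces together, a reusable helper might look like the following sketch. gather_with_timeout is a hypothetical name, not part of asyncio. Unlike the illustration above, it cancels the timed-out tasks itself and waits for the cancellations to finish, so it does not depend on asyncio.run() cleaning up on exit, and it checks .cancelled() explicitly because CancelledError no longer subclasses Exception on Python 3.8+:

```python
import asyncio


async def gather_with_timeout(*aws, timeout):
    """Hypothetical helper: like gather(..., return_exceptions=True), but
    awaitables still running after `timeout` seconds are cancelled and
    reported as CancelledError instances, in input order."""
    tasks = [asyncio.ensure_future(aw) for aw in aws]
    if not tasks:
        return []  # asyncio.wait() raises ValueError on an empty set
    _, pending = await asyncio.wait(tasks, timeout=timeout)
    for t in pending:
        # asyncio.wait() leaves unfinished tasks running; cancel them here.
        t.cancel()
    # Let the cancellations complete so every task is in a final state.
    await asyncio.gather(*pending, return_exceptions=True)
    results = []
    for t in tasks:  # `tasks` is in input order
        if t.cancelled():
            results.append(asyncio.CancelledError())
        else:
            exc = t.exception()  # None if the task returned normally
            results.append(exc if exc is not None else t.result())
    return results


async def coro(i, threshold=0.4):
    await asyncio.sleep(i)
    if i > threshold:
        raise Exception("i too high")
    return i


async def main(it, timeout):
    return await gather_with_timeout(*(coro(i) for i in it), timeout=timeout)
```

One limitation of this sketch: if the caller awaiting gather_with_timeout() is itself cancelled while inside asyncio.wait(), the child tasks are not cancelled, because asyncio.wait() never cancels the futures it waits on.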
Regarding python - Wrap asyncio.gather in a timeout, a similar question can be found on Stack Overflow: https://stackoverflow.com/questions/54427248/