想象一下,我们正在编写一个应用程序,它允许用户连续运行一个应用程序(假设这是针对 API 的一系列重要操作),并且可以同时运行多个应用程序。要求包括:
- 用户可以控制并发应用程序的数量(这可能会限制 API 的并发负载,这通常很重要)
- 如果操作系统尝试关闭运行该程序的 Python 程序,它应该正常终止,允许任何正在进行的应用程序在关闭之前完成其运行
这里的问题具体是关于我们编写的任务管理器的,所以让我们删除一些代码来说明这个问题:
import asyncio
import signal
async def work_chunk():
"""Simulates a chunk of work that can possibly fail"""
await asyncio.sleep(1)
async def protected_work():
"""All steps of this function MUST complete, the caller should shield it from cancelation."""
print("protected_work start")
for i in range(3):
await work_chunk()
print(f"protected_work working... {i+1} out of 3 steps complete")
print("protected_work done... ")
async def subtask():
print("subtask: starting loop of protected work...")
cancelled = False
while not cancelled:
protected_coro = asyncio.create_task(protected_work())
try:
await asyncio.shield(protected_coro)
except asyncio.CancelledError:
cancelled = True
await protected_coro
print("subtask: cancelation complete")
async def subtask_manager():
"""
Manage a pool of subtask workers.
(In the real world, the user can dynamically change the concurrency, but here we'll
hard code it at 3.)
"""
tasks = {}
while True:
for i in range(3):
task = tasks.get(i)
if not task or task.done():
tasks[i] = asyncio.create_task(subtask())
await asyncio.sleep(5)
def shutdown(signal, main_task):
"""Cleanup tasks tied to the service's shutdown."""
print(f"Received exit signal {signal.name}. Scheduling cancelation:")
main_task.cancel()
async def main():
print("main... start")
coro = asyncio.ensure_future(subtask_manager())
loop = asyncio.get_running_loop()
loop.add_signal_handler(signal.SIGINT, lambda: shutdown(signal.SIGINT, coro))
loop.add_signal_handler(signal.SIGTERM, lambda: shutdown(signal.SIGTERM, coro))
await coro
print("main... done")
def run():
asyncio.run(main())
run()
subtask_manager
管理工作人员池,定期查找当前的并发要求并适当更新事件工作人员的数量(请注意,上面的代码删除了大部分工作人员,只是硬编码一个数字,因为它对问题并不重要)。
subtask
是工作循环本身,它持续运行 protected_work()
直到有人取消它。
但是这段代码已被破坏。当你给它一个 SIGINT 时,整个系统立即崩溃。
在进一步解释之前,让我指出一段关键的代码:
1 protected_coro = asyncio.create_task(protected_work())
2 try:
3 await asyncio.shield(protected_coro)
4 except asyncio.CancelledError:
5 cancelled = True
6 await protected_coro # <-- This will raise CancelledError too!
经过一些调试,我们发现我们的 try/except block 不起作用。我们发现第 3 行和第 6 行都引发了 CancelledError。
当我们进一步深入时,我们发现所有“await”调用在取消子任务管理器后都会抛出 CancelledError,而不仅仅是上面提到的那一行。 (即work_chunk()的第二行,await asyncio.sleep(1)
,和protected_work()的第四行,await work_chunk()
,还会引发 CancelledError。)
这是怎么回事?
出于某种原因,Python 似乎并没有像您期望的那样传播取消,只是举起双手说“我现在要取消一切”。
为什么?
显然,我不明白取消传播在 Python 中是如何工作的。我一直在努力寻找有关其工作原理的文档。有人可以向我描述取消是如何以清晰的方式传播的,以解释上面示例中发现的行为吗?
最佳答案
在长时间研究这个问题并尝试其他代码片段(其中取消传播按预期工作)之后,我开始怀疑问题是否是 Python 不知道顺序在这里的传播,在这个情况下。
但是为什么呢?
嗯,subtask_manager
创建任务,但不等待它们。
Python 是否不会假设创建该任务的协程(使用 create_task
)拥有该任务?我认为 Python 使用 await
关键字独占来知道以什么顺序传播取消,以及在遍历整个任务树后是否找到任务仍然没有被取消,它只是将它们全部摧毁。
因此,我们需要在任何我们知道尚未等待异步任务的地方自行管理任务取消传播。因此,我们需要重构 subtask_manager
以捕获其自身的取消,并显式取消然后等待其所有子任务:
async def subtask_manager():
"""
Manage a pool of subtask workers.
(In the real world, the user can dynamically change the concurrency, but here we'll
hard code it at 3.)
"""
tasks = {}
while True:
for i in range(3):
task = tasks.get(i)
if not task or task.done():
tasks[i] = asyncio.create_task(subtask())
try:
await asyncio.sleep(5)
except asyncio.CancelledError:
print("cancelation detected, canceling children")
[t.cancel() for t in tasks.values()]
await asyncio.gather(*[t for t in tasks.values()])
return
现在我们的代码可以按预期工作:
注意:我已经以问答方式回答了我自己的问题,但我仍然对我关于取消传播如何工作的文字回答不满意。如果有人对取消传播的工作原理有更好的解释,我很乐意阅读。
关于python - 使用 create_task 创建的任务从不等待,似乎打破了取消子任务的期望,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/60200320/