python - 异步: terminating all tasks when one of them throws exception

标签 python asynchronous exception python-asyncio with-statement

我正在编写 python 上下文管理器,它运行异步任务。如果我的经理的任何任务抛出异常,我希望我的经理终止。这是示例代码:

class MyClass:
  def __init__(self):
    if asyncio.get_event_loop().is_closed():
      asyncio.set_event_loop(asyncio.new_event_loop())

    self.loop = asyncio.get_event_loop()

  def __enter__(self):
    return self

  def __exit__(self, excType, excValue, tb):
    try:
      self.loop.run_until_complete(self._exit_loop())
    finally:
      self.loop.close()

    if excType is not None:
      print(excType.__name__, ':', excValue)
      traceback.print_tb(tb)

  async def _exit_loop(self):
    tasks = [task for task in asyncio.all_tasks(self.loop) if
             task is not asyncio.current_task(self.loop)]
    list(map(lambda task: task.cancel(), tasks))

    results = await asyncio.gather(*tasks, return_exceptions=True)
    self.loop.stop()


  async def func1(self):
    while True:
      print('func1')
      await asyncio.sleep(1)

  async def func2(self):
    i = 5
    while i > 0:
      print('func2')
      await asyncio.sleep(1)
      i -= 1
    raise Exception

  async def _async_start(self):
    self.loop.create_task(self.func1())
    self.loop.create_task(self.func2())

  def start(self):
    self.loop.run_until_complete(self._async_start())

with MyClass() as myClass:
  myClass.start()
  myClass.loop.run_forever()

这是该脚本的输出:

func1
func2
func1
func2
func1
func2
func1
func2
func1
func2
Task exception was never retrieved
func1
future: <Task finished coro=<MyClass.func2() done, defined at /home/framal/Programy/schnapps/schnapps/bottle/client.py:381> exception=Exception()>
Traceback (most recent call last):
  File "/home/framal/Programy/schnapps/schnapps/bottle/client.py", line 387, in func2
    raise Exception
Exception
func1
func1
func1
.
.
.

我尝试使用自定义异常处理程序,但没有任何效果 - 它们在强制终止进程后立即开始运行。

如何将异常传递给循环,以便它关闭所有其他任务?

最佳答案

更新:在 Python 3.11 中 a TaskGroup被介绍了。它具有所需的功能。


我不明白您为什么要这样使用上下文管理器 (CM)。

无论如何,如果给定了 CM 并且您将 loop.run_forever() 放入 with block ,这是我知道在这种情况下退出循环的唯一方法,所以控制权传递给 CM 的退出函数是 loop.stop()

这是一个小装饰器,处理除使用 loop.stop() 取消之外的所有异常。

def watchdog(afunc):
    @functools.wraps(afunc)
    async def run(*args, **kwargs):
        try:
            await afunc(*args, **kwargs)
        except asyncio.CancelledError:
            return
        except Exception as err:
            print(f"exception {err}")
            asyncio.get_event_loop().stop()
    return run

如果您装饰由 CM 作为任务启动的所有协程(func1func2),例如:

@watchdog
async def func2(self):

然后它将在第一次异常后停止。

关于python - 异步: terminating all tasks when one of them throws exception,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/57660884/

相关文章:

python - 使用 panda.read_csv 与 numpy.loadtxt 时的输出差异

java - 如何处理IOException

python - tox.ini 的 Python tox 文档在哪里 [foo :bar] section inheriting from [foo]?

python - 如果另一列的值大于其他两列的值,则显示一列的值

python - Asyncio 使用 StreamReader 解码 utf-8

ios - 在 UIScrollview 中更新 subview 时滚动期间轻微卡住

java - 奇怪的 Java 异常行为

java - Java 应用程序中的 PKIX 路径构建失败

python - Python 中 'x = y = z' 赋值的语义

javascript - 如何编写非阻塞的javascript代码?