Python - 如果一个失败,则停止所有 ThreadPoolExecutor future

标签 python multithreading concurrency

摘要:

当与ThreadPoolExecutor同时运行多个线程并且其中一个线程引发异常时,其余线程将继续执行。

如果其中一个线程引发异常,是否有办法停止所有其他仍在运行的线程?

代码:

此演示脚本显示,即使任务 t2 提前失败,任务 t1t3 仍继续运行:

from concurrent.futures import ThreadPoolExecutor
import time


def foo(name):
    print("Started task: " + name)
    if name == "t2":
        raise Exception("Forced fail")
    for i in range(5):
        print("Running task: " + name)
        time.sleep(1)
    return "Completed task: " + name


with ThreadPoolExecutor() as executor:
    futures = []
    futures.append(executor.submit(foo, "t1"))
    futures.append(executor.submit(foo, "t2"))
    futures.append(executor.submit(foo, "t3"))

    for future in futures:
        print(future.result())

尝试使用 wait()

我尝试在检测到 FIRST_EXCEPTION 时使用 wait(),但 future 未响应取消指令:

from concurrent.futures import wait, FIRST_EXCEPTION

with ThreadPoolExecutor() as executor:
    futures = []
    futures.append(executor.submit(foo, "t1"))
    futures.append(executor.submit(foo, "t2"))
    futures.append(executor.submit(foo, "t3"))

    done, not_done = wait(futures, return_when=FIRST_EXCEPTION)
    for future in not_done:
        print("Cancelling: " + str(future))
        future.cancel()

    for future in futures:
        print(future.result())

最佳答案

据我了解,您的工作需要以下代码框架。请注意,task2 在引发异常之前会等待 5 秒,以便更容易观察代码的行为。

import threading
import time
from concurrent.futures import ThreadPoolExecutor

flag = False
flagLock = threading.Lock()


def foo(name):
    global flag
    print("Started task: " + name)
    if name == "t2":
        time.sleep(5)
        flagLock.acquire()
        flag = True
        flagLock.release()
        raise Exception("Forced fail")
    for i in range(5):
        flagLock.acquire()
        if flag:
            flagLock.release()
            return "One of the tasks failed, bailing out: " + name
        print("Running task: " + name)
        flagLock.release()
        time.sleep(1)
    flagLock.acquire()
    if flag:
        flagLock.release()
        return "Loops are done but bailing out anyway: " + name
    else:
        flagLock.release()
        return "Completed task: " + name


if __name__ == '__main__':

    with ThreadPoolExecutor() as executor:
        futures = [executor.submit(foo, "t1"), executor.submit(foo, "t2"), executor.submit(foo, "t3")]

        for future in futures:
            print(future.result())

基本上,上面的代码所做的是保留一个名为 flag 的 bool 值,并在作业的每个关键步骤中检查它。每当有问题的作业要引发异常时,就会设置标志,以便其他所有作业都知道它们应该停止。

此外,我们需要使用锁或信号量来保护此标志,以便多个线程不会同时访问它,从而导致获取过时的信息。如果您不熟悉锁/信号量,我强烈建议您好好研究一下,然后再使用它,这样您就不会遇到死锁或同样令人讨厌的问题。

关于Python - 如果一个失败,则停止所有 ThreadPoolExecutor future ,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/67060670/

相关文章:

python - 如何使用机器人框架接受警报

python - TensorFlow 的可微分汉明损失

c++ - 是否可以使用线程来加快文件读取速度?

java - Cassandra = 单个节点上单个行上的列更新的原子性/隔离?

python - 如何按子组对 DataFrame 进行排名

java - 运行多个线程(每个线程都有自己的应用程序上下文)并正常关闭

java - 在 python 和 java 中设置线程关联

java - 从 JPanel 中动态删除组件

c++ - 想要一种在 C++ 中交换两个指针的有效方法

python - 我需要以相同的随机方式随机洗牌两个 pandas DataFrame 的行