摘要:
当与ThreadPoolExecutor
同时运行多个线程并且其中一个线程引发异常时,其余线程将继续执行。
如果其中一个线程引发异常,是否有办法停止所有其他仍在运行的线程?
代码:
此演示脚本显示,即使任务 t2
提前失败,任务 t1
和 t3
仍继续运行:
from concurrent.futures import ThreadPoolExecutor
import time
def foo(name):
print("Started task: " + name)
if name == "t2":
raise Exception("Forced fail")
for i in range(5):
print("Running task: " + name)
time.sleep(1)
return "Completed task: " + name
with ThreadPoolExecutor() as executor:
futures = []
futures.append(executor.submit(foo, "t1"))
futures.append(executor.submit(foo, "t2"))
futures.append(executor.submit(foo, "t3"))
for future in futures:
print(future.result())
尝试使用 wait()
我尝试在检测到 FIRST_EXCEPTION
时使用 wait()
,但 future 未响应取消指令:
from concurrent.futures import wait, FIRST_EXCEPTION
with ThreadPoolExecutor() as executor:
futures = []
futures.append(executor.submit(foo, "t1"))
futures.append(executor.submit(foo, "t2"))
futures.append(executor.submit(foo, "t3"))
done, not_done = wait(futures, return_when=FIRST_EXCEPTION)
for future in not_done:
print("Cancelling: " + str(future))
future.cancel()
for future in futures:
print(future.result())
最佳答案
据我了解,您的工作需要以下代码框架。请注意,task2 在引发异常之前会等待 5 秒,以便更容易观察代码的行为。
import threading
import time
from concurrent.futures import ThreadPoolExecutor
flag = False
flagLock = threading.Lock()
def foo(name):
global flag
print("Started task: " + name)
if name == "t2":
time.sleep(5)
flagLock.acquire()
flag = True
flagLock.release()
raise Exception("Forced fail")
for i in range(5):
flagLock.acquire()
if flag:
flagLock.release()
return "One of the tasks failed, bailing out: " + name
print("Running task: " + name)
flagLock.release()
time.sleep(1)
flagLock.acquire()
if flag:
flagLock.release()
return "Loops are done but bailing out anyway: " + name
else:
flagLock.release()
return "Completed task: " + name
if __name__ == '__main__':
with ThreadPoolExecutor() as executor:
futures = [executor.submit(foo, "t1"), executor.submit(foo, "t2"), executor.submit(foo, "t3")]
for future in futures:
print(future.result())
基本上,上面的代码所做的是保留一个名为 flag
的 bool 值,并在作业的每个关键步骤中检查它。每当有问题的作业要引发异常时,就会设置标志,以便其他所有作业都知道它们应该停止。
此外,我们需要使用锁或信号量来保护此标志,以便多个线程不会同时访问它,从而导致获取过时的信息。如果您不熟悉锁/信号量,我强烈建议您好好研究一下,然后再使用它,这样您就不会遇到死锁或同样令人讨厌的问题。
关于Python - 如果一个失败,则停止所有 ThreadPoolExecutor future ,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/67060670/