我正在使用 Python 2.7。
我目前正在像这样使用 ThreadPoolExecuter:
params = [1,2,3,4,5,6,7,8,9,10]
with concurrent.futures.ThreadPoolExecutor(5) as executor:
result = list(executor.map(f, params))
问题是 f
有时会运行太久。每当我运行 f
时,我都想将它的运行时间限制在 100 秒,然后终止它。
最后,对于 param
中的每个元素 x
,我想知道 f
是否必须被杀死,如果不是 - 返回值是多少。
即使 f
一个参数超时,我仍然想用下一个参数运行它。
executer.map
方法确实有一个timeout
参数,但它为整个运行设置了一个超时时间,从调用executer 开始。 map
,而不是分别针对每个线程。
获得我想要的行为的最简单方法是什么?
最佳答案
这个答案是关于 python 的多处理库,它通常比线程库更可取,除非您的函数只是在等待网络调用。请注意,多处理和线程库具有相同的接口(interface)。
鉴于您的进程每个可能运行 100 秒,相比之下,为每个进程创建一个进程的开销相当小。您可能必须制定自己的流程才能获得必要的控制权。
一个选项是将 f 包装在另一个最多执行 100 秒的函数中:
from multiprocessing import Pool
def timeout_f(arg):
pool = Pool(processes=1)
return pool.apply_async(f, [arg]).get(timeout=100)
然后你的代码变成:
result = list(executor.map(timeout_f, params))
或者,您可以编写自己的线程/进程控制:
from multiprocessing import Process
from time import time
def chunks(l, n):
""" Yield successive n-sized chunks from l. """
for i in xrange(0, len(l), n):
yield l[i:i+n]
processes = [Process(target=f, args=(i,)) for i in params]
exit_codes = []
for five_processes = chunks(processes, 5):
for p in five_processes:
p.start()
time_waited = 0
start = time()
for p in five_processes:
if time_waited >= 100:
p.join(0)
p.terminate()
p.join(100 - time_waited)
p.terminate()
time_waited = time() - start
for p in five_processes:
exit_codes.append(p.exit_code)
你必须通过类似 Can I get a return value from multiprocessing.Process? 的方式获取返回值
如果进程已完成,则进程的退出代码为 0,如果进程已终止,则退出代码为非零。
技术来自: Join a group of python processes with a timeout , How do you split a list into evenly sized chunks?
作为另一种选择,您可以尝试在 multiprocessing.Pool 上使用 apply_async
from multiprocessing import Pool, TimeoutError
from time import sleep
if __name__ == "__main__":
pool = Pool(processes=5)
processes = [pool.apply_async(f, [i]) for i in params]
results = []
for process in processes:
try:
result.append(process.get(timeout=100))
except TimeoutError as e:
results.append(e)
请注意,上面的每个进程可能等待超过 100 秒,就好像第一个进程需要 50 秒才能完成,第二个进程的运行时间将多出 50 秒。需要更复杂的逻辑(例如前面的示例)来强制执行更严格的超时。
关于python - python中ThreadPool中每个线程的超时,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/25976350/