python - python中ThreadPool中每个线程的超时

标签 python multithreading python-2.7 future concurrent.futures

我正在使用 Python 2.7。

我目前正在像这样使用 ThreadPoolExecuter:

params = [1,2,3,4,5,6,7,8,9,10]
with concurrent.futures.ThreadPoolExecutor(5) as executor:
    result = list(executor.map(f, params))

问题是 f 有时会运行太久。每当我运行 f 时,我都想将它的运行时间限制在 100 秒,然后终止它。

最后,对于 param 中的每个元素 x,我想知道 f 是否必须被杀死,如果不是 - 返回值是多少。 即使 f 一个参数超时,我仍然想用下一个参数运行它。

executer.map 方法确实有一个timeout 参数,但它为整个运行设置了一个超时时间,从调用executer 开始。 map,而不是分别针对每个线程。

获得我想要的行为的最简单方法是什么?

最佳答案

这个答案是关于 python 的多处理库,它通常比线程库更可取,除非您的函数只是在等待网络调用。请注意,多处理和线程库具有相同的接口(interface)。

鉴于您的进程每个可能运行 100 秒,相比之下,为每个进程创建一个进程的开销相当小。您可能必须制定自己的流程才能获得必要的控制权。

一个选项是将 f 包装在另一个最多执行 100 秒的函数中:

from multiprocessing import Pool

def timeout_f(arg):
    pool = Pool(processes=1)
    return pool.apply_async(f, [arg]).get(timeout=100)

然后你的代码变成:

    result = list(executor.map(timeout_f, params))

或者,您可以编写自己的线程/进程控制:

from multiprocessing import Process
from time import time

def chunks(l, n):
    """ Yield successive n-sized chunks from l. """
    for i in xrange(0, len(l), n):
        yield l[i:i+n]

processes = [Process(target=f, args=(i,)) for i in params]
exit_codes = []
for five_processes = chunks(processes, 5):
    for p in five_processes:
        p.start()
    time_waited = 0
    start = time()
    for p in five_processes:
        if time_waited >= 100:
            p.join(0)
            p.terminate()
        p.join(100 - time_waited)
        p.terminate()
        time_waited = time() - start
    for p in five_processes:
        exit_codes.append(p.exit_code)

你必须通过类似 Can I get a return value from multiprocessing.Process? 的方式获取返回值

如果进程已完成,则进程的退出代码为 0,如果进程已终止,则退出代码为非零。

技术来自: Join a group of python processes with a timeout , How do you split a list into evenly sized chunks?


作为另一种选择,您可以尝试在 multiprocessing.Pool 上使用 apply_async

from multiprocessing import Pool, TimeoutError
from time import sleep    

if __name__ == "__main__":
    pool = Pool(processes=5)
    processes = [pool.apply_async(f, [i]) for i in params]
    results = []
    for process in processes:
        try:
            result.append(process.get(timeout=100))
        except TimeoutError as e:
            results.append(e)

请注意,上面的每个进程可能等待超过 100 秒,就好像第一个进程需要 50 秒才能完成,第二个进程的运行时间将多出 50 秒。需要更复杂的逻辑(例如前面的示例)来强制执行更严格的超时。

关于python - python中ThreadPool中每个线程的超时,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/25976350/

相关文章:

Python 2.7 中用于 HTTP GET header 的正则表达式匹配

python - 我如何将 UNIX 时间戳列表转换为人类可读日期列表

python - 如何拆分列表中的字符串

java - 如何在java中使函数阻塞?

python - 如何从非 PyQt 类发出信号?

python - While 循环打印列表格式

python - 如何在 Python 中打开某种类型的所有文件并处理它们?

负值的 Python sort() 问题

c++ - try_lock_for 未按预期工作

java - RxJava flatMap 和背压奇怪的行为