python - 如何使用带超时的 concurrent.futures?

标签 python concurrency timeout python-3.x

我正在尝试使用 concurrent.futures 模块让超时在 python3.2 中工作。然而,当它确实超时时,它并没有真正停止执行。我尝试同时使用线程和进程池执行器,它们都没有停止任务,并且只有在任务完成后才会引发超时。那么有谁知道是否有可能让它发挥作用?

import concurrent.futures
import time
import datetime

max_numbers = [10000000, 10000000, 10000000, 10000000, 10000000]

def run_loop(max_number):
    print("Started:", datetime.datetime.now(), max_number)
    last_number = 0;
    for i in range(1, max_number + 1):
        last_number = i * i
    return last_number

def main():
    with concurrent.futures.ProcessPoolExecutor(max_workers=len(max_numbers)) as executor:
        try:
            for future in concurrent.futures.as_completed(executor.map(run_loop, max_numbers, timeout=1), timeout=1):
                print(future.result(timeout=1))
        except concurrent.futures._base.TimeoutError:
            print("This took to long...")

if __name__ == '__main__':
    main()

最佳答案

据我所知,TimeoutError 实际上是在您期望的时候引发的,而不是在任务完成之后。

但是,您的程序本身将继续运行,直到所有正在运行的任务都已完成。这是因为当前正在执行的任务(在您的情况下,可能是您提交的所有任务,因为您的池大小等于任务数)实际上并未被“杀死”。

引发了 TimeoutError,因此您可以选择不等到任务完成(而是做其他事情),但任务将继续运行直到完成。只要执行器的线程/子进程中有未完成的任务,python 就不会退出。

据我所知,仅仅“停止”当前正在执行的 Futures 是不可能的,您只能“取消”尚未开始的计划任务。在你的情况下,不会有任何东西,但想象一下你有 5 个线程/进程的池,并且你想要处理 100 个项目。在某些时候,可能有 20 个已完成的任务、5 个正在运行的任务和 75 个已安排的任务。在这种情况下,您可以取消这 76 个计划任务,但正在运行的 4 个任务将继续执行直到完成,无论您是否等待结果。

尽管不能那样做,但我想应该有办法达到您想要的最终结果。也许这个版本可以帮助你(不确定它是否完全符合你的要求,但它可能会有一些用处):

import concurrent.futures
import time
import datetime

max_numbers = [10000000, 10000000, 10000000, 10000000, 10000000]

class Task:
    def __init__(self, max_number):
        self.max_number = max_number
        self.interrupt_requested = False

    def __call__(self):
        print("Started:", datetime.datetime.now(), self.max_number)
        last_number = 0;
        for i in xrange(1, self.max_number + 1):
            if self.interrupt_requested:
                print("Interrupted at", i)
                break
            last_number = i * i
        print("Reached the end")
        return last_number

    def interrupt(self):
        self.interrupt_requested = True

def main():
    with concurrent.futures.ThreadPoolExecutor(max_workers=len(max_numbers)) as executor:
        tasks = [Task(num) for num in max_numbers]
        for task, future in [(i, executor.submit(i)) for i in tasks]:
            try:
                print(future.result(timeout=1))
            except concurrent.futures.TimeoutError:
                print("this took too long...")
                task.interrupt()


if __name__ == '__main__':
    main()

通过为每个“任务”创建一个可调用对象,并将这些对象提供给执行者,而不仅仅是一个普通函数,您可以提供一种“中断”任务的方法。 提示:删除 task.interrupt() 行,看看会发生什么,这可能更容易理解我上面的长篇解释;-)

关于python - 如何使用带超时的 concurrent.futures?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/6509261/

相关文章:

python - 从 scipy.optimize.curve_fit 获取与参数估计相关的标准误差

python - 在两个列表中查找匹配的子串

ios - 串行调度队列如何保证资源保护?

compiler-errors - 编译超时LATEX

python - 如何使用 asyncio 添加连接超时?

http - golang 获取大量读取 tcp ip :port i/o timeout in ubuntu 14. 04 LTS

python - 在列表中以特定间隔添加项目 (Python)

python - 我应该如何在 Python 中实现 "nested"子命令?

java - 这种并发相关的类怎么调用呢?是否有标准实现?

java - 通过 ExecutorCompletionService 执行时任务是否并行化?