documentation concurrent.futures
的超时非常难以理解。在一个简单的例子中,我想通过在扫描作业函数列表的循环中调用 .submit
来使用 ProcessPoolExecutor
。我希望这些 Future
对象中的每一个都有 10 分钟的关联超时,否则它们将异步完成。
我的第一个方法是尝试使用 as_completed
函数,它生成 Future 对象的迭代器,并且仅在一个完成时生成下一个。 as_completed
接受一个 timeout
参数,但是文档说这个超时是相对于 as_completed
被调用的第一时刻而言的,而不一定是任何Future
对象本身。
例如假设 ProcessPoolExecutor
只有 3 个工作进程,但 Future
对象列表包含 10 个项目。在处理前 3 个项目时,其中 7 个项目可能处于未处理状态长达 10 分钟。此后不久,as_completed
的超时将被触发,导致失败,即使每个单独的 Future
可能已经达到了 10 分钟的限制。
请注意,适用于 as_completed
的相同限制也适用于 wait
并且 wait
更难用于此用例,因为它支持的返回选项有限。
我的下一个想法是使用 timeout
parameter that future.result
allows并为我的 future 列表中的每个 f
(Future)调用 f.result(timeout=600)
。但是,没有真正以阻塞方式要求结果的方式来设置此超时。如果您迭代 future 列表并调用 f.result(...)
,此调用会在指定的超时时间内阻塞。
另一方面,您也不能将 f.result
与 as_completed
组合在一起,以一种天真但看似正确的方式,如
[f.result(timeout=600) for f in as_completed(futures_list)]
... 因为 as_completed
的迭代在 futures 完成时欺骗性地异步等待,并且只返回它们以在 之后调用 .result
他们已经完成了。
鉴于此,生成 Future
列表的正确模式是什么,其中每个都有自己的超时时间,然后异步等待它们完成?
最佳答案
似乎没有办法在这种异步上下文中提供每个 Future 超时。可用的 API 函数 wait
和 as_completed
通过支持 Future
对象的可迭代中所有任务的全局超时来采取更简单的方法,并且不要尝试测量从 Future
第一次开始处于被处理状态的时间。
我选择了一种解决方法,将我的任务列表分成一组 block ,并对每个 block 使用 as_completed
。 block 大小设置为与我的 ProcessPoolExecutor
配置使用的工作人员数量相同,这样我就可以确定 as_completed
的“全局”超时由于所有任务都立即主动处理,因此 secret 地充当了每个 future 的超时。缺点是利用率较低,因为当任务提早完成时,进程池不能空闲以获取下一个 Future 任务;它必须等待整个下一批任务。对我来说这没问题,但它是我必须选择的 concurrent.futures
的重大可用性缺陷。
这是一些示例代码。假设 my_task_list
已经包含通过 functools.partial
或其他方式绑定(bind)的部分或全部必需参数的函数。您可以修改它,以便在元组或字典的单独迭代中提供参数,并根据需要传递到 submit
。
my_task_list = #... define your list of task functions
num_workers = #... set number of workers
my_timeout = #... define your timeout
with ProcessPoolExecutor(max_workers=num_workers) as pool:
all_results = []
for chunk_start in range(0, len(my_task_list), num_workers):
chunk = my_task_list[chunk_start:chunk_start + num_workers]
# could extract parameters to pass for this task chunk here.
futures = [pool.submit(task) for task in chunk]
all_results += [
f.result() for f in as_completed(futures, timeout=my_timeout)
]
return all_results
请注意,如果您选择的 num_workers
大于 ProcessPoolExecutor
可用的处理器数量,您最终会得到比给定 block 中的处理器更多的任务并返回对于 as_completed
的超时不会正确应用于每个任务的运行时的情况,可能会导致与仅使用 as_completed
或 wait
在没有分块的整体任务列表上。
关于python - 如何确保 concurrent.futures 迭代器中每个 Future 的超时?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/60951208/