python - 如何确保 concurrent.futures 迭代器中每个 Future 的超时?

标签 python python-3.x asynchronous timeout concurrent.futures

documentation concurrent.futures 的超时非常难以理解。在一个简单的例子中,我想通过在扫描作业函数列表的循环中调用 .submit 来使用 ProcessPoolExecutor。我希望这些 Future 对象中的每一个都有 10 分钟的关联超时,否则它们将异步完成。

我的第一个方法是尝试使用 as_completed函数,它生成 Future 对象的迭代器,并且仅在一个完成时生成下一个。 as_completed 接受一个 timeout 参数,但是文档说这个超时是相对于 as_completed 被调用的第一时刻而言的,而不一定是任何Future 对象本身。

例如假设 ProcessPoolExecutor 只有 3 个工作进程,但 Future 对象列表包含 10 个项目。在处理前 3 个项目时,其中 7 个项目可能处于未处理状态长达 10 分钟。此后不久,as_completed 的超时将被触发,导致失败,即使每个单独的 Future 可能已经达到了 10 分钟的限制。

请注意,适用于 as_completed 的相同限制也适用于 wait并且 wait 更难用于此用例,因为它支持的返回选项有限。

我的下一个想法是使用 timeout parameter that future.result allows并为我的 future 列表中的每个 f(Future)调用 f.result(timeout=600)。但是,没有真正以阻塞方式要求结果的方式来设置此超时。如果您迭代 future 列表并调用 f.result(...),此调用会在指定的超时时间内阻塞。

另一方面,您也不能将 f.resultas_completed 组合在一起,以一种天真但看似正确的方式,如

[f.result(timeout=600) for f in as_completed(futures_list)]

... 因为 as_completed 的迭代在 futures 完成时欺骗性地异步等待,并且只返回它们以在 之后调用 .result他们已经完成了。

鉴于此,生成 Future 列表的正确模式是什么,其中每个都有自己的超时时间,然后异步等待它们完成?

最佳答案

似乎没有办法在这种异步上下文中提供每个 Future 超时。可用的 API 函数 waitas_completed 通过支持 Future 对象的可迭代中所有任务的全局超时来采取更简单的方法,并且不要尝试测量从 Future 第一次开始处于被处理状态的时间。

我选择了一种解决方法,将我的任务列表分成一组 block ,并对每个 block 使用 as_completed。 block 大小设置为与我的 ProcessPoolExecutor 配置使用的工作人员数量相同,这样我就可以确定 as_completed 的“全局”超时由于所有任务都立即主动处理,因此 secret 地充当了每个 future 的超时。缺点是利用率较低,因为当任务提早完成时,进程池不能空闲以获取下一个 Future 任务;它必须等待整个下一批任务。对我来说这没问题,但它是我必须选择的 concurrent.futures 的重大可用性缺陷。

这是一些示例代码。假设 my_task_list 已经包含通过 functools.partial 或其他方式绑定(bind)的部分或全部必需参数的函数。您可以修改它,以便在元组或字典的单独迭代中提供参数,并根据需要传递到 submit

my_task_list = #... define your list of task functions
num_workers = #... set number of workers
my_timeout = #... define your timeout
with ProcessPoolExecutor(max_workers=num_workers) as pool:
    all_results = []
    for chunk_start in range(0, len(my_task_list), num_workers):
        chunk = my_task_list[chunk_start:chunk_start + num_workers]
        # could extract parameters to pass for this task chunk here.
        futures = [pool.submit(task) for task in chunk]
        all_results += [
            f.result() for f in as_completed(futures, timeout=my_timeout)
        ]
    return all_results

请注意,如果您选择的 num_workers 大于 ProcessPoolExecutor 可用的处理器数量,您最终会得到比给定 block 中的处理器更多的任务并返回对于 as_completed 的超时不会正确应用于每个任务的运行时的情况,可能会导致与仅使用 as_completedwait 在没有分块的整体任务列表上。

关于python - 如何确保 concurrent.futures 迭代器中每个 Future 的超时?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/60951208/

相关文章:

node.js 异步库

python - 按函数对python列表进行排序

javascript - 如何使用异步函数作为 Array.filter() 的比较函数?

python - 为什么 "import"是一个语句,而 "reload"是一个函数?

python - ArgumentParser.add_argument : How to use action parameter with custom type parameter?

python - Tensorflow 安装和导入正确,但在尝试使用时抛出异常

python-3.x - df.mean()/jupyter/pandas 交替轴输出

javascript - Node js async.series 不适用于 Express 应用程序 --- 响应发生得太快

python - 将函数应用于数据框中的每个观察

python - 使用 Python 查找 IPv4 并忽略 IPv6 IP 地址