我正在尝试使用 ProcessPoolExecutor 方法,但它失败了。 这是一个失败使用的例子(计算两个数字的大除法器)。 我不明白这是什么错误
def gcd(pair):
a, b = pair
low = min(a, b)
for i in range(low, 0, -1):
if a % i == 0 and b % i == 0:
return i
numbers = [(1963309, 2265973), (2030677, 3814172),
(1551645, 2229620), (2039045, 2020802)]
start = time()
pool = ProcessPoolExecutor(max_workers=2)
results = list(pool.map(gcd, numbers))
end = time()
print('Took %.3f seconds' % (end - start))
BrokenProcessPool:进程池中的进程在未来运行或挂起时突然终止。
最佳答案
将您的代码更改为如下所示,它将起作用:
from time import time
from concurrent.futures import ProcessPoolExecutor
def gcd(pair):
a, b = pair
low = min(a, b)
for i in range(low, 0, -1):
if a % i == 0 and b % i == 0:
return i
numbers = [(1963309, 2265973), (2030677, 3814172),
(1551645, 2229620), (2039045, 2020802)]
def main():
start = time()
pool = ProcessPoolExecutor(max_workers=3)
results = list(pool.map(gcd, numbers))
end = time()
print('Took %.3f seconds' % (end - start))
if __name__ == '__main__':
main()
在支持 fork()
的系统上,这不是必需的,因为您的脚本只导入一次,然后每个启动的进程 ProcessPoolExecutor
都已经有一份全局命名空间中的对象,例如 gcd
函数。一旦它们被 fork ,它们就会经历一个引导过程,由此它们开始运行它们的目标函数(在这种情况下是一个工作进程循环,它接受来自进程池执行器的作业)并且它们从不返回到原始状态它们从中派生出来的主模块中的代码。
相比之下,如果您使用的是基于 spawn
的进程,这是 Windows 和 OSX 上的默认进程,则必须为每个工作进程从头开始启动一个新进程,如果他们必须重新导入您的模块。但是,如果您的模块直接在模块主体中执行类似 ProcessPoolExecutor
的操作,而不像 if __name__ == '__main__':
那样对其进行保护,那么它们就无法导入您的模块而不启动新的 ProcessPoolExecutor
。因此,您遇到的这个错误本质上是在防止您创建无限进程炸弹。
ProcessPoolExecutor
的文档中提到了这一点:
The
__main__
module must be importable by worker subprocesses. This means thatProcessPoolExecutor
will not work in the interactive interpreter.
但是他们并没有真正弄清楚为什么会这样,或者 __main__
模块“可导入”意味着什么。当您用 Python 编写一个简单的脚本并像 python foo.py
一样运行时,您的脚本 foo.py
会加载一个模块名称 __main__
,而不是一个名为 foo
的模块,如果你 import foo
就会得到这个模块。在这种情况下,它是“可导入的”实际上意味着可导入而没有产生新进程等主要副作用。
关于Python 并发.futures : ProcessPoolExecutor fail to work,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/59124525/