我正在编写一些代码,这些代码对大量(数万到数十万个数值积分)问题进行相当繁重的数值计算。幸运的是,这些集成是并行的,因此很容易使用 Pool.map() 将工作拆分到多个内核。
现在,我有一个具有以下基本工作流程的程序:
#!/usr/bin/env python
from multiprocessing import Pool
from scipy import *
from my_parser import parse_numpy_array
from my_project import heavy_computation
#X is a global multidimensional numpy array
X = parse_numpy_array("input.dat")
param_1 = 0.0168
param_2 = 1.505
def do_work(arg):
return heavy_computation(X, param_1, param_2, arg)
if __name__=='__main__':
pool = Pool()
arglist = linspace(0.0,1.0,100)
results = Pool.map(do_work,arglist)
#save results in a .npy file for analysis
save("Results", [X,results])
由于 X、param_1 和 param_2 对于池中的每个进程都是硬编码和初始化的,因此一切正常。现在我的代码可以正常工作了,我想让它让用户在运行时输入文件名、param_1 和 param_2,而不是硬编码。
应该注意的一件事是,X、param_1 和 param_2 在工作完成时没有被修改。因为我不修改它们,所以我可以在程序开始时做这样的事情:
import sys
X = parse_numpy_array(sys.argv[1])
param_1 = float(sys.argv[2])
param_2 = float(sys.argv[3])
这样就可以了,但是由于这段代码的大多数用户都在 Windows 机器上运行代码,所以我宁愿不走命令行参数的路线。
我真正想做的是这样的:
X, param_1, param_2 = None, None, None
def init(x,p1, p2)
X = x
param_1 = p1
param_2 = p2
if __name__=='__main__':
filename = raw_input("Filename> ")
param_1 = float(raw_input("Parameter 1: "))
param_2 = float(raw_input("Parameter 2: "))
X = parse_numpy_array(filename)
pool = Pool(initializer = init, initargs = (X, param_1, param_2,))
arglist = linspace(0.0,1.0,100)
results = Pool.map(do_work,arglist)
#save results in a .npy file for analysis
save("Results", [X,results])
但是,当然,当 pool.map 调用发生时,这会失败并且 X/param_1/param_2 都为 None。我对多处理很陌生,所以我不确定为什么对初始化程序的调用失败。有没有办法做我想做的事?有没有更好的方法来解决这个问题?我也看过使用共享数据,但根据我对文档的理解,它只适用于 ctypes,不包括 numpy 数组。如有任何帮助,我们将不胜感激。
最佳答案
我遇到了类似的问题。如果您只想阅读我的解决方案,请跳过几行 :) 我必须:
- 在操作它的不同部分的线程之间共享一个 numpy.array 并且...
- 向 Pool.map 传递一个具有多个参数的函数。
我注意到:
- numpy.array 的数据被正确读取但是...
- 对 numpy.array 的更改不是永久的
- Pool.map 在处理 lambda 函数时有问题,或者在我看来是这样(如果您不清楚这一点,请忽略它)
我的解决方案是:
- 将目标函数唯一参数设为一个列表
- 让目标函数返回修改后的数据,而不是直接尝试写入numpy.array
我知道你的 do_work 函数已经返回计算数据,所以你只需要修改 to_work 接受一个列表(包含 X、param_1、param_2 和 arg)作为参数,并将输入打包到目标函数中在将其传递给 Pool.map 之前格式化。
这是一个示例实现:
def do_work2(args):
X,param_1,param_2,arg = args
return heavy_computation(X, param_1, param_2, arg)
现在您必须在调用 do_work 函数之前将输入打包。你的主要成为:
if __name__=='__main__':
filename = raw_input("Filename> ")
param_1 = float(raw_input("Parameter 1: "))
param_2 = float(raw_input("Parameter 2: "))
X = parse_numpy_array(filename)
# now you pack the input arguments
arglist = [[X,param1,param2,n] for n in linspace(0.0,1.0,100)]
# consider that you're not making 100 copies of X here. You're just passing a reference to it
results = Pool.map(do_work2,arglist)
#save results in a .npy file for analysis
save("Results", [X,results])
关于python - 在 python 多处理池中共享 numpy 数组,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/11963148/