在 python 多处理中,是否有类似 Process.return_value() 的东西? 为什么不呢?有什么替代方案? (除了使用队列)
背景: 目前,我的代码包含许多这样的函数 f,它们对大型数据序列进行操作,这些数据序列是 numpy 数组 (a, b)
b= f(a)
def f(a):
# do some work here
return b
我可以像这样并行化每个函数:
plist=[]
for i in range(ncores):
p= Process(target=f, args=(a[i::ncores]))
p.start()
plist.append(p)
这将这部分代码的速度提高了 ncore 倍。问题是获取返回值:
for i, p in enumerate(plist):
p.join()
b[i::ncores]= p.return_value()
我找不到通过p检索返回值的方法。我不明白为什么这种方式不可能,因为 p 可以与核心上的进程通信。
现在,我正在使用队列来获取返回值,但我发现它们使用起来有点麻烦:我必须将队列和标识符传递给每个函数,并将结果和 ID 放入队列:
def f(a, Queue=Queue, ID=-1):
# do some work here
if ID==-1:
# normal interface
return b
else:
# parallel interface
Queue.put([ID, b])
然后,当我读出队列时,我必须将 ID 与原始数组进行匹配:
for i in range(ncores):
ID, result= Queue.get()
b[ID::ncores]= result
plist[ID].join()
是否有更短的方法来实现这一目标?
我尝试传递字典或列表作为关键字来存储结果,这适用于线程,但不适用于多处理。
最佳答案
返回值是多少?
如果它是简单的东西,例如数组或数字,您可以将共享内存容器传递给 Process 实例,在其中存储其结果。如果我们想象您只想对多个进程中的数组求和:
from multiprocessing import Process,Value
import ctypes as C
import numpy as np
def func(input,output):
# do something with input
# put the result in output
output.value = input.sum()
def launch_processes(n):
outputs = []
jobs = []
for ii in range(n):
input = np.arange(ii)
# here use a synchronised wrapper to store the result of the func run
# This can be replaced with an Array instance if an array is returned
output = Value(C.c_size_t)
job = Process(target=func,args=(input,output))
outputs.append(output)
jobs.append(job)
job.start()
for job in jobs:
job.join()
for output in outputs:
print output.value
launch_processes(10)
执行上述操作的一种更简洁的方法是通过子类化 Process 将作业封装在单个对象中:
from multiprocessing import Process,Value
import ctypes as C
import numpy as np
class Job(Process):
def __init__(self,input):
super(Job,self).__init__()
self.retval = Value(C.c_size_t)
self.input = input
def run(self):
self.retval.value = self.input.sum()
def launch_processes(n):
jobs = []
for ii in range(n):
job = Job(np.arange(ii))
job.start()
jobs.append(job)
for job in jobs:
job.join()
print job.retval.value
launch_processes(10)
关于python - 在多处理中使用队列返回值的替代方法,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/23038067/