python - 在多处理中使用队列返回值的替代方法

标签 python multithreading numpy

在 python 多处理中,是否有类似 Process.return_value() 的东西? 为什么不呢?有什么替代方案? (除了使用队列)

背景: 目前,我的代码包含许多这样的函数 f,它们对大型数据序列进行操作,这些数据序列是 numpy 数组 (a, b)

b= f(a)

def f(a):
    # do some work here
    return b

我可以像这样并行化每个函数:

plist=[]
for i in range(ncores):
    p= Process(target=f, args=(a[i::ncores]))
    p.start()
    plist.append(p)

这将这部分代码的速度提高了 ncore 倍。问题是获取返回值:

for i, p in enumerate(plist):
    p.join()
    b[i::ncores]= p.return_value()

我找不到通过p检索返回值的方法。我不明白为什么这种方式不可能,因为 p 可以与核心上的进程通信。

现在,我正在使用队列来获取返回值,但我发现它们使用起来有点麻烦:我必须将队列和标识符传递给每个函数,并将结果和 ID 放入队列:

def f(a, Queue=Queue, ID=-1):
    # do some work here
    if ID==-1:
        # normal interface
        return b
    else:
        # parallel interface
        Queue.put([ID, b])

然后,当我读出队列时,我必须将 ID 与原始数组进行匹配:

for i in range(ncores):
    ID, result= Queue.get()
    b[ID::ncores]= result
    plist[ID].join()

是否有更短的方法来实现这一目标?

我尝试传递字典或列表作为关键字来存储结果,这适用于线程,但不适用于多处理。

最佳答案

返回值是多少?

如果它是简单的东西,例如数组或数字,您可以将共享内存容器传递给 Process 实例,在其中存储其结果。如果我们想象您只想对多个进程中的数组求和:

from multiprocessing import Process,Value
import ctypes as C
import numpy as np

def func(input,output):
    # do something with input                                                         
    # put the result in output                                                        
    output.value = input.sum()

def launch_processes(n):
    outputs = []
    jobs = []
    for ii in range(n):
        input = np.arange(ii)

        # here use a synchronised wrapper to store the result of the func run
        # This can be replaced with an Array instance if an array is returned
        output = Value(C.c_size_t) 

        job = Process(target=func,args=(input,output))
        outputs.append(output)
        jobs.append(job)
        job.start()

    for job in jobs:
        job.join()

    for output in outputs:
        print output.value

launch_processes(10)

执行上述操作的一种更简洁的方法是通过子类化 Process 将作业封装在单个对象中:

from multiprocessing import Process,Value
import ctypes as C
import numpy as np

class Job(Process):
    def __init__(self,input):
        super(Job,self).__init__()
        self.retval = Value(C.c_size_t)
        self.input = input

    def run(self):
        self.retval.value = self.input.sum()

def launch_processes(n):
    jobs = []
    for ii in range(n):
        job = Job(np.arange(ii))
        job.start()
        jobs.append(job)

    for job in jobs:
        job.join()
        print job.retval.value

launch_processes(10)

关于python - 在多处理中使用队列返回值的替代方法,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/23038067/

相关文章:

python - 将原始数据插入到python中随机生成的数据中

python - git 克隆 + 在 Python 中安装

python - 使用列表理解查找 kwargs 中也包含某个子字符串的最大 int 值

python - 将零填充字节转换为 UTF-8 字符串

java - 使用多线程保存多个 gzip 文件

python - 从数据框中的文本列中选择唯一的组合

python - 为什么 Python 有最大递归深度?

asp.net - 在新线程中使用 Ninject

java - publishOn 和并行的区别

python - 使用 NumPy.arange 生成包括右端在内的等距值