我正在尝试实现一种高度可并行化的在线递归并行算法。我的问题是我的 python 实现无法正常工作。我有两个二维矩阵,每次在时间步长 t 观察到新观察时,我想递归更新每一列。 我的并行代码是这样的
def apply_async(t):
worker = mp.Pool(processes = 4)
for i in range(4):
X[:,i,np.newaxis], b[:,i,np.newaxis] = worker.apply_async(OULtraining, args=(train[t,i], X[:,i,np.newaxis], b[:,i,np.newaxis])).get()
worker.close()
worker.join()
for t in range(p,T):
count = 0
for l in range(p):
for k in range(4):
gn[count]=train[t-l-1,k]
count+=1
G = G*v + gn @ gn.T
Gt = (1/(t-p+1))*G
if __name__ == '__main__':
apply_async(t)
这两个矩阵是X和b。我想直接替换 master 的内存,因为每个进程只递归更新矩阵的一个特定列。
为什么这个实现比顺序执行慢?
有没有办法在每个时间步恢复进程而不是杀死它们并重新创建它们?这可能是它变慢的原因吗?
最佳答案
原因是,您的程序实际上是顺序的。这是一个示例代码片段,从并行性的角度来看与您的相同:
from multiprocessing import Pool
from time import sleep
def gwork( qq):
print (qq)
sleep(1)
return 42
p = Pool(processes=4)
for q in range(1, 10):
p.apply_async(gwork, args=(q,)).get()
p.close()
p.join()
运行此程序,您会注意到数字 1-9 每秒恰好出现一次。为什么是这样?原因是您的 .get()
。这意味着对 apply_async 的每次调用实际上都会在 get()
中阻塞,直到结果可用。它将提交一个任务,等待一秒钟模拟处理延迟,然后返回结果,然后将另一个任务提交到您的池中。这意味着根本没有正在进行的并行执行。
尝试用这个替换池管理部分:
results = []
for q in range(1, 10):
res = p.apply_async(gwork, args=(q,))
results.append(res)
p.close()
p.join()
for r in results:
print (r.get())
您现在可以看到并行性在起作用,因为您的四个任务现在是同时处理的。您的循环不会阻塞在 get 中,因为 get 已移出循环并且只有在准备就绪时才会收到结果。
注意:如果您对 worker 的参数或它们的返回值是大型数据结构,您将损失一些性能。在实践中,Python 将这些实现为队列,与子进程 fork 时获取数据结构的内存副本相比,通过队列传输大量数据相对来说要慢一些。
关于python - 为什么我的并行代码比顺序代码慢,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/49174419/