我正在使用 Python 的 multiprocessing
创建并行应用程序。进程需要共享一些数据,为此我使用了一个Manager
。但是,我有一些进程需要调用的常用函数,以及需要访问 Manager
对象存储的数据的函数。我的问题是我是否可以避免需要将 Manager
实例作为参数传递给这些常用函数,而是像全局函数一样使用它。换句话说,请考虑以下代码:
import multiprocessing as mp
manager = mp.Manager()
global_dict = manager.dict(a=[0])
def add():
global_dict['a'] += [global_dict['a'][-1]+1]
def foo_parallel(var):
add()
print var
num_processes = 5
p = []
for i in range(num_processes):
p.append(mp.Process(target=foo_parallel,args=(global_dict,)))
[pi.start() for pi in p]
[pi.join() for pi in p]
这运行良好并在我的机器上返回 p=[0,1,2,3,4,5]
。然而,这是“好形式”吗?这是一个很好的方法吗,就像定义 add(var)
并调用 add(var)
一样好?
最佳答案
您的代码示例似乎有比形式更大的问题。只有运气好,您才能获得所需的输出。重复执行会产生不同的结果。那是因为 +=
不是原子操作。多个进程可以一个接一个地读取相同的旧值,在它们中的任何一个更新它之前,它们将写回相同的值。为防止这种行为,您必须另外使用 Manager.Lock
。
关于您最初关于“良好状态”的问题。
IMO 让子进程的主函数 foo_parallel
明确地将 global_dict
传递给通用函数 add(var)
。那将是 dependency injection 的一种形式并且有一些优势。在您的示例中,并非详尽无遗:
allows isolated testing
increases code reusability
easier debugging (detecting non-accessibility of the managed object shouldn't be delayed until
add
is called (fail fast)less boilerplate code (for example try-excepts blocks on resources multiple functions need)
作为旁注。仅针对它的副作用使用列表理解被认为是“代码味道”。如果您不需要列表作为结果,只需使用 for 循环即可。
代码:
import os
from multiprocessing import Process, Manager
def add(l):
l += [l[-1] + 1]
return l
def foo_parallel(global_dict, lock):
with lock:
l = global_dict['a']
global_dict['a'] = add(l)
print(os.getpid(), global_dict)
if __name__ == '__main__':
N_WORKERS = 5
with Manager() as manager:
lock = manager.Lock()
global_dict = manager.dict(a=[0])
pool = [Process(target=foo_parallel, args=(global_dict, lock))
for _ in range(N_WORKERS)]
for p in pool:
p.start()
for p in pool:
p.join()
print('result', global_dict)
关于Python 多处理和管理器,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/52435589/