只是尝试和学习,我知道如何创建一个可以通过多个进程访问的共享字典,但我不确定如何保持字典同步。 defaultdict
,我相信,说明了我遇到的问题。
from collections import defaultdict
from multiprocessing import Pool, Manager, Process
#test without multiprocessing
s = 'mississippi'
d = defaultdict(int)
for k in s:
d[k] += 1
print d.items() # Success! result: [('i', 4), ('p', 2), ('s', 4), ('m', 1)]
print '*'*10, ' with multiprocessing ', '*'*10
def test(k, multi_dict):
multi_dict[k] += 1
if __name__ == '__main__':
pool = Pool(processes=4)
mgr = Manager()
multi_d = mgr.dict()
for k in s:
pool.apply_async(test, (k, multi_d))
# Mark pool as closed -- no more tasks can be added.
pool.close()
# Wait for tasks to exit
pool.join()
# Output results
print multi_d.items() #FAIL
print '*'*10, ' with multiprocessing and process module like on python site example', '*'*10
def test2(k, multi_dict2):
multi_dict2[k] += 1
if __name__ == '__main__':
manager = Manager()
multi_d2 = manager.dict()
for k in s:
p = Process(target=test2, args=(k, multi_d2))
p.start()
p.join()
print multi_d2 #FAIL
第一个结果有效(因为它没有使用 multiprocessing
),但我在使用 multiprocessing
时遇到了问题。我不确定如何解决它,但我认为可能是因为它没有被同步(并且稍后加入结果)或者可能是因为在 multiprocessing
中我不知道如何设置 defaultdict (int)
到字典。
任何关于如何让它工作的帮助或建议都会很棒!
最佳答案
您可以子类化 BaseManager
并注册其他类型以进行共享。在默认 AutoProxy
生成的类型不起作用的情况下,您需要提供合适的代理类型。对于defaultdict
,如果您只需要访问dict
中已经存在的属性,您可以使用DictProxy
。
from multiprocessing import Pool
from multiprocessing.managers import BaseManager, DictProxy
from collections import defaultdict
class MyManager(BaseManager):
pass
MyManager.register('defaultdict', defaultdict, DictProxy)
def test(k, multi_dict):
multi_dict[k] += 1
if __name__ == '__main__':
pool = Pool(processes=4)
mgr = MyManager()
mgr.start()
multi_d = mgr.defaultdict(int)
for k in 'mississippi':
pool.apply_async(test, (k, multi_d))
pool.close()
pool.join()
print multi_d.items()
关于python - 将 defaultdict 与多处理一起使用?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/9256687/