我试图通过使用带有共享dict
的multipleprocessing
来计算词频。我为一些初始测试编写了一个简单的 Python 代码片段:
from multiprocessing import Manager, Pool
def foo(num):
try:
d[num] += 1
except KeyError:
d[num] = 1
d = Manager().dict()
pool = Pool(processes=2, maxtasksperchild=100)
tasks = [1] * 1001 + [2] * 2000 + [3] * 1300
pool.map_async(foo, tasks)
pool.close()
pool.join()
print len(tasks)
print d
但是,d
中的频率总数与 tasks
中的频率总数不匹配。在我看来, d 没有很好地同步,但我不知道为什么会发生这种情况以及如何解决这个问题。有人可以在这里为我提供一些帮助吗?
最佳答案
这里有一个竞争条件:
try:
d[num] += 1
except KeyError:
d[num] = 1
假设任务 1 尝试执行 d[1] += 1
,但是d[1]
是空的,所以它得到 KeyError
。现在,另一个核心上的任务 2 尝试执行 d[1] += 1
的操作。 ,但是d[1]
仍然是空的,所以它也得到 KeyError
。所以,现在任务 1 和任务 2 都会尝试设置 d[1] = 1
,他们都会成功,所以 d[1]
现在是1
,并且您损失了 1 个增量。
更糟糕的是,假设在任务 1 开始设置 d[1] = 1
之前,任务 3-10 全部在另一个核心上运行并完成递增 d[1]
一直到9
。然后任务1进来并将其设置回1
,并且您丢失了 9 个增量。
您可能认为只需预初始化 d = {1: 0, 2: 0, 3: 0}
即可解决此问题并省略 try
/except
。但这仍然行不通。因为即使d[1] += 1
不是原子的。 Python 将其有效地编译为三个独立的操作: tmp = d.__getitem__(1)
, tmp += 1
, d.__setitem__(1, tmp)
.
因此,任务 1 可以从共享字典中获取现有的 0,将其递增到 1,同时任务 2 已获取现有的 0,将其递增到 1,现在它们都去存储 1
并且都成功了。而且,您可以再次看到这如何扩展到丢失大批量的增量而不是仅丢失一个。
对共享数据的任何非原子操作都必须显式同步。 Synchronization between processes对此进行了解释。和 Sharing state between processes在文档中。
<小时/>这里最简单的修复(尽管显然不是最好的,因为它最终会序列化您的所有访问)是:
from multiprocessing import Manager, Pool, Lock
lock = Lock()
def foo(num):
with lock:
try:
d[num] += 1
except KeyError:
d[num] = 1
<小时/>
如果您想变得更有趣并提高效率,您将必须了解共享内存线程和同步的原理;在一个 StackOverflow 答案中需要解释的内容太多了。
关于python - 共享字典似乎在 Python 多重处理中不起作用,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/29996837/