python - 共享字典似乎在 Python 多重处理中不起作用

标签 python multiprocessing python-multiprocessing

我试图通过使用带有共享dictmultipleprocessing来计算词频。我为一些初始测试编写了一个简单的 Python 代码片段:

from multiprocessing import Manager, Pool


def foo(num):
    try:
        d[num] += 1
    except KeyError:
        d[num] = 1


d = Manager().dict()
pool = Pool(processes=2, maxtasksperchild=100)
tasks = [1] * 1001 + [2] * 2000 + [3] * 1300
pool.map_async(foo, tasks)
pool.close()
pool.join()

print len(tasks)
print d

但是,d 中的频率总数与 tasks 中的频率总数不匹配。在我看来, d 没有很好地同步,但我不知道为什么会发生这种情况以及如何解决这个问题。有人可以在这里为我提供一些帮助吗?

最佳答案

这里有一个竞争条件:

try:
    d[num] += 1
except KeyError:
    d[num] = 1

假设任务 1 尝试执行 d[1] += 1 ,但是d[1]是空的,所以它得到 KeyError 。现在,另一个核心上的任务 2 尝试执行 d[1] += 1 的操作。 ,但是d[1]仍然是空的,所以它也得到 KeyError 。所以,现在任务 1 和任务 2 都会尝试设置 d[1] = 1 ,他们都会成功,所以 d[1]现在是1 ,并且您损失了 1 个增量。

更糟糕的是,假设在任务 1 开始设置 d[1] = 1 之前,任务 3-10 全部在另一个核心上运行并完成递增 d[1]一直到9 。然后任务1进来并将其设置回1 ,并且您丢失了 9 个增量。

<小时/>

您可能认为只需预初始化 d = {1: 0, 2: 0, 3: 0} 即可解决此问题并省略 try/except 。但这仍然行不通。因为即使d[1] += 1不是原子的。 Python 将其有效地编译为三个独立的操作: tmp = d.__getitem__(1) , tmp += 1 , d.__setitem__(1, tmp) .

因此,任务 1 可以从共享字典中获取现有的 0,将其递增到 1,同时任务 2 已获取现有的 0,将其递增到 1,现在它们都去存储 1并且都成功了。而且,您可以再次看到这如何扩展到丢失大批量的增量而不是仅丢失一个。

<小时/>

对共享数据的任何非原子操作都必须显式同步。 Synchronization between processes对此进行了解释。和 Sharing state between processes在文档中。

<小时/>

这里最简单的修复(尽管显然不是最好的,因为它最终会序列化您的所有访问)是:

from multiprocessing import Manager, Pool, Lock

lock = Lock()

def foo(num):
    with lock:
        try:
            d[num] += 1
        except KeyError:
            d[num] = 1
<小时/>

如果您想变得更有趣并提高效率,您将必须了解共享内存线程和同步的原理;在一个 StackOverflow 答案中需要解释的内容太多了。

关于python - 共享字典似乎在 Python 多重处理中不起作用,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/29996837/

相关文章:

python - 如何 reshape 图像的尺寸以包含图像数量(即 1)?

javascript - Django-CKEditor 小部件未动态加载

python - 用于计算特征值的多处理

python - 多处理组应用python

python - 如何使用 spaCy (nlp.pipe) 进行预处理来解决大型数据集性能缓慢的问题

python - ubuntu中存储的日期和时间在哪里

python - DjangoForeignKey中的表单控件

python - 在 python 中使用多重处理读取多个大型 csv 文件的最佳策略?

websocket - 如何使用非阻塞 fastapi websocket 进行多处理

algorithm - 具有作业排除和优先约束的多处理器调度