python - 如何在类中使用多处理管理器

标签 python multiprocessing

首先,这是一些有效的代码

from multiprocessing import Pool, Manager
import random

manager = Manager()
dct = manager.dict()

def do_thing(n):
    for i in range(10_000_000):
        i += 1
    dct[n] = random.randint(0, 9)

with Pool(2) as pool:
    pool.map(do_thing, range(10))

现在,如果我尝试用它来创建一个类:

from multiprocessing import Pool, Manager
import random


class SomeClass:
    def __init__(self):
        self.manager = Manager()
        self.dct = self.manager.dict()

    def __call__(self):
        with Pool(2) as pool:
            pool.map(self.do_thing, range(10))

    def do_thing(self, n):
        for i in range(10_000_000):
            i += 1
        self.dct[n] = random.randint(0, 9)


if __name__ == '__main__':
    inst = SomeClass()
    inst()

我遇到了:TypeError:出于安全原因,不允许 Pickling 一个 AuthenticationString 对象。现在来自here ,我得到提示,Python 正在尝试 pickle Manager,据我所知,它有自己的专用进程,并且进程不能被 pickle,因为它们包含 AuthenticationString

我不太了解 forking 是如何工作的(我在 Linux 上,所以我知道这是启动新进程的默认方法)以确切理解为什么 Manager 实例需要腌制。

所以这是我的问题:

  1. 为什么会这样?
  2. 在类中进行多处理时如何使用Manager? PS:我希望能够从此模块导入 SomeClass。
  3. 我的要求是否不合理或不合常规?

PS:我知道我可以在没有 Manager 的情况下通过利用 pool.map 将按顺序返回内容这一事实来完成这个确切的片段,所以像这样:res = pool.map(self.do_thing, range(10)) 然后 dct = {k: v for k, v in zip(range(10), res)} .但这不是问题的重点。

最佳答案

回答您的问题:

Q1 - 为什么会这样?

Pool.map()创建的每个工作进程都需要执行实例方法self.do_thing()。为了做到这一点,Python 对实例进行 pickle 并将其传递给子进程(对它进行 unpickles)。如果每个实例都有一个 Manager 这将是一个问题,因为它们不可 pickleable。 unpickling 过程的一部分涉及导入定义类的模块和恢复实例的属性(也被 pickle)。

Q2 - 如何解决

您可以通过让类创建自己的类级 Manager(由类的所有实例共享)来避免该问题。这里的 __init__() 方法在第一次创建实例时创建了 manager 类属性,从那时起,更多的实例将重用它——它有时被称为“延迟初始化” "

from multiprocessing import Pool, Manager
import random


class SomeClass:
    def __init__(self):
        # Lazy creation of class attribute.
        try:
            manager = getattr(type(self), 'manager')
        except AttributeError:
            manager = type(self).manager = Manager()
        self.dct = manager.dict()

    def __call__(self):
        with Pool(2) as pool:
            pool.map(self.do_thing, range(10))
        print('done')

    def do_thing(self, n):
        for i in range(10_000_000):
            i += 1
        self.dct[n] = random.randint(0, 9)


if __name__ == '__main__':
    inst = SomeClass()
    inst()

Q3 - 这样做合理吗?

在我看来,是的。

关于python - 如何在类中使用多处理管理器,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/66790158/

相关文章:

python - 使用literal_eval 将字符串转换为字典

python - 如何提高pandas中mysql查询的处理速度

与 fork 过程混淆?

python - 如何加速 pandas 数据帧迭代

python - *为什么* multiprocessing 序列化我的函数和闭包?

python - 是否可以在Python中复制一个管道,使其具有一个写入端和两个读取端?

python - 在 Flask-Restful 中使用 Blueprint 有什么好处?

python - 获取给定列中使用的唯一字符列表

python - 不显示折线图范围划分以排除 python 中的非工作时间和周末

用于并行处理的 Python 多处理池