python - 多处理池 : How to call an arbitrary list of methods on a list of class objects

标签 python python-3.x dictionary multiprocessing python-multiprocessing

一个代码的清理版本包括the solution to the problem (感谢@JohanL!)可以找到 as a Gist on GitHub .


以下截取的代码 (CPython 3.[4,5,6]) 说明了我的意图(以及我的问题):

from functools import partial
import multiprocessing
from pprint import pprint as pp

NUM_CORES = multiprocessing.cpu_count()

class some_class:
    some_dict = {'some_key': None, 'some_other_key': None}
    def some_routine(self):
        self.some_dict.update({'some_key': 'some_value'})
    def some_other_routine(self):
        self.some_dict.update({'some_other_key': 77})

def run_routines_on_objects_in_parallel_and_return(in_object_list, routine_list):
    func_handle = partial(__run_routines_on_object_and_return__, routine_list)
    with multiprocessing.Pool(processes = NUM_CORES) as p:
        out_object_list = list(p.imap_unordered(
            func_handle,
            (in_object for in_object in in_object_list)
            ))
    return out_object_list

def __run_routines_on_object_and_return__(routine_list, in_object):
    for routine_name in routine_list:
        getattr(in_object, routine_name)()
    return in_object

object_list = [some_class() for item in range(20)]
pp([item.some_dict for item in object_list])

new_object_list = run_routines_on_objects_in_parallel_and_return(
        object_list,
        ['some_routine', 'some_other_routine']
        )
pp([item.some_dict for item in new_object_list])

verification_object_list = [
    __run_routines_on_object_and_return__(
        ['some_routine', 'some_other_routine'],
        item
        ) for item in object_list
    ]
pp([item.some_dict for item in verification_object_list])

我正在处理 some_class 类型的对象列表。 some_class 有一个属性,一个字典,名为 some_dict 和一些可以修改字典的方法(some_routinesome_other_routine)。有时,我想对列表中的所有对象调用一系列方法。因为这是计算密集型的,所以我打算将对象分布在多个 CPU 内核上(使用 multiprocessing.Poolimap_unordered - 列表顺序无关紧要)。

例程 __run_routines_on_object_and_return__ 负责调用单个对象上的方法列表。据我所知,这工作得很好。我正在使用 functools.partial 来稍微简化代码的结构 - 因此多处理池必须仅将对象列表作为输入参数来处理。

问题是……它不起作用。 imap_unordered 返回的列表中包含的对象与我输入其中的对象相同。对象中的字典看起来就像以前一样。我已经使用类似的机制直接处理字典列表而没有出现故障,所以我以某种方式怀疑修改恰好是字典的对象属性有问题。

在我的示例中,verification_object_list 包含正确的结果(尽管它是在单个进程/线程中生成的)。 new_object_listobject_list 相同,但事实并非如此。

我做错了什么?


编辑

我找到了以下 question ,它有一个实际工作和适用的 answer .我按照我在每个对象上调用方法列表的想法对其进行了一些修改,并且它有效:

import random
from multiprocessing import Pool, Manager

class Tester(object):
    def __init__(self, num=0.0, name='none'):
        self.num  = num
        self.name = name
    def modify_me(self):
        self.num += random.normalvariate(mu=0, sigma=1)
        self.name = 'pla' + str(int(self.num * 100))
    def __repr__(self):
        return '%s(%r, %r)' % (self.__class__.__name__, self.num, self.name)

def init(L):
    global tests
    tests = L

def modify(i_t_nn):
    i, t, nn = i_t_nn
    for method_name in nn:
        getattr(t, method_name)()
    tests[i] = t # copy back
    return i

def main():
    num_processes = num = 10 #note: num_processes and num may differ
    manager = Manager()
    tests = manager.list([Tester(num=i) for i in range(num)])
    print(tests[:2])

    args = ((i, t, ['modify_me']) for i, t in enumerate(tests))
    pool = Pool(processes=num_processes, initializer=init, initargs=(tests,))
    for i in pool.imap_unordered(modify, args):
        print("done %d" % i)
    pool.close()
    pool.join()
    print(tests[:2])

if __name__ == '__main__':
    main()

现在,我更进一步,将我原来的 some_class 引入到游戏中,其中包含描述的字典属性 some_dict。它不起作用:

import random
from multiprocessing import Pool, Manager
from pprint import pformat as pf

class some_class:
    some_dict = {'some_key': None, 'some_other_key': None}
    def some_routine(self):
        self.some_dict.update({'some_key': 'some_value'})
    def some_other_routine(self):
        self.some_dict.update({'some_other_key': 77})
    def __repr__(self):
        return pf(self.some_dict)

def init(L):
    global tests
    tests = L

def modify(i_t_nn):
    i, t, nn = i_t_nn
    for method_name in nn:
        getattr(t, method_name)()
    tests[i] = t # copy back
    return i

def main():
    num_processes = num = 10 #note: num_processes and num may differ
    manager = Manager()
    tests = manager.list([some_class() for i in range(num)])
    print(tests[:2])

    args = ((i, t, ['some_routine', 'some_other_routine']) for i, t in enumerate(tests))
    pool = Pool(processes=num_processes, initializer=init, initargs=(tests,))
    for i in pool.imap_unordered(modify, args):
        print("done %d" % i)
    pool.close()
    pool.join()
    print(tests[:2])

if __name__ == '__main__':
    main()

工作和不工作之间的区别真的很小,但我还是不明白:

diff --git a/test.py b/test.py
index b12eb56..0aa6def 100644
--- a/test.py
+++ b/test.py
@@ -1,15 +1,15 @@
 import random
 from multiprocessing import Pool, Manager
+from pprint import pformat as pf

-class Tester(object):
-       def __init__(self, num=0.0, name='none'):
-               self.num  = num
-               self.name = name
-       def modify_me(self):
-               self.num += random.normalvariate(mu=0, sigma=1)
-               self.name = 'pla' + str(int(self.num * 100))
+class some_class:
+       some_dict = {'some_key': None, 'some_other_key': None}
+       def some_routine(self):
+               self.some_dict.update({'some_key': 'some_value'})
+       def some_other_routine(self):
+               self.some_dict.update({'some_other_key': 77})
        def __repr__(self):
-               return '%s(%r, %r)' % (self.__class__.__name__, self.num, self.name)
+               return pf(self.some_dict)

 def init(L):
        global tests
@@ -25,10 +25,10 @@ def modify(i_t_nn):
 def main():
        num_processes = num = 10 #note: num_processes and num may differ
        manager = Manager()
-       tests = manager.list([Tester(num=i) for i in range(num)])
+       tests = manager.list([some_class() for i in range(num)])
        print(tests[:2])

-       args = ((i, t, ['modify_me']) for i, t in enumerate(tests))
+       args = ((i, t, ['some_routine', 'some_other_routine']) for i, t in enumerate(tests))

这里发生了什么?

最佳答案

您的问题是由两件事造成的;也就是说,您正在使用一个类变量,并且您正在不同的进程中运行您的代码。

由于不同的进程不共享内存,所有对象和参数都必须被 pickle 并从原始进程发送到执行它的进程。当参数是一个对象时,它的类与它一起发送。相反,接收进程使用自己的蓝图(即 class)。

在您当前的代码中,您将对象作为参数传递、更新并返回。但是,更新不是针对对象,而是针对类本身,因为您更新的是类变量。但是,此更新不会发送回您的主进程,因此您会留下未更新的类。

想要 做的是使some_dict 成为您的对象的一部分,而不是您的类的一部分。这可以通过 __init__() 方法轻松完成。因此将 some_class 修改为:

class some_class:
    def __init__(self):
        self.some_dict = {'some_key': None, 'some_other_key': None}
    def some_routine(self):
        self.some_dict.update({'some_key': 'some_value'})
    def some_other_routine(self):
        self.some_dict.update({'some_other_key': 77})

这将使您的程序按预期运行。你几乎总是希望在 __init__() 调用中设置你的对象,而不是作为类变量,因为在后一种情况下,数据将在所有实例之间共享(并且可以被所有实例更新)。当您将数据和状态封装在类的对象中时,这通常不是您想要的。

编辑: 似乎我弄错了 class 是否与 pickled 对象一起发送。在进一步检查发生了什么之后,我认为 class 本身及其类变量也被 pickle 了。因为,如果在将对象发送到新进程之前更新了类变量,则更新后的值可用。 但是在新进程中完成的更新仍然没有中继回原始

关于python - 多处理池 : How to call an arbitrary list of methods on a list of class objects,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/46253963/

相关文章:

python - Visual Studio Code 智能感知不显示修饰函数的参数

python - 实现 group_by_owners 字典

c++ - 在键上使用 map.find() 和 count(),这是一个类对象类型

python - 有没有办法在 python 列表中切换元素?

python-3.x - Python 3.7 boolean 索引 *警告* 使用 'list'

python - 使用 NumPy 从矩阵中获取最小/最大 n 值和索引的有效方法

python - Pandas Dataframe 过滤器包含 ('.' ),Python 3.6

java - 在 Java 8 中使用 Lambda 遍历一个 Map of Maps?

python - 为什么我需要创建 `QApplication` 的对象,它在 PyQt GUI 编程中的目的是什么?

python - 使用 Python 将 JSON 转换为 CSV 问题