我已经为一些较大对象的问题实现了多处理,如下所示:
import time
import pathos.multiprocessing as mp
from functools import partial
from random import randrange
class RandomNumber():
def __init__(self, object_size=100):
self.size = bytearray(object_size*10**6) # 100 MB size
self.foo = None
def do_something(self, *args, **kwargs):
self.foo = randrange(1, 10)
time.sleep(0.5) # wait for 0.5 seconds
return self
def wrapper(random_number, *args, **kwargs):
return random_number.do_something(*args, **kwargs)
if __name__ == '__main__':
# create data
numbers = [RandomNumber() for m in range(0, 9)]
kwds = {'add': randrange(1, 10)}
# calculate
pool = mp.Pool(processes=mp.cpu_count())
result = pool.map_async(partial(wrapper, **kwds), numbers)
try:
result = result.get()
except:
pass
# print result
my_results = [i.foo for i in result]
print(my_results)
pool.close()
pool.join()
产生类似的东西:
[8, 7, 8, 3, 1, 2, 6, 4, 8]
现在的问题是,与在对象非常小的情况下使用列表理解相比,我在性能上有了巨大的改进,而这种改进在对象尺寸较大时变得相反,例如100 MB 或更大。
来自documentation和其他问题我发现这是由于使用 pickle/dill 序列化单个对象以便将它们传递给池中的工作人员引起的。换句话说:对象被复制,这个 IO 操作成为瓶颈,因为它比实际计算更耗时。
我已经尝试使用 multiprocessing.Manager 处理同一个对象但这导致了更高的运行时间。
问题是我被绑定(bind)到我无法更改的特定类结构(此处通过 RandomNumber()
表示)..
现在我的问题是:是否有任何方法或概念可以规避此行为并且只在 do_something()
上调用我而无需序列化或复制的开销?
欢迎任何提示。提前致谢!
最佳答案
您需要使用批处理
。不要为每个数字创建 destroy worker。
根据cpu_count
制作有限的 worker 。然后将一个列表传递给每个工作人员并处理它们。使用map
并传递一个包含数字批处理
的列表。
关于具有大对象的 Python 多处理 : prevent copying/serialization of object,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/59730855/