在使用多处理模块时,我遇到了一个常见的问题,即 pickle 错误。
我的确切问题是我需要在调用 pool.map
之前给出我正在调用的函数一些状态。函数,但这样做会导致 attribute lookup __builtin__.function failed
发现错误 here .
根据链接的 SO 答案,它似乎是在 pool.map
中使用函数的唯一方法。就是调用定义的函数本身,以便在当前函数的作用域之外进行查找。
我觉得我对上面的解释很差,所以这是代码中的问题。 :)
无池测试
# Function to be called by the multiprocessing pool
def my_func(x):
massive_list, medium_list, index1, index2 = x
result = [massive_list[index1 + x][index2:] for x in xrange(10)]
return result in medium_list
if __name__ == '__main__':
data = [comprehension which loads a ton of state]
source = [comprehension which also loads a medium amount of state]
for num in range(100):
to_crunch = ((massive_list, small_list, num, x) for x in range(1000))
result = map(my_func, to_crunch)
这工作正常,正如预期的那样。它唯一的“错误”是它很慢。池尝试 1
# (Note: my_func() remains the same)
if __name__ == '__main__':
data = [comprehension which loads a ton of state]
source = [comprehension which also loads a medium amount of state]
pool = multiprocessing.Pool(2)
for num in range(100):
to_crunch = ((massive_list, small_list, num, x) for x in range(1000))
result = pool.map(my_func, to_crunch)
这在技术上是可行的,但速度慢了惊人的 18 倍!速度变慢不仅来自于在每次调用中复制两个海量数据结构,而且还来自于在它们传递时对它们进行腌制/解封。非池版本受益于只需将引用传递给大量列表,而不是实际列表。因此,在找到瓶颈后,我尝试将两个大型列表作为状态存储在
my_func
中。 .这样,如果我理解正确,每个 worker 只需要复制一次(在我的例子中,4)。池尝试 2:
我结束了
my_func
在将两个列表作为存储状态传递的闭包中。def build_myfunc(m,s):
def my_func(x):
massive_list = m # close the state in there
small_list = s
index1, index2 = x
result = [massive_list[index1 + x][index2:] for x in xrange(10)]
return result in medium_list
return my_func
if __name__ == '__main__':
data = [comprehension which loads a ton of state]
source = [comprehension which also loads a medium amount of state]
modified_func = build_myfunc(data, source)
pool = multiprocessing.Pool(2)
for num in range(100):
to_crunch = ((massive_list, small_list, num, x) for x in range(1000))
result = pool.map(modified_func, to_crunch)
但是,这会返回 pickle 错误,因为(基于上面链接的 SO 问题)您不能从同一范围内调用具有多处理功能的函数。错误:
PicklingError: Can't pickle <type 'function'>: attribute lookup __builtin__.function failed
那么,有没有办法解决这个问题?
最佳答案
Map 是一种分配工作负载的方法。如果您将数据存储在 func 中,我认为您将失去最初的目的。
让我们试着找到为什么它更慢。这不正常,肯定有别的原因。
首先,进程的数量必须适合运行它们的机器。在您的示例中,您使用了一个包含 2 个进程的池,因此总共涉及 3 个进程。您正在使用的系统上有多少个内核?还有什么在运行?处理数据时的系统负载是多少?
该函数对数据有什么作用?它访问磁盘吗?或者它可能使用 DB,这意味着可能有另一个进程访问磁盘和内核。
内存呢?存储初始列表是否足够?
正确的实现是您的尝试 1。
尝试使用 iostat
分析执行情况例如。这样您就可以发现瓶颈。
如果它在 cpu 上停止,那么您可以尝试对代码进行一些调整。
来自 another answer在 Stackoverflow 上(由我所以没问题复制并粘贴在这里:P):
您正在使用 .map()
它收集结果然后返回。因此,对于大型数据集,您可能会陷入收集阶段。
您可以尝试使用.imap()
这是 .map()
上的迭代器版本甚至 .imap_unordered()
如果结果的顺序不重要 (从你的例子看来)。
Here的相关文档。值得注意的是:
For very long iterables using a large value for chunksize can make the job complete much faster than using the default value of 1.
关于python - 向通过 pool.map 调用的函数添加状态——如何避免酸洗错误,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/19310536/