python - 向通过 pool.map 调用的函数添加状态——如何避免酸洗错误

标签 python multithreading parallel-processing multiprocessing

在使用多处理模块时,我遇到了一个常见的问题,即 pickle 错误。
我的确切问题是我需要在调用 pool.map 之前给出我正在调用的函数一些状态。函数,但这样做会导致 attribute lookup __builtin__.function failed发现错误 here .
根据链接的 SO 答案,它似乎是在 pool.map 中使用函数的唯一方法。就是调用定义的函数本身,以便在当前函数的作用域之外进行查找。
我觉得我对上面的解释很差,所以这是代码中的问题。 :)
无池测试

# Function to be called by the multiprocessing pool
def my_func(x):
    massive_list, medium_list, index1, index2 = x
    result = [massive_list[index1 + x][index2:] for x in xrange(10)]
    return result in medium_list



if __name__ == '__main__':

    data = [comprehension which loads a ton of state]
    source = [comprehension which also loads a medium amount of state]

    for num in range(100):
        to_crunch = ((massive_list, small_list, num, x) for x in range(1000)) 
        result = map(my_func, to_crunch)
这工作正常,正如预期的那样。它唯一的“错误”是它很慢。
池尝试 1
# (Note: my_func() remains the same)
if __name__ == '__main__':

    data = [comprehension which loads a ton of state]
    source = [comprehension which also loads a medium amount of state]

    pool = multiprocessing.Pool(2)
    for num in range(100):
        to_crunch = ((massive_list, small_list, num, x) for x in range(1000)) 
        result = pool.map(my_func, to_crunch)
这在技术上是可行的,但速度慢了惊人的 18 倍!速度变慢不仅来自于在每次调用中复制两个海量数据结构,而且还来自于在它们传递时对它们进行腌制/解封。非池版本受益于只需将引用传递给大量列表,而不是实际列表。
因此,在找到瓶颈后,我尝试将两个大型列表作为状态存储在 my_func 中。 .这样,如果我理解正确,每个 worker 只需要复制一次(在我的例子中,4)。
池尝试 2:
我结束了my_func在将两个列表作为存储状态传递的闭包中。
def build_myfunc(m,s):
    def my_func(x):
        massive_list = m # close the state in there
        small_list = s

        index1, index2 = x
        result = [massive_list[index1 + x][index2:] for x in xrange(10)]
        return result in medium_list
    return my_func

if __name__ == '__main__':

    data = [comprehension which loads a ton of state]
    source = [comprehension which also loads a medium amount of state]

    modified_func = build_myfunc(data, source)

    pool = multiprocessing.Pool(2)
    for num in range(100):
        to_crunch = ((massive_list, small_list, num, x) for x in range(1000)) 
        result = pool.map(modified_func, to_crunch)
但是,这会返回 pickle 错误,因为(基于上面链接的 SO 问题)您不能从同一范围内调用具有多处理功能的函数。
错误:
PicklingError: Can't pickle <type 'function'>: attribute lookup __builtin__.function failed
那么,有没有办法解决这个问题?

最佳答案

Map 是一种分配工作负载的方法。如果您将数据存储在 func 中,我认为您将失去最初的目的。

让我们试着找到为什么它更慢。这不正常,肯定有别的原因。

首先,进程的数量必须适合运行它们的机器。在您的示例中,您使用了一个包含 2 个进程的池,因此总共涉及 3 个进程。您正在使用的系统上有多少个内核?还有什么在运行?处理数据时的系统负载是多少?
该函数对数据有什么作用?它访问磁盘吗?或者它可能使用 DB,这意味着可能有另一个进程访问磁盘和内核。
内存呢?存储初始列表是否足够?

正确的实现是您的尝试 1。

尝试使用 iostat 分析执行情况例如。这样您就可以发现瓶颈。

如果它在 cpu 上停止,那么您可以尝试对代码进行一些调整。

来自 another answer在 Stackoverflow 上(由我所以没问题复制并粘贴在这里:P):

您正在使用 .map()它收集结果然后返回。因此,对于大型数据集,您可能会陷入收集阶段。

您可以尝试使用.imap()这是 .map() 上的迭代器版本甚至 .imap_unordered() 如果结果的顺序不重要 (从你的例子看来)。

Here的相关文档。值得注意的是:

For very long iterables using a large value for chunksize can make the job complete much faster than using the default value of 1.

关于python - 向通过 pool.map 调用的函数添加状态——如何避免酸洗错误,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/19310536/

相关文章:

c++ - Visual C++ 只有一个线程在工作 (OpenMP)

python - 如何在 Slurm 集群的多个节点上运行 MPI Python 脚本?错误 : Warning: can't run 1 processes on 2 nodes, 将 nnodes 设置为 1

python - 为什么Python中使用多线程没有输出?

python:将行与公共(public)字段合并

python - 带有管道和 GridSearchCV 的 StandardScaler

python - 在 Python 中实例化 "type"

java - 我如何知道提交给执行器的任务是否抛出异常?

linux - 多线程程序的 GDB 调试输出

java - 对并行流进行排序时遇到顺序错误

python - 从 dev : static files not resolving 中的子文件夹提供 django