python - 在 Windows 中运行 python 多处理时内存使用率较高

标签 python performance memory multiprocessing parallelism-amdahl

下面的代码是一个人为的示例,它模拟了我遇到的实际问题,该问题使用多处理来加速代码。该代码在 Windows 10 64 位操作系统python 3.7.5ipython 7.9.0

上运行 <小时/>

转换函数(这些函数将用于在 main() 中转换数组)

from itertools import product
from functools import partial

from numba import njit, prange
import multiprocessing as mp
import numpy as np

@njit(parallel= True)
def transform_array_c(data, n):

    ar_len= len(data)

    sec_max1= np.empty(ar_len, dtype = data.dtype)
    sec_max2= np.empty(ar_len, dtype = data.dtype)

    for i in prange(n-1):
        sec_max1[i]= np.nan

    for sec in prange(ar_len//n):
        s2_max= data[n*sec+ n-1]
        s1_max= data[n*sec+ n]

        for i in range(n-1,-1,-1):
            if data[n*sec+i] > s2_max:
                s2_max= data[n*sec+i]
            sec_max2[n*sec+i]= s2_max

        sec_max1[n*sec+ n-1]= sec_max2[n*sec]

        for i in range(n-1):
            if n*sec+n+i < ar_len:
                if data[n*sec+n+i] > s1_max:
                    s1_max= data[n*sec+n+i]
                sec_max1[n*sec+n+i]= max(s1_max, sec_max2[n*sec+i+1])

            else:
                break

    return sec_max1  

@njit(error_model= 'numpy', cache= True)
def rt_mean_sq_dev(array1, array2, n):
    msd_temp = np.empty(array1.shape[0])

    K = array2[n-1]

    rs_x= array1[0] - K
    rs_xsq = rs_x *rs_x

    msd_temp[0] = np.nan

    for i in range(1,n):
        rs_x += array1[i] - K
        rs_xsq += np.square(array1[i] - K)
        msd_temp[i] = np.nan

    y_i = array2[n-1] - K
    msd_temp[n-1] = np.sqrt(max(y_i*y_i + (rs_xsq - 2*y_i*rs_x)/n, 0))

    for i in range(n, array1.shape[0]):
        rs_x = array1[i] - array1[i-n]+ rs_x
        rs_xsq = np.square(array1[i] - K) - np.square(array1[i-n] - K) + rs_xsq
        y_i = array2[i] - K

        msd_temp[i] = np.sqrt(max(y_i*y_i + (rs_xsq - 2*y_i*rs_x)/n, 0))

    return msd_temp 

@njit(cache= True)
def transform_array_a(data, n):
    result = np.empty(data.shape[0], dtype= data.dtype)
    alpharev = 1. - 2 / (n + 1)
    alpharev_exp = alpharev

    e = data[0]
    w = 1.

    if n == 2: result[0] = e
    else:result[0] = np.nan

    for i in range(1, data.shape[0]):
        w += alpharev_exp
        e = e*alpharev + data[i]

        if i > n -3:result[i] = e / w
        else:result[i] = np.nan

        if alpharev_exp > 3e-307:alpharev_exp*= alpharev
        else:alpharev_exp=0.

    return result
<小时/>

多处理部分

def func(tup, data):    #<-------------the function to be run among all 
    a_temp= a[tup[2][0]]

    idx1 = a_temp > a[tup[2][1]]
    idx2= a_temp < b[(tup[2][1], tup[1][1])]

    c_final = c[tup[0][1]][idx1 | idx2]
    data_final= data[idx1 | idx2]

    return (tup[0][0], tup[1][0], *tup[2]), c_final[-1] - data_final[-1]

def setup(a_dict, b_dict, c_dict):    #initialize the shared dictionaries
    global a,b,c
    a,b,c = a_dict, b_dict, c_dict

def main(a_arr, b_arr, c_arr, common_len):
    np.random.seed(0)
    data_array= np.random.normal(loc= 24004, scale=500, size= common_len)

    a_size = a_arr[-1] + 1
    b_size = len(b_arr)
    c_size = len(c_arr)

    loop_combo = product(enumerate(c_arr),
                         enumerate(b_arr),
                         (n_tup for n_tup in product(np.arange(1,a_arr[-1]), a_arr) if n_tup[1] > n_tup[0])
                         )
    result = np.zeros((c_size, b_size, a_size -1 ,a_size), dtype = np.float32) 

    ###################################################
    #This part simulates the heavy-computation in the actual problem

    a= {}
    b= {}
    c= {}

    for i in range(1, a_arr[-1]+1):

        a[i]= transform_array_a(data_array, i)
        if i in a_arr:
            for j in b_arr:
                b[(i,j)]= rt_mean_sq_dev(data_array, a[i], i)/data_array *j


    for i in c_arr:
        c[i]= transform_array_c(data_array, i)

    ###################################################    
    with mp.Pool(processes= mp.cpu_count() - 1,
                 initializer= setup,
                 initargs= [a,b,c]
                 ) as pool:
        mp_res= pool.imap_unordered(partial(func, data= data_array),
                                    loop_combo
                                    )

        for item in mp_res:
            result[item[0]] =item[1]


    return result


if __name__ == '__main__':
    mp.freeze_support()

    a_arr= np.arange(2,44,2)
    b_arr= np.arange(0.4,0.8, 0.20)
    c_arr= np.arange(2,42,10)
    common_len= 440000

    final_res= main(a_arr, b_arr, c_arr, common_len)

出于性能考虑,所有进程之间使用多个共享的“只读”字典来减少冗余计算(在实际问题中,所有进程之间使用共享字典后总计算时间减少了40%)。然而,在我的实际问题中使用共享字典后,内存使用率变得异常高;我的 6C/12T Windows 计算机的内存使用量从(8.2GB 峰值,5.0GB 空闲)变为(23.9GB 峰值,5.0GB 空闲),为了获得 40% 的速度提升,付出的成本有点太高了。

当必须在进程之间使用多个共享数据时,高内存使用率是否不可避免?可以对我的代码进行哪些操作,以便在使用尽可能少的内存的同时使其尽可能快?

提前谢谢

<小时/>

注意:我尝试使用 imap_unordered() 而不是 map 因为我听说它应该在输入可迭代较大时减少内存使用量,但老实说我可以内存使用率没有改善。也许我在这里做错了什么?

<小时/>

编辑:由于答案中的反馈,我已经更改了代码的繁重计算部分,使其看起来不那么虚拟并且类似于实际问题中的计算。

最佳答案

High Memory Usage when manipulating shared dictionaries in python multiprocessing run in Windows

在我们深入细节之前,有必要先揭开这个问题的神秘面纱 - 原始代码中没有共享字典,他们获得的越少被操纵(是的,每个 a,b,c 确实被“分配”到对 dict_a, dict_b, dict_c 的引用,但它们都没有共享 ,但只是像 Windows 级 O/S-es 中的 multiprocessing 那样进行复制。不会写入“到”dict -s(只是从其任一副本进行非破坏性读取)

类似地,np.memmap() -s 可以将最初提出的数据的某些部分放入磁盘空间(这样做的代价是+承担一些(延迟屏蔽的)随机读取延迟 ~ 10 [ms] 而不是 ~ 0.5 [ns] 如果聪明的话-对齐的矢量化内存模式被设计成性能热点),但这里不应预期范式发生巨大变化,因为“外部迭代器”几乎避免了任何智能对齐缓存的重用

Q : What can be done to my code in order to make it as fast as possible while using as low memory as possible?

第一个罪过是使用 8B - int64 来存储一个普通的 Bbit(这里还没有 Qbits ~ 向 Burnaby Quantum 研发团队致敬)

for i in c_arr:                                    # ~~ np.arange( 2, 42, 10 )
     np.random.seed( i )                           # ~ yields a deterministic state
     c[i] = np.random.poisson( size = common_len ) # ~ 440.000 int64-s with {0|1}

这需要 6 (进程)x 440000 x 8B ~ 0.021 GB “走私”到字典 c 的所有副本中,而每个这样的值都是确定性已知的,并且只需知道 i 的值即可在相应的目标进程内生成 ALAP(实际上不需要预先生成并多次复制 ~ 0.021 GB 数据)

到目前为止,Windows 级操作系统 lack an os.fork() and thus do a python 的完整副本(是的,RAM ...,是的,时间)按照要求复制了许多 python 解释器 session (加上导入主模块),位于 multiprocessing用于基于进程的分离(这样做是为了避免 GIL 锁有序,纯 [SERIAL],代码执行)

<小时/>

最佳下一步:
重新考虑代码
以提高效率和性能

最好的下一步 - 重构代码,以尽量减少对 6 个进程的“浅薄”(且昂贵)使用,但由中央迭代器“外部”命令(loop_combo“独裁者” 使用约 18522 个项目重复调用“远程调度” func( tup, data ) ,以便获取一个简单的“DMA 元组”- ( (x,y,z), value ) 将一个 value 存储到中央进程 result - float32 - 数组)。

尝试增加计算“密度” - 因此尝试通过分而治之的方式重构代码(即,每个 mp.pool 进程在一个平 slider 中计算一些非常大的专用子-所覆盖的参数空间的空间(这里迭代地“从外部”)并且可以轻松地减少返回的结果 block 。这样做只会提高性能(最好没有任何形式的昂贵共享)。

此重构将避免参数 pickle/unpickle -成本(附加开销 - 一次性(传递唯一参数集值)和重复性(在大约 18522 次执行的重复内存中) - 分配、累积和 pickle/unpickle - 由于调用签名设计/工程不佳而导致 np.arange( 440000 ) 的成本)

所有这些步骤将提高您的处理效率并减少不必要的 RAM 分配。

关于python - 在 Windows 中运行 python 多处理时内存使用率较高,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/58701900/

相关文章:

python - Django 中不同的用户级别

c++ - 从 C++(或 C)回调调用 python 方法

python - Eigenfaces 训练图像像素大小误差

python - 如何有效地创建一个多维 numpy 数组,其条目仅取决于一维索引?

C#:用尽几乎所有内存

Java 字符串内存泄漏

python - Tkinter 滚动条不滚动

java - 仅记录当前线程/ session 的 Hibernate SQL 的最简单方法

performance - 如何衡量网站的加载和搜索时间的响应时间? Selenium

Python计算大文件中的单词数