下面的代码是一个人为的示例,它模拟了我遇到的实际问题,该问题使用多处理来加速代码。该代码在 Windows 10 64 位操作系统
、python 3.7.5
和 ipython 7.9.0
转换函数(这些函数将用于在 main()
中转换数组)
from itertools import product
from functools import partial
from numba import njit, prange
import multiprocessing as mp
import numpy as np
@njit(parallel= True)
def transform_array_c(data, n):
ar_len= len(data)
sec_max1= np.empty(ar_len, dtype = data.dtype)
sec_max2= np.empty(ar_len, dtype = data.dtype)
for i in prange(n-1):
sec_max1[i]= np.nan
for sec in prange(ar_len//n):
s2_max= data[n*sec+ n-1]
s1_max= data[n*sec+ n]
for i in range(n-1,-1,-1):
if data[n*sec+i] > s2_max:
s2_max= data[n*sec+i]
sec_max2[n*sec+i]= s2_max
sec_max1[n*sec+ n-1]= sec_max2[n*sec]
for i in range(n-1):
if n*sec+n+i < ar_len:
if data[n*sec+n+i] > s1_max:
s1_max= data[n*sec+n+i]
sec_max1[n*sec+n+i]= max(s1_max, sec_max2[n*sec+i+1])
else:
break
return sec_max1
@njit(error_model= 'numpy', cache= True)
def rt_mean_sq_dev(array1, array2, n):
msd_temp = np.empty(array1.shape[0])
K = array2[n-1]
rs_x= array1[0] - K
rs_xsq = rs_x *rs_x
msd_temp[0] = np.nan
for i in range(1,n):
rs_x += array1[i] - K
rs_xsq += np.square(array1[i] - K)
msd_temp[i] = np.nan
y_i = array2[n-1] - K
msd_temp[n-1] = np.sqrt(max(y_i*y_i + (rs_xsq - 2*y_i*rs_x)/n, 0))
for i in range(n, array1.shape[0]):
rs_x = array1[i] - array1[i-n]+ rs_x
rs_xsq = np.square(array1[i] - K) - np.square(array1[i-n] - K) + rs_xsq
y_i = array2[i] - K
msd_temp[i] = np.sqrt(max(y_i*y_i + (rs_xsq - 2*y_i*rs_x)/n, 0))
return msd_temp
@njit(cache= True)
def transform_array_a(data, n):
result = np.empty(data.shape[0], dtype= data.dtype)
alpharev = 1. - 2 / (n + 1)
alpharev_exp = alpharev
e = data[0]
w = 1.
if n == 2: result[0] = e
else:result[0] = np.nan
for i in range(1, data.shape[0]):
w += alpharev_exp
e = e*alpharev + data[i]
if i > n -3:result[i] = e / w
else:result[i] = np.nan
if alpharev_exp > 3e-307:alpharev_exp*= alpharev
else:alpharev_exp=0.
return result
<小时/>
多处理部分
def func(tup, data): #<-------------the function to be run among all
a_temp= a[tup[2][0]]
idx1 = a_temp > a[tup[2][1]]
idx2= a_temp < b[(tup[2][1], tup[1][1])]
c_final = c[tup[0][1]][idx1 | idx2]
data_final= data[idx1 | idx2]
return (tup[0][0], tup[1][0], *tup[2]), c_final[-1] - data_final[-1]
def setup(a_dict, b_dict, c_dict): #initialize the shared dictionaries
global a,b,c
a,b,c = a_dict, b_dict, c_dict
def main(a_arr, b_arr, c_arr, common_len):
np.random.seed(0)
data_array= np.random.normal(loc= 24004, scale=500, size= common_len)
a_size = a_arr[-1] + 1
b_size = len(b_arr)
c_size = len(c_arr)
loop_combo = product(enumerate(c_arr),
enumerate(b_arr),
(n_tup for n_tup in product(np.arange(1,a_arr[-1]), a_arr) if n_tup[1] > n_tup[0])
)
result = np.zeros((c_size, b_size, a_size -1 ,a_size), dtype = np.float32)
###################################################
#This part simulates the heavy-computation in the actual problem
a= {}
b= {}
c= {}
for i in range(1, a_arr[-1]+1):
a[i]= transform_array_a(data_array, i)
if i in a_arr:
for j in b_arr:
b[(i,j)]= rt_mean_sq_dev(data_array, a[i], i)/data_array *j
for i in c_arr:
c[i]= transform_array_c(data_array, i)
###################################################
with mp.Pool(processes= mp.cpu_count() - 1,
initializer= setup,
initargs= [a,b,c]
) as pool:
mp_res= pool.imap_unordered(partial(func, data= data_array),
loop_combo
)
for item in mp_res:
result[item[0]] =item[1]
return result
if __name__ == '__main__':
mp.freeze_support()
a_arr= np.arange(2,44,2)
b_arr= np.arange(0.4,0.8, 0.20)
c_arr= np.arange(2,42,10)
common_len= 440000
final_res= main(a_arr, b_arr, c_arr, common_len)
出于性能考虑,所有进程之间使用多个共享的“只读”字典来减少冗余计算(在实际问题中,所有进程之间使用共享字典后总计算时间减少了40%)。然而,在我的实际问题中使用共享字典后,内存使用率变得异常高;我的 6C/12T Windows 计算机的内存使用量从(8.2GB 峰值,5.0GB 空闲)变为(23.9GB 峰值,5.0GB 空闲),为了获得 40% 的速度提升,付出的成本有点太高了。
当必须在进程之间使用多个共享数据时,高内存使用率是否不可避免?可以对我的代码进行哪些操作,以便在使用尽可能少的内存的同时使其尽可能快?
提前谢谢
<小时/>注意:我尝试使用 imap_unordered()
而不是 map
因为我听说它应该在输入可迭代较大时减少内存使用量,但老实说我可以内存使用率没有改善。也许我在这里做错了什么?
编辑:由于答案中的反馈,我已经更改了代码的繁重计算部分,使其看起来不那么虚拟并且类似于实际问题中的计算。
最佳答案
High Memory Usage when manipulating shared dictionaries in python
multiprocessing
run in Windows
在我们深入细节之前,有必要先揭开这个问题的神秘面纱 - 原始代码中没有共享字典,他们获得的越少被操纵(是的,每个 a,b,c
确实被“分配”到对 dict_a, dict_b, dict_c
的引用,但它们都没有共享 ,但只是像 Windows 级 O/S-es 中的 multiprocessing
那样进行复制。不会写入“到”dict
-s(只是从其任一副本进行非破坏性读取)
类似地,np.memmap()
-s 可以将最初提出的数据的某些部分放入磁盘空间(这样做的代价是+承担一些(延迟屏蔽的)随机读取延迟 ~ 10 [ms]
而不是 ~ 0.5 [ns]
如果聪明的话-对齐的矢量化内存模式被设计成性能热点),但这里不应预期范式发生巨大变化,因为“外部迭代器”几乎避免了任何智能对齐缓存的重用
Q : What can be done to my code in order to make it as fast as possible while using as low memory as possible?
第一个罪过是使用 8B
- int64
来存储一个普通的 Bbit(这里还没有 Qbits ~ 向 Burnaby Quantum 研发团队致敬)
for i in c_arr: # ~~ np.arange( 2, 42, 10 )
np.random.seed( i ) # ~ yields a deterministic state
c[i] = np.random.poisson( size = common_len ) # ~ 440.000 int64-s with {0|1}
这需要 6
(进程)x 440000
x 8B ~ 0.021 GB
“走私”到字典 c
的所有副本中,而每个这样的值都是确定性已知的,并且只需知道 i
的值即可在相应的目标进程内生成 ALAP(实际上不需要预先生成并多次复制 ~ 0.021 GB
数据)
到目前为止,Windows 级操作系统 lack an os.fork()
and thus do a python 的完整副本(是的,RAM ...,是的,时间)按照要求复制了许多 python 解释器 session (加上导入主模块),位于 multiprocessing
用于基于进程的分离(这样做是为了避免 GIL 锁有序,纯 [SERIAL]
,代码执行)
最佳下一步:
重新考虑代码
以提高效率和性能
最好的下一步 - 重构代码,以尽量减少对 6 个进程的“浅薄”(且昂贵)使用,但由中央迭代器“外部”命令(loop_combo
“独裁者” 使用约 18522 个项目重复调用“远程调度” func( tup, data )
,以便获取一个简单的“DMA 元组”- ( (x,y,z), value )
将一个 value
存储到中央进程 result
- float32
- 数组)。
尝试增加计算“密度” - 因此尝试通过分而治之的方式重构代码(即,每个 mp.pool
进程在一个平 slider 中计算一些非常大的专用子-所覆盖的参数空间的空间(这里迭代地“从外部”)并且可以轻松地减少返回的结果 block 。这样做只会提高性能(最好没有任何形式的昂贵共享)。
此重构将避免参数 pickle
/unpickle
-成本(附加开销 - 一次性(传递唯一参数集值)和重复性(在大约 18522 次执行的重复内存中) - 分配、累积和 pickle
/unpickle
- 由于调用签名设计/工程不佳而导致 np.arange( 440000 )
的成本)
所有这些步骤将提高您的处理效率并减少不必要的 RAM 分配。
关于python - 在 Windows 中运行 python 多处理时内存使用率较高,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/58701900/