我想优化一些由两个嵌套循环组成的 python 代码。我对 numpy 不太熟悉,但我知道它应该能让我提高这样一个任务的效率。下面是我编写的测试代码,它反射(reflect)了实际代码中发生的情况。目前使用 numpy 范围和迭代器比通常的 python 慢。我究竟做错了什么?这个问题的最佳解决方案是什么?
感谢您的帮助!
import numpy
import time
# setup a problem analagous to that in the real code
npoints_per_plane = 1000
nplanes = 64
naxis = 1000
npoints3d = naxis + npoints_per_plane * nplanes
npoints = naxis + npoints_per_plane
specres = 1000
# this is where the data is being mapped to
sol = dict()
sol["ems"] = numpy.zeros(npoints3d)
sol["abs"] = numpy.zeros(npoints3d)
# this would normally be non-random input data
data = dict()
data["ems"] = numpy.zeros((npoints,specres))
data["abs"] = numpy.zeros((npoints,specres))
for ip in range(npoints):
data["ems"][ip,:] = numpy.random.random(specres)[:]
data["abs"][ip,:] = numpy.random.random(specres)[:]
ems_mod = numpy.random.random(1)[0]
abs_mod = numpy.random.random(1)[0]
ispec = numpy.random.randint(specres)
# this the code I want to optimize
t0 = time.time()
# usual python range and iterator
for ip in range(npoints_per_plane):
jp = naxis + ip
for ipl in range(nplanes):
ip3d = jp + npoints_per_plane * ipl
sol["ems"][ip3d] = data["ems"][jp,ispec] * ems_mod
sol["abs"][ip3d] = data["abs"][jp,ispec] * abs_mod
t1 = time.time()
# numpy ranges and iterator
ip_vals = numpy.arange(npoints_per_plane)
ipl_vals = numpy.arange(nplanes)
for ip in numpy.nditer(ip_vals):
jp = naxis + ip
for ipl in numpy.nditer(ipl_vals):
ip3d = jp + npoints_per_plane * ipl
sol["ems"][ip3d] = data["ems"][jp,ispec] * ems_mod
sol["abs"][ip3d] = data["abs"][jp,ispec] * abs_mod
t2 = time.time()
print "plain python: %0.3f seconds" % ( t1 - t0 )
print "numpy: %0.3f seconds" % ( t2 - t1 )
编辑:将“jp = naxis + ip”放在第一个 for 循环中
补充说明:
我想出如何让 numpy 快速执行内循环,而不是外循环:
# numpy vectorization
for ip in xrange(npoints_per_plane):
jp = naxis + ip
sol["ems"][jp:jp+npoints_per_plane*nplanes:npoints_per_plane] = data["ems"][jp,ispec] * ems_mod
sol["abs"][jp:jp+npoints_per_plane*nplanes:npoints_per_plane] = data["abs"][jp,ispec] * abs_mod
Joe 的以下解决方案展示了如何同时执行这两项操作,谢谢!
最佳答案
在 numpy 中编写循环的最佳方式是不编写循环,而是使用向量化操作。例如:
c = 0
for i in range(len(a)):
c += a[i] + b[i]
成为
c = np.sum(a + b, axis=0)
对于形状为 (100000, 100)
的 a
和 b
,第一个变量需要 0.344 秒,第二个变量需要 0.062 秒第二个。
在您的问题中出现的情况下,您可以执行以下操作:
sol['ems'][naxis:] = numpy.ravel(
numpy.repeat(
data['ems'][naxis:,ispec,numpy.newaxis] * ems_mod,
nplanes,
axis=1
),
order='F'
)
这可以通过 some tricks 进一步优化,但这会降低清晰度并且可能是过早的优化,因为:
plain python: 0.064 seconds
numpy: 0.002 seconds
解决方案的工作原理如下:
您的原始版本包含 jp = naxis + ip
仅跳过第一个 naxis
元素 [naxis:]
选择除第一个 naxis 之外的所有元素元素。您的内部循环将 data[jp,ispec]
的值重复 nplanes
次并将其写入多个位置 ip3d = jp + npoints_per_plane * ipl
这相当于一个扁平化的二维数组偏移了 naxis
。因此,第二个维度通过 numpy.newaxis
添加到(以前的 1D)data['ems'][naxis:, ispec]
,值重复 通过
次。然后通过 numpy.repeat
沿着这个新维度 nplanesnumpy.ravel
再次展平生成的二维数组(按 Fortran 顺序,即最低轴具有最小步幅)并写入 sol['ems' 的适当子数组]
。如果目标数组实际上是二维的,则可以使用自动数组广播跳过重复。
如果遇到无法避免使用循环的情况,可以使用 Cython (在 numpy 数组上支持 efficient buffer views)。
关于Python:优化循环,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/17726811/