python - 优化python中的双循环

标签 python performance loops numpy numba

我正在尝试优化以下循环:

def numpy(nx, nz, c, rho):
    for ix in range(2, nx-3):
        for iz in range(2, nz-3):
            a[ix, iz]  = sum(c*rho[ix-1:ix+3, iz])
            b[ix, iz]  = sum(c*rho[ix-2:ix+2, iz])
    return a, b

我尝试了不同的解决方案,发现使用 numba 来计算产品的总和可以带来更好的性能:

import numpy as np
import numba as nb
import time

@nb.autojit
def sum_opt(arr1, arr2):
    s = arr1[0]*arr2[0]
    for i in range(1, len(arr1)):
        s+=arr1[i]*arr2[i]
    return s

def numba1(nx, nz, c, rho):
    for ix in range(2, nx-3):
        for iz in range(2, nz-3):        
            a[ix, iz]  = sum_opt(c, rho[ix-1:ix+3, iz])
            b[ix, iz]  = sum_opt(c, rho[ix-2:ix+2, iz])
    return a, b

@nb.autojit
def numba2(nx, nz, c, rho):
    for ix in range(2, nx-3):
        for iz in range(2, nz-3):        
            a[ix, iz]  = sum_opt(c, rho[ix-1:ix+3, iz])
            b[ix, iz]  = sum_opt(c, rho[ix-2:ix+2, iz])
    return a, b
nx = 1024
nz = 256    

rho = np.random.rand(nx, nz)
c = np.random.rand(4)
a = np.zeros((nx, nz))
b = np.zeros((nx, nz))

ti = time.clock()
a, b = numpy(nx, nz, c, rho)
print 'Time numpy  : ' + `round(time.clock() - ti, 4)`

ti = time.clock()
a, b = numba1(nx, nz, c, rho)
print 'Time numba1 : ' + `round(time.clock() - ti, 4)`

ti = time.clock()
a, b = numba2(nx, nz, c, rho)
print 'Time numba2 : ' + `round(time.clock() - ti, 4)`

这导致

Time numpy : 4.1595

Time numba1 : 0.6993

Time numba2 : 1.0135

使用 numba 版本的求和函数 (sum_opt) 表现非常好。但我想知道为什么双循环函数 (numba2) 的 numba 版本会导致执行时间变慢。我尝试使用 jit 而不是 autojit,指定参数类型,但情况更糟。

我还注意到,首先在最小循环上循环比在最大循环上首先循环慢。有什么解释吗?

无论是,我确信这个双循环函数可以通过向量化问题(如 this )或使用另一种方法(映射?)进行大量改进,但我对这些方法有点困惑。

在我代码的其他部分,我使用了 numba 和 numpy 切片方法来替换所有显式循环,但在这种特殊情况下,我不知道如何设置它。

有什么想法吗?

编辑

感谢您的所有评论。我在这个问题上做了一些工作:

import numba as nb
import numpy as np
from scipy import signal
import time


@nb.jit(['float64(float64[:], float64[:])'], nopython=True)
def sum_opt(arr1, arr2):
    s = arr1[0]*arr2[0]
    for i in xrange(1, len(arr1)):
        s+=arr1[i]*arr2[i]
    return s

@nb.autojit
def numba1(nx, nz, c, rho, a, b):
    for ix in range(2, nx-3):
        for iz in range(2, nz-3):        
            a[ix, iz]  = sum_opt(c, rho[ix-1:ix+3, iz])
            b[ix, iz]  = sum_opt(c, rho[ix-2:ix+2, iz])
    return a, b


@nb.jit(nopython=True)
def numba2(nx, nz, c, rho, a, b):
    for ix in range(2, nx-3):
        for iz in range(2, nz-3):        
            a[ix, iz]  = sum_opt(c, rho[ix-1:ix+3, iz])
            b[ix, iz]  = sum_opt(c, rho[ix-2:ix+2, iz])
    return a, b

@nb.jit(['float64[:,:](int16, int16, float64[:], float64[:,:], float64[:,:])'], nopython=True)
def numba3a(nx, nz, c, rho, a):
    for ix in range(2, nx-3):
        for iz in range(2, nz-3):        
            a[ix, iz]  = sum_opt(c, rho[ix-1:ix+3, iz])
    return a

@nb.jit(['float64[:,:](int16, int16, float64[:], float64[:,:], float64[:,:])'], nopython=True)
def numba3b(nx, nz, c, rho, b):
    for ix in range(2, nx-3):
        for iz in range(2, nz-3):        
            b[ix, iz]  = sum_opt(c, rho[ix-2:ix+2, iz])
    return b

def convol(nx, nz, c, aa, bb):
    s1 = rho[1:nx-1,2:nz-3]
    s2 = rho[0:nx-2,2:nz-3]
    kernel = c[:,None][::-1]
    aa[2:nx-3,2:nz-3] = signal.convolve2d(s1, kernel, boundary='symm', mode='valid')
    bb[2:nx-3,2:nz-3] = signal.convolve2d(s2, kernel, boundary='symm', mode='valid')
    return aa, bb


nx = 1024
nz = 256 
rho = np.random.rand(nx, nz)
c = np.random.rand(4)
a = np.zeros((nx, nz))
b = np.zeros((nx, nz))

ti = time.clock()
for i in range(1000):
    a, b = numba1(nx, nz, c, rho, a, b)
print 'Time numba1 : ' + `round(time.clock() - ti, 4)`

ti = time.clock()
for i in range(1000):
    a, b = numba2(nx, nz, c, rho, a, b)
print 'Time numba2 : ' + `round(time.clock() - ti, 4)`

ti = time.clock()
for i in range(1000):
    a = numba3a(nx, nz, c, rho, a)
    b = numba3b(nx, nz, c, rho, b)
print 'Time numba3 : ' + `round(time.clock() - ti, 4)`

ti = time.clock()
for i in range(1000):
    a, b = convol(nx, nz, c, a, b)
print 'Time convol : ' + `round(time.clock() - ti, 4)`

您的解决方案非常优雅 Divakar,但我必须在我的代码中大量使用此功能。因此,对于 1000 次迭代,这将导致

Time numba1 : 3.2487

Time numba2 : 3.7012

Time numba3 : 3.2088

Time convol : 22.7696

autojitjit 非常接近。 但是,在使用 jit 时,指定所有参数类型似乎很重要。

我不知道当函数有多个输出时,是否有办法在 jit 装饰器中指定参数类型。有人吗?

目前我没有找到除使用 numba 之外的其他解决方案。欢迎新想法!

最佳答案

你基本上是在那里执行二维卷积,有一个小的修改,你的内核没有像往常一样反转 convolution操作确实。 所以,基本上,我们需要做两件事才能使用 signal.convolve2d。解决我们的问题 -

  • 对输入数组 rho 进行切片,以选择其中一部分用于您的代码的原始循环版本。这将是卷积的输入数据。
  • 反转内核 c 并将其与切片数据一起提供给 signal.convolve2d

请注意,这些是为了分别计算ab

这是实现-

import numpy as np
from scipy import signal

# Slices for convolutions to get a and b respectively        
s1 = rho[1:nx-1,2:nz-3]
s2 = rho[0:nx-2,2:nz-3]
kernel = c[:,None][::-1]  # convolution kernel

# Setup output arrays and fill them with convolution results
a = np.zeros((nx, nz))
b = np.zeros((nx, nz))

a[2:nx-3,2:nz-3] = signal.convolve2d(s1, kernel, boundary='symm', mode='valid')
b[2:nx-3,2:nz-3] = signal.convolve2d(s2, kernel, boundary='symm', mode='valid')

如果您不需要输出数组边界周围的额外零,您可以直接使用 signal.convolve2d 的输出,这必须进一步提高性能。

运行时测试

In [532]: %timeit loop_based(nx, nz, c, rho)
1 loops, best of 3: 1.52 s per loop

In [533]: %timeit numba1(nx, nz, c, rho)
1 loops, best of 3: 282 ms per loop

In [534]: %timeit numba2(nx, nz, c, rho)
1 loops, best of 3: 509 ms per loop

In [535]: %timeit conv_based(nx, nz, c, rho)
10 loops, best of 3: 15.5 ms per loop

因此,对于实际输入数据大小,所提出的基于卷积的方法比循环代码快 100x,大​​约 20x 优于最快的基于 numba 的方法 numba1

关于python - 优化python中的双循环,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/30211336/

相关文章:

python - 有效地计算邻居之间的距离

python - 如果一两个文件不存在,如何处理异常?

java - 性能:Java 与数据库

c - 遍历数组,检查是否 a+b+c == A 但避免重新检查 c+b+a 等

ruby - Ruby中 "each"循环的动态变化

python - 弹出一个命令,其中包含需要对所有输出说"is"的命令

python - 如何在调用 db.put() 之前在 GAE Python 中查找 db.Model 实例的大小?

c# - 为什么人们说 SqlDataReader.GetXXX(i) 比 SqlDataReader[i] 快?

c# - 高效滚动最大和最小窗口

c++ - 使用一个数组以相反的顺序制作另一个数组