python - 如何使用 Numba 优化随机 Runge-Kutta?

标签 python numba differential-equations runge-kutta

我正在尝试优化随机微分方程。这对我来说意味着我必须求解一组 Mmax 独立微分方程。对于该任务,我使用 Numba,但我感觉我做错了,因为即使我没有遇到任何错误并且时间减少了一些,但也没有我预期的那么多。

我的代码看起来像


@njit(fastmath=True)
def Liouvillian(rho,t_list,alpha1,alpha1p):
    C = np.zeros((len(rho0)),dtype=np.complex64)
    B = L0+alpha1p*L1m+alpha1*L1p
    for j in range(len(rho0)):
        for i in range(len(rho0)):
            C[j] += B[j, i]*rho[i] 
    return C

@njit(parallel=True, fastmath=True)
def RK4(rho0,t_list,alpha1,alpha1p):
    rho=np.zeros((len(rho0),len(t_list),Mmax),dtype=np.complex64) 
    for m in prange(Mmax):
        rho[:,0,m]=rho0
        n=0
        for t in t_list[0:len(t_list)-1]:
            k1=Liouvillian(rho[:,n,m],t,alpha1[st*n,m],alpha1p[st*n,m])
            k2=Liouvillian(rho[:,n,m]+dt*k1/2,t+dt/2,alpha1[st*n,m],alpha1p[st*n,m])
            k3=Liouvillian(rho[:,n,m]+dt*k2/2,t+dt/2,alpha1[st*n,m],alpha1p[st*n,m])
            k4=Liouvillian(rho[:,n,m]+dt*k3,t+dt,alpha1[st*n,m],alpha1p[st*n,m])
            rho[:,n+1,m]=rho[:,n,m]+dt*(k1+2*k2+2*k3+k4)/6.0
            n=n+1
    return rho

其中 alpha1p 和 alpha 是具有元素 [t,m] 的二维数组,L0,Lp,Lm 只是数组。

具体取Mmax=10000,不使用Numba需要16秒,而使用Numba(并行和快速数学),则减少到14秒。虽然我看到了一些减少,但这并不是我对并行化的预期。我的代码有问题还是这是我应该得到的?

编辑:这里我附上用于生成随机数组的代码,稍后我将在 ODE 求解器上使用。

def A_matrix(num_modes,kappa,E_0):
    A_modes=np.zeros(((2*num_modes),(2*num_modes)),dtype=np.complex64)
    for i in range(np.shape(A_modes)[0]):
        for j in range(np.shape(A_modes)[0]):
            if i==j:
                A_modes[i,j]=-kappa/4
                if i==0:
                    A_modes[i,i+num_modes+1]=E_0*kappa/2  
                    A_modes[i,-1]=E_0*kappa/2 
                elif 0<i<num_modes-1:
                    A_modes[i,i+num_modes+1]=E_0*kappa/2
                    A_modes[i,i+num_modes-1]=E_0*kappa/2 
                elif i==num_modes:
                    A_modes[i-1,i]=E_0*kappa/2 
                    A_modes[i-1,-2]=E_0*kappa/2 
    return (A_modes+A_modes.transpose())


def D_mat(num_modes):
    D_matrix=np.zeros(((num_modes),(num_modes)),dtype=np.complex64)
    for i in range(np.shape(D_matrix)[0]):
        for j in range(np.shape(D_matrix)[0]):
            if i==j:
                if i==0:
                    D_matrix[i,i+1]=1
                    D_matrix[i,-1]=1
                elif 0<i<num_modes-1:
                    D_matrix[i,i+1]=1
    return (D_matrix+D_matrix.transpose())

def a_part(alpha,t_list):
    M=A_matrix(num_modes,kappa,E_0)
    return M.dot(alpha)


def b_part(w,t_list):
    D_1=D_mat(num_modes)
    Zero_modes=np.zeros(((num_modes),(num_modes)),dtype=np.complex64)
    D=np.bmat([[D_1, Zero_modes], [Zero_modes, D_1]])
    B=sqrtm(D)*np.sqrt(E_0*kappa/2)
    return B.dot(w)

def SDE_Param_Euler_Mauyrama(Mmax):
    alpha=np.zeros((2*num_modes,Smax+1,Mmax),dtype=np.complex64)
    n=0
    alpha[:,n,:]=0.0+1j*0.0
    for s in s_list[0:len(s_list)-1]:
        alpha[:,n+1,:]=alpha[:,n,:]+ds*a_part(alpha[:,n,:],s)+b_part(w[:,n,:],s)
        n=n+1
    return (alpha)

#Parameters
E_0=0.5
kappa=10.
gamma=1.
num_modes=2 ## number of modes

Mmax=10000 #number of samples

Tmax=20 ##max value for time
dt=1/(2*kappa)

st=10

ds=dt/st
Nmax=int(Tmax/dt) ##number of steps
Smax=int(Tmax/ds) ##number of steps

t_list=np.arange(0,Tmax+dt/2,dt)
s_list=np.arange(0,Tmax+ds/2,ds)


w = np.random.randn(2*num_modes,Smax+1,Mmax)*np.sqrt(ds)

(alpha1,alpha2,alpha1p,alpha2p)=SDE_Param_Euler_Mauyrama(Mmax)

from qutip import *
Delta_a=0

num_qubits=1
##Atom 1
sm_1=sigmam()
sp_1=sigmap()
sx_1=sigmax()
sy_1=sigmay()
sz_1=sigmaz()


state0=np.zeros(2**num_qubits,dtype=np.complex64)
state0[-1]=1

rho0=np.kron(state0,np.conjugate(np.transpose(state0)))

Ha=Delta_a*(sz_1)/2    #Deterministic Liouvillian
L_ops=[np.sqrt(gamma)*sm_1]

L0=np.array(liouvillian(Ha,L_ops),dtype=np.complex64)

L1m=-np.array(np.sqrt(kappa*gamma)*1j*liouvillian(sm_1),dtype=np.complex64)
L1p=np.array(np.sqrt(kappa*gamma)*1j*liouvillian(sp_1),dtype=np.complex64)

rho_1=RK4(rho0,t_list,alpha1,alpha1p).reshape(2**num_qubits,2**num_qubits,len(t_list),Mmax)


最佳答案

我预计主要瓶颈是临时数组,特别是因为代码是并行的,并且与临时数组的数量及其大小相比,计算量很小(较大的数组是 slower to allocate )。因此,关键是通过分配一次并重用它们来避免临时数组。您还可以使用循环来避免像 Numpy 那样隐式创建临时数组。分配临时数组是一项昂贵的操作,并且填充新分配的数组很慢,因为 page faults 。例如,k1+2*k2+2*k3+k4 创建了许多不需要的临时数组。您可以根据k1k2k3k4<直接用rho编写循环。您甚至可以通过在 rho 中累积值来避免首先创建这四个数组。内存操作往往比现代硬件上的计算慢,因此就地执行操作并避免复制和临时数组通常是提高性能的好主意,特别是在并行代码中,因为 L3 缓存和 DRAM 在内核之间共享。

此外,还有一个问题:内存访问模式效率低下。事实上,rho[:,n,m] 意味着您可以步幅n*m*16 访问项目。跨步内存访问的效率远低于连续内存访问,因为硬件针对后者进行了优化。避免重复跨步访问的一种解决方案是物理转置输入/输出数组(请注意,arr.T 不会物理转置数组,而是返回其上的非连续 View ,而 arr.T 不会物理转置数组,而是返回其上的非连续 View 。 T.copy() 确实返回连续的物理转置数组,尽管它还没有非常有效地实现)。就您而言,最好的方法当然是更改 rho 的布局。仅当在该算法之前/之后对其进行操作的其他算法也需要连续对其进行操作时,转置技巧才值得使用。您可以在每个算法之间转置数组一次,而不是在这里执行两次(也可能在其他算法中)。

这是第二点的示例:

@njit(parallel=True, fastmath=True)
def RK4(rho0,t_list,alpha1,alpha1p):
    rho=np.zeros((len(t_list), Mmax, len(rho0)),dtype=np.complex64)
    for m in prange(Mmax):
        rho[0,m,:]=rho0
        n=0
        for t in t_list[0:len(t_list)-1]:
            k1=Liouvillian(rho[n,m,:],t,alpha1[st*n,m],alpha1p[st*n,m])
            k2=Liouvillian(rho[n,m,:]+dt*k1/2,t+dt/2,alpha1[st*n,m],alpha1p[st*n,m])
            k3=Liouvillian(rho[n,m,:]+dt*k2/2,t+dt/2,alpha1[st*n,m],alpha1p[st*n,m])
            k4=Liouvillian(rho[n,m,:]+dt*k3,t+dt,alpha1[st*n,m],alpha1p[st*n,m])
            rho[n+1,m,:]=rho[n,m,:]+dt*(k1+2*k2+2*k3+k4)/6.0
            n=n+1
    return rho

结果被转置(因此与初始函数不同)。由于转置,我的机器上的计算速度大约快了一倍。应用第一点应该会带来更显着的加速。

关于python - 如何使用 Numba 优化随机 Runge-Kutta?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/76113153/

相关文章:

python - 导入函数中的 __globals__ 与主模块中函数的 __globals__ 有很大不同

python - Dask 和 Numba - 如何有效地使用 map 分区?

python - 使用 Numba 支持组装 block 矩阵

julia - Julia 中的二阶 ODE 给出了错误的结果

performance - numpy:评估矩阵中的函数,使用前一个数组作为计算下一个数组的参数

python - 如何在微分方程(SciPy)中使用 if 语句?

python - 如何在 python joblib 中写入共享变量

python - 用skyfield寻找暮光之城

python - 反引号对 Python 解释器意味着什么?示例 : `num`

python - 当我不知道解析导数时用Python求解微分方程