python - 如何优化python中矩阵的数学运算

标签 python numpy matrix heuristics

我正在尝试减少使用两个矩阵执行一系列计算的函数的时间。搜索这个,我听说过 numpy,但我真的不知道如何将它应用于我的问题。另外,我认为使我的函数变慢的原因之一是有很多点运算符(我在这个 this page 中听说过)。

数学对应于二次分配问题的因式分解:

QAP Factorization

我的代码是:

    delta = 0
    for k in xrange(self._tam):
        if k != r and k != s:
            delta +=
                self._data.stream_matrix[r][k] \
                * (self._data.distance_matrix[sol[s]][sol[k]] - self._data.distance_matrix[sol[r]][sol[k]]) + \
                self._data.stream_matrix[s][k] \
                * (self._data.distance_matrix[sol[r]][sol[k]] - self._data.distance_matrix[sol[s]][sol[k]]) + \
                self._data.stream_matrix[k][r] \
                * (self._data.distance_matrix[sol[k]][sol[s]] - self._data.distance_matrix[sol[k]][sol[r]]) + \
                self._data.stream_matrix[k][s] \
                * (self._data.distance_matrix[sol[k]][sol[r]] - self._data.distance_matrix[sol[k]][sol[s]])
    return delta

在大小为 20(20x20 的矩阵)的问题上运行这个需要大约 20 个段,瓶颈在这个函数中

ncalls  tottime  percall  cumtime  percall filename:lineno(function)
303878   15.712    0.000   15.712    0.000 Heuristic.py:66(deltaC)

我尝试将 map 应用于 for 循环,但因为循环体不是函数调用,所以不可能。

我怎样才能减少时间?

编辑1

回答艾肯伯格的评论:

sol 是一个排列,例如 [1,2,3,4]。当我生成邻居解决方案时调用该函数,因此 [1,2,3,4] 的邻居是 [2,1,3,4]。我只更改原始排列中的两个位置,然后调用 deltaC,它计算位置 r,s 交换的解的因式分解(在上面的示例中 r,s = 0,1)。进行这种排列是为了避免计算邻居解决方案的全部成本。我想我可以将 sol[k,r,s] 的值存储在局部变量中,以避免在每次迭代中查找它的值。 我不知道这是否是您在评论中提出的问题。

编辑2

一个最小的工作示例:

import random


distance_matrix = [[0, 12, 6, 4], [12, 0, 6, 8], [6, 6, 0, 7], [4, 8, 7, 0]]
stream_matrix = [[0, 3, 8, 3], [3, 0, 2, 4], [8, 2, 0, 5], [3, 4, 5, 0]]

def deltaC(r, s, S=None):
    '''
    Difference between C with values i and j swapped
    '''

    S = [0,1,2,3]

    if S is not None:
        sol = S
    else:
        sol = S

    delta = 0

    sol_r, sol_s = sol[r], sol[s]

    for k in xrange(4):
        if k != r and k != s:
            delta += (stream_matrix[r][k] \
                * (distance_matrix[sol_s][sol[k]] - distance_matrix[sol_r][sol[k]]) + \
                stream_matrix[s][k] \
                * (distance_matrix[sol_r][sol[k]] - distance_matrix[sol_s][sol[k]]) + \
                stream_matrix[k][r] \
                * (distance_matrix[sol[k]][sol_s] - distance_matrix[sol[k]][sol_r]) + \
                stream_matrix[k][s] \
                * (distance_matrix[sol[k]][sol_r] - distance_matrix[sol[k]][sol_s]))
    return delta


for _ in xrange(303878):
    d = deltaC(random.randint(0,3), random.randint(0,3))
print d

现在我认为更好的选择是使用 NumPy。我尝试使用 Matrix(),但没有提高性能。

找到的最佳解决方案

好吧,最后我能够结合@TooTone 的解决方案并将索引存储在一个集合中以避免 if 来减少时间。时间从大约 18 秒减少到 8 秒。这是代码:

def deltaC(self, r, s, sol=None):
    delta = 0
    sol = self.S if sol is None else self.S
    sol_r, sol_s = sol[r], sol[s]

    stream_matrix = self._data.stream_matrix
    distance_matrix = self._data.distance_matrix

    indexes = set(xrange(self._tam)) - set([r, s])

    for k in indexes:
        sol_k = sol[k]
        delta += \
            (stream_matrix[r][k] - stream_matrix[s][k]) \
            * (distance_matrix[sol_s][sol_k] - distance_matrix[sol_r][sol_k]) \
            + \
            (stream_matrix[k][r] - stream_matrix[k][s]) \
            * (distance_matrix[sol_k][sol_s] - distance_matrix[sol_k][sol_r])
    return delta

为了进一步减少时间,我认为最好的方法是编写一个模块。

最佳答案

在您给出的简单示例中,使用 for k in xrange(4): 循环体仅执行两次(如果 r==s),或者三倍(如果 r!=s)和下面的初始 numpy 实现速度较慢。 Numpy 针对长向量执行计算进行了优化,如果向量很短,开销可能会超过 yield 。 (请注意,在此公式中,矩阵在不同的维度上被切片,并且索引不连续,这只会使矢量化实现变得更加复杂)。

import numpy as np

distance_matrix_np = np.array(distance_matrix)
stream_matrix_np = np.array(stream_matrix)
n = 4

def deltaC_np(r, s, sol):
    delta = 0
    sol_r, sol_s = sol[r], sol[s]

    K = np.array([i for i in xrange(n) if i!=r and i!=s])

    return np.sum(
        (stream_matrix_np[r,K] - stream_matrix_np[s,K]) \
        *  (distance_matrix_np[sol_s,sol[K]] - distance_matrix_np[sol_r,sol[K]]) + \
        (stream_matrix_np[K,r] - stream_matrix_np[K,s]) \
        * (distance_matrix_np[sol[K],sol_s] - distance_matrix_np[sol[K],sol_r]))

在这个 numpy 实现中,不是对 K 中的元素进行 for 循环,而是将操作应用于 K 中的所有元素在 numpy 中。另请注意,您的数学表达式可以简化。左边括号中的每一项都是右边括号中的项的负数。 enter image description here

这也适用于您的原始代码。例如,(self._data.distance_matrix[sol[s]][sol[k]] - self._data.distance_matrix[sol[r]][sol[k]]) 等于-1 次 (self._data.distance_matrix[sol[r]][sol[k]] - self._data.distance_matrix[sol[s]][sol[k]]),所以你正在做不必要的计算,并且可以在不使用 numpy 的情况下优化您的原始代码。

事实证明,numpy 函数的瓶颈在于看似无辜的列表推导

K = np.array([i for i in xrange(n) if i!=r and i!=s])

一旦它被矢量化代码取代

if r==s:
    K=np.arange(n-1)
    K[r:] += 1
else:
    K=np.arange(n-2)
    if r<s:
        K[r:] += 1
        K[s-1:] += 1
    else:
        K[s:] += 1
        K[r-1:] += 1

numpy 函数快得多

运行时间图如下所示(此答案的右下方是优化 numpy 函数之前的原始图)。您可以看到使用优化的原始代码或 numpy 代码是否有意义,具体取决于矩阵的大小。

enter image description here

下面是完整的代码以供引用,部分是为了防止其他人可以更进一步。 (函数 deltaC2 是您的原始代码,经过优化以考虑简化数学表达式的方式。)

def deltaC(r, s, sol):
    delta = 0
    sol_r, sol_s = sol[r], sol[s]
    for k in xrange(n):
        if k != r and k != s:
            delta += \
                stream_matrix[r][k] \
                * (distance_matrix[sol_s][sol[k]] - distance_matrix[sol_r][sol[k]]) + \
                stream_matrix[s][k] \
                * (distance_matrix[sol_r][sol[k]] - distance_matrix[sol_s][sol[k]]) + \
                stream_matrix[k][r] \
                * (distance_matrix[sol[k]][sol_s] - distance_matrix[sol[k]][sol_r]) + \
                stream_matrix[k][s] \
                * (distance_matrix[sol[k]][sol_r] - distance_matrix[sol[k]][sol_s])
    return delta

import numpy as np

def deltaC_np(r, s, sol):
    delta = 0
    sol_r, sol_s = sol[r], sol[s]

    if r==s:
        K=np.arange(n-1)
        K[r:] += 1
    else:
        K=np.arange(n-2)
        if r<s:
            K[r:] += 1
            K[s-1:] += 1
        else:
            K[s:] += 1
            K[r-1:] += 1
    #K = np.array([i for i in xrange(n) if i!=r and i!=s]) #TOO SLOW

    return np.sum(
        (stream_matrix_np[r,K] - stream_matrix_np[s,K]) \
        *  (distance_matrix_np[sol_s,sol[K]] - distance_matrix_np[sol_r,sol[K]]) + \
        (stream_matrix_np[K,r] - stream_matrix_np[K,s]) \
        * (distance_matrix_np[sol[K],sol_s] - distance_matrix_np[sol[K],sol_r]))

def deltaC2(r, s, sol):
    delta = 0
    sol_r, sol_s = sol[r], sol[s]
    for k in xrange(n):
        if k != r and k != s:
            sol_k = sol[k]
            delta += \
                (stream_matrix[r][k] - stream_matrix[s][k]) \
                * (distance_matrix[sol_s][sol_k] - distance_matrix[sol_r][sol_k]) \
                + \
                (stream_matrix[k][r] - stream_matrix[k][s]) \
                * (distance_matrix[sol_k][sol_s] - distance_matrix[sol_k][sol_r])
    return delta


import time

N=200

elapsed1s = []
elapsed2s = []
elapsed3s = []
ns = range(10,410,10)
for n in ns:
    distance_matrix_np=np.random.uniform(0,n**2,size=(n,n))
    stream_matrix_np=np.random.uniform(0,n**2,size=(n,n))
    distance_matrix=distance_matrix_np.tolist()
    stream_matrix=stream_matrix_np.tolist()
    sol  = range(n-1,-1,-1)
    sol_np  = np.array(range(n-1,-1,-1))

    Is = np.random.randint(0,n-1,4)
    Js = np.random.randint(0,n-1,4)

    total1 = 0
    start = time.clock()
    for reps in xrange(N):
        for i in Is:
            for j in Js:
                total1 += deltaC(i,j, sol)
    elapsed1 = (time.clock() - start)
    start = time.clock()

    total2 = 0
    start = time.clock()
    for reps in xrange(N):
        for i in Is:
            for j in Js:
                total2 += deltaC_np(i,j, sol_np)
    elapsed2 = (time.clock() - start)

    total3 = 0
    start = time.clock()
    for reps in xrange(N):
        for i in Is:
            for j in Js:
                total3 += deltaC2(i,j, sol_np)
    elapsed3 = (time.clock() - start)

    print n, elapsed1, elapsed2, elapsed3, total1, total2, total3
    elapsed1s.append(elapsed1)
    elapsed2s.append(elapsed2)
    elapsed3s.append(elapsed3)

    #Check errors of one method against another
    #err = 0
    #for i in range(min(n,50)):
    #    for j in range(min(n,50)):
    #        err += np.abs(deltaC(i,j,sol)-deltaC_np(i,j,sol_np))
    #print err
import matplotlib.pyplot as plt

plt.plot(ns, elapsed1s, label='Original',lw=2)
plt.plot(ns, elapsed3s, label='Optimized',lw=2)
plt.plot(ns, elapsed2s, label='numpy',lw=2)
plt.legend(loc='upper left', prop={'size':16})
plt.xlabel('matrix size')
plt.ylabel('time')
plt.show()

这是在 deltaC_np

中优化列表理解之前的原始图表

enter image description here

关于python - 如何优化python中矩阵的数学运算,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/23033416/

相关文章:

python-3.x - 如何在两个矩阵的比较中允许任何列的符号差异,Python?

C++ 按 2 列对多维数组进行排序

c - 从C中的txt文件打印矩阵

python - 使用opencv python检测白色 Blob

python - 如何在 Python 2 中下载大文件

android - Buildozer Numpy 运行时错误 : Broken toolchain: cannot link a simple C program

python - 为什么我导入 numpy 后多处理只使用一个核心?

r - R中矩阵的反向索引

Python循环遍历文件夹并重命名文件

python - 使用python实时解析包含回车的命令行输出进程