python - 如何使用多线程加速嵌套for循环计算?

标签 python multithreading optimization multiprocessing numba

我正在尝试对大型数组执行数值积分,并且计算需要很长时间。我尝试使用 numba 和 jit 装饰器来加速我的代码,但不支持 numpy.trapz。

我的新想法是创建 n 个线程来并行运行计算,但我想知道如何做到这一点,或者它是否可行?

引用以下代码

我可以让 sz[2] 多个线程同时运行,调用 ZO_SteadState 来计算值吗?

    for i in range(sz[1]):
        phii = phi[i]
        for j in range(sz[2]):
            s = tau[0, i, j, :].reshape(1, n4)
            [R3, PHI3, S3] = meshgrid(rprime, phiprime, s)
            BCoeff = Bessel0(bm * R3)

            SS[0, i, j] = ZO_SteadyState(alpha, b,bm,BCoeff,Bessel_Denom, k2,maxt,phii, PHI2, PHI3, phiprime,R3,rprime,s,S3, T,v)

有问题的计算。
@jit()
def ZO_SteadyState(alpha, b,bm,BCoeff,Bessel_Denom, k2,maxt,phii, PHI2, PHI3, phiprime,R3,rprime,s,S3, T,v):
    g = 1000000 * exp(-(10 ** 5) * (R3 - (b / maxt) * S3) ** 2) * (
            exp(-(10 ** 5) * (PHI3 - 0) ** 2) + exp(-(10 ** 5) * (PHI3 - 2 * np.pi) ** 2) + exp(
        -(10 ** 5) * (PHI3 - 2 * np.pi / 3) ** 2) + exp(
        -(10 ** 5) * (PHI3 - 4 * np.pi / 3) ** 2))  # stationary point heat source.

    y = R3 * ((np.sqrt(2) / b) * (1 / (np.sqrt((H2 ** 2 / bm ** 2) + (1 - (v ** 2 / (bm ** 2 * b ** 2))))))
              * (BCoeff / Bessel_Denom)) * np.cos(v * (phii - PHI3)) * g

    x = (np.trapz(y, phiprime, axis=0)).reshape(1, 31, 300)

    # integral transform of heat source. integral over y-axis
    gbarbar = np.trapz(x, rprime, axis=1)

    PHI2 = np.meshgrid(phiprime, s)[0]

    sz2 = PHI2.shape
    f = h2 * 37 * Array_Ones((sz2[0], sz[1]))  # boundary condition.

    fbar = np.trapz(np.cos(v * (phii - PHI2)) * f, phiprime, 1).reshape(1, n4)  # integrate over y

    A = (alpha / k) * gbarbar + ((np.sqrt(2) * alpha) / k2) * (
                1 / (np.sqrt((H2 ** 2 / bm ** 2) + (1 - (v ** 2 / (bm ** 2 * b ** 2)))))) * fbar

    return np.trapz(exp(-alpha * bm ** 2 * (T[0, i, j] - s)) * A, s)

最佳答案

另一个概念实现,进程产生进程(编辑:jit 测试):

import numpy as np

# better pickling
import pathos 
from contextlib import closing


from numba import jit

#https://stackoverflow.com/questions/47574860/python-pathos-process-pool-non-daemonic
import multiprocess.context as context
class NoDaemonProcess(context.Process):
    def _get_daemon(self):
        return False
    def _set_daemon(self, value):
        pass
    daemon = property(_get_daemon, _set_daemon)

class NoDaemonPool(pathos.multiprocessing.Pool):
    def Process(self, *args, **kwds):
        return NoDaemonProcess(*args, **kwds)




# matrix dimensions
x = 100 # i
y = 500 # j

NUM_PROCESSES = 10 # total NUM_PROCESSES*NUM_PROCESSES will be spawned

SS = np.zeros([x, y], dtype=float)

@jit
def foo(i):
    return (i*i + 1)
@jit
def bar(phii, j):
    return phii*(j+1)

# The code which is implemented down here:
'''
for i in range(x):
    phii = foo(i)
    for j in range(y):
        SS[i, j] = bar(phii, j)
'''

# Threaded version:
# queue is in global scope


def outer_loop(i):

    phii = foo(i)

    # i is in process scope
    def inner_loop(j):
        result = bar(phii,j)
        # the data is coordinates and result
        return (i, j, result)


    with closing(NoDaemonPool(processes=NUM_PROCESSES)) as pool:
        res = list(pool.imap(inner_loop, range(y)))
    return res

with closing(NoDaemonPool(processes=NUM_PROCESSES)) as pool:    
    results = list(pool.imap(outer_loop, range(x)))

result_list = []
for r in results:
    result_list += r


# read results from queue
for res in result_list:
    if res:
        i, j, val = res
        SS[i,j] = val


# check that all cells filled
print(np.count_nonzero(SS)) # 100*500

编辑:解释。

这段代码中所有复杂性的原因是我想做比 OP 要求的更多的并行化。如果仅并行化内循环,则保留外循环,因此对于外循环的每次迭代,都会创建新的进程池并执行内循环的计算。只要在我看来,该公式不依赖于外循环的其他迭代,我决定并行化所有内容:现在外循环的计算被分配给池中的进程,之后每个“外循环”进程创建自己的新池,并产生额外的进程来执行内部循环的计算。

我可能错了,外循环不能并行化;在这种情况下,您只能保留内部进程池。

使用进程池可能不是最佳解决方案,因为创建和销毁池会消耗时间。更有效(但需要手动模式)的解决方案是一劳永逸地实例化 N 个进程,然后将数据输入它们并使用多处理 Queue() 接收结果。所以你应该首先测试这个多处理解决方案是否给你足够的加速(如果与Z0_SteadyState运行相比,构造和销毁池的时间很短,就会发生这种情况)。

下一个复杂因素是人工无守护程序池。守护进程用于优雅地停止应用程序:当主程序退出时,守护进程会静默终止。但是,守护进程不能产生子进程。在您的示例中,您需要等到每个进程结束才能检索数据,因此我将它们设为非守护进程以允许生成子进程来计算内部循环。

数据交换:我认为与实际计算相比,填充矩阵所需的数据量和时间是很小的。所以我使用池和pool.imap功能(比 .map() 快一点。您也可以尝试 .imap_unordered() ,但在您的情况下它不应该有显着差异)。因此,内部池一直等到所有结果都被计算出来并将它们作为列表返回。因此,外部池返回必须连接的列表列表。然后在单个快速循环中根据这些结果重建矩阵。

通知 with closing() thing:在这条语句下的事情完成后,它会自动关闭池,避免僵尸进程消耗内存。

此外,您可能会注意到我奇怪地在另一个函数中定义了一个函数,并且在进程内部我可以访问一些尚未传递到那里的变量:i , phii .发生这种情况是因为进程可以访问使用 copy-on-change 启动它们的全局范围。策略(默认 fork 模式)。这不涉及酸洗并且速度很快。

最后一条评论是关于使用 pathos库而不是标准 multiprocessing , concurrent.futures , subprocess等。原因是pathos使用了更好的酸洗库,因此它可以序列化标准库不能序列化的函数(例如,lambda 函数)。我不知道你的功能,所以我使用了更强大的工具来避免进一步的问题。

最后一件事:多处理与线程。您可以更改pathos处理池到标准 ThreadPoolExecutor来自 concurrent.futures ,就像我刚开始编写该代码时所做的那样。但是,在执行期间,在我的系统上 CPU 仅加载 100%(即使用了一个内核,看起来所有 8 个内核都加载了 15-20%)。我不太擅长理解线程和进程之间的差异,但在我看来,进程允许利用所有内核(每个内核 100%,总共 800%)。

关于python - 如何使用多线程加速嵌套for循环计算?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/52919039/

相关文章:

javascript - 如何向 Flask Server 发送数据?

c# - 每个线程更多 TCPClient 或更多线程

c - 测试并发数据结构

python - Pdf文件阅读器 : PdfReadError: Could not find xref table at specified location

python - 将 argparse 别名解析回原始命令

multithreading - 已经处理许多并发结果中的两个,优化 - 架构?算法?

performance - matlab:有效计算圆形邻域内的局部直方图

python - 正则表达式的最坏情况分析

mysql - 将 ORDER BY 'x' 与 JOIN 一起使用,但保留没有 'x' 值的行

python - 数据导入 uploadData API - 通过 GCS 和 App Engine 应用程序实现自动化