我正在尝试对大型数组执行数值积分,并且计算需要很长时间。我尝试使用 numba 和 jit 装饰器来加速我的代码,但不支持 numpy.trapz。
我的新想法是创建 n 个线程来并行运行计算,但我想知道如何做到这一点,或者它是否可行?
引用以下代码
我可以让 sz[2] 多个线程同时运行,调用 ZO_SteadState 来计算值吗?
for i in range(sz[1]):
phii = phi[i]
for j in range(sz[2]):
s = tau[0, i, j, :].reshape(1, n4)
[R3, PHI3, S3] = meshgrid(rprime, phiprime, s)
BCoeff = Bessel0(bm * R3)
SS[0, i, j] = ZO_SteadyState(alpha, b,bm,BCoeff,Bessel_Denom, k2,maxt,phii, PHI2, PHI3, phiprime,R3,rprime,s,S3, T,v)
有问题的计算。
@jit()
def ZO_SteadyState(alpha, b,bm,BCoeff,Bessel_Denom, k2,maxt,phii, PHI2, PHI3, phiprime,R3,rprime,s,S3, T,v):
g = 1000000 * exp(-(10 ** 5) * (R3 - (b / maxt) * S3) ** 2) * (
exp(-(10 ** 5) * (PHI3 - 0) ** 2) + exp(-(10 ** 5) * (PHI3 - 2 * np.pi) ** 2) + exp(
-(10 ** 5) * (PHI3 - 2 * np.pi / 3) ** 2) + exp(
-(10 ** 5) * (PHI3 - 4 * np.pi / 3) ** 2)) # stationary point heat source.
y = R3 * ((np.sqrt(2) / b) * (1 / (np.sqrt((H2 ** 2 / bm ** 2) + (1 - (v ** 2 / (bm ** 2 * b ** 2))))))
* (BCoeff / Bessel_Denom)) * np.cos(v * (phii - PHI3)) * g
x = (np.trapz(y, phiprime, axis=0)).reshape(1, 31, 300)
# integral transform of heat source. integral over y-axis
gbarbar = np.trapz(x, rprime, axis=1)
PHI2 = np.meshgrid(phiprime, s)[0]
sz2 = PHI2.shape
f = h2 * 37 * Array_Ones((sz2[0], sz[1])) # boundary condition.
fbar = np.trapz(np.cos(v * (phii - PHI2)) * f, phiprime, 1).reshape(1, n4) # integrate over y
A = (alpha / k) * gbarbar + ((np.sqrt(2) * alpha) / k2) * (
1 / (np.sqrt((H2 ** 2 / bm ** 2) + (1 - (v ** 2 / (bm ** 2 * b ** 2)))))) * fbar
return np.trapz(exp(-alpha * bm ** 2 * (T[0, i, j] - s)) * A, s)
最佳答案
另一个概念实现,进程产生进程(编辑:jit 测试):
import numpy as np
# better pickling
import pathos
from contextlib import closing
from numba import jit
#https://stackoverflow.com/questions/47574860/python-pathos-process-pool-non-daemonic
import multiprocess.context as context
class NoDaemonProcess(context.Process):
def _get_daemon(self):
return False
def _set_daemon(self, value):
pass
daemon = property(_get_daemon, _set_daemon)
class NoDaemonPool(pathos.multiprocessing.Pool):
def Process(self, *args, **kwds):
return NoDaemonProcess(*args, **kwds)
# matrix dimensions
x = 100 # i
y = 500 # j
NUM_PROCESSES = 10 # total NUM_PROCESSES*NUM_PROCESSES will be spawned
SS = np.zeros([x, y], dtype=float)
@jit
def foo(i):
return (i*i + 1)
@jit
def bar(phii, j):
return phii*(j+1)
# The code which is implemented down here:
'''
for i in range(x):
phii = foo(i)
for j in range(y):
SS[i, j] = bar(phii, j)
'''
# Threaded version:
# queue is in global scope
def outer_loop(i):
phii = foo(i)
# i is in process scope
def inner_loop(j):
result = bar(phii,j)
# the data is coordinates and result
return (i, j, result)
with closing(NoDaemonPool(processes=NUM_PROCESSES)) as pool:
res = list(pool.imap(inner_loop, range(y)))
return res
with closing(NoDaemonPool(processes=NUM_PROCESSES)) as pool:
results = list(pool.imap(outer_loop, range(x)))
result_list = []
for r in results:
result_list += r
# read results from queue
for res in result_list:
if res:
i, j, val = res
SS[i,j] = val
# check that all cells filled
print(np.count_nonzero(SS)) # 100*500
编辑:解释。
这段代码中所有复杂性的原因是我想做比 OP 要求的更多的并行化。如果仅并行化内循环,则保留外循环,因此对于外循环的每次迭代,都会创建新的进程池并执行内循环的计算。只要在我看来,该公式不依赖于外循环的其他迭代,我决定并行化所有内容:现在外循环的计算被分配给池中的进程,之后每个“外循环”进程创建自己的新池,并产生额外的进程来执行内部循环的计算。
我可能错了,外循环不能并行化;在这种情况下,您只能保留内部进程池。
使用进程池可能不是最佳解决方案,因为创建和销毁池会消耗时间。更有效(但需要手动模式)的解决方案是一劳永逸地实例化 N 个进程,然后将数据输入它们并使用多处理 Queue() 接收结果。所以你应该首先测试这个多处理解决方案是否给你足够的加速(如果与
Z0_SteadyState
运行相比,构造和销毁池的时间很短,就会发生这种情况)。下一个复杂因素是人工无守护程序池。守护进程用于优雅地停止应用程序:当主程序退出时,守护进程会静默终止。但是,守护进程不能产生子进程。在您的示例中,您需要等到每个进程结束才能检索数据,因此我将它们设为非守护进程以允许生成子进程来计算内部循环。
数据交换:我认为与实际计算相比,填充矩阵所需的数据量和时间是很小的。所以我使用池和
pool.imap
功能(比 .map()
快一点。您也可以尝试 .imap_unordered()
,但在您的情况下它不应该有显着差异)。因此,内部池一直等到所有结果都被计算出来并将它们作为列表返回。因此,外部池返回必须连接的列表列表。然后在单个快速循环中根据这些结果重建矩阵。通知
with closing()
thing:在这条语句下的事情完成后,它会自动关闭池,避免僵尸进程消耗内存。此外,您可能会注意到我奇怪地在另一个函数中定义了一个函数,并且在进程内部我可以访问一些尚未传递到那里的变量:
i
, phii
.发生这种情况是因为进程可以访问使用 copy-on-change
启动它们的全局范围。策略(默认 fork
模式)。这不涉及酸洗并且速度很快。最后一条评论是关于使用
pathos
库而不是标准 multiprocessing
, concurrent.futures
, subprocess
等。原因是pathos
使用了更好的酸洗库,因此它可以序列化标准库不能序列化的函数(例如,lambda 函数)。我不知道你的功能,所以我使用了更强大的工具来避免进一步的问题。最后一件事:多处理与线程。您可以更改
pathos
处理池到标准 ThreadPoolExecutor
来自 concurrent.futures
,就像我刚开始编写该代码时所做的那样。但是,在执行期间,在我的系统上 CPU 仅加载 100%(即使用了一个内核,看起来所有 8 个内核都加载了 15-20%)。我不太擅长理解线程和进程之间的差异,但在我看来,进程允许利用所有内核(每个内核 100%,总共 800%)。
关于python - 如何使用多线程加速嵌套for循环计算?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/52919039/