1. 我有一个函数var
.我想知道通过利用系统拥有的所有处理器、内核、线程和 RAM 内存通过多处理/并行处理来快速运行此函数中的循环的最佳方法。
import numpy
from pysheds.grid import Grid
xs = 82.1206, 72.4542, 65.0431, 83.8056, 35.6744
ys = 25.2111, 17.9458, 13.8844, 10.0833, 24.8306
a = r'/home/test/image1.tif'
b = r'/home/test/image2.tif'
def var(interest):
variable_avg = []
for (x,y) in zip(xs,ys):
grid = Grid.from_raster(interest, data_name='map')
grid.catchment(data='map', x=x, y=y, out_name='catch')
variable = grid.view('catch', nodata=np.nan)
variable = numpy.array(variable)
variablemean = (variable).mean()
variable_avg.append(variablemean)
return(variable_avg)
2. 如果我能同时运行这两个函数就太好了var
并为给定的函数的多个参数并行循环。例如:
var(a)
和 var(b)
同时。因为它消耗的时间比单独并行循环要少得多。如果没有意义,请忽略 2。
最佳答案
TLDR:
您可以使用多处理库来运行您的var
并行功能。但是,正如所写,您可能没有对 var
进行足够的调用。多处理由于其开销而具有性能优势。如果您需要做的只是运行这两个调用,那么串行运行可能是您将获得的最快速度。但是,如果您需要调用很多电话,多处理可以帮助您。
我们需要使用进程池来并行运行它,线程不会在这里工作,因为 Python 的全局解释器锁会阻止我们实现真正的并行性。进程池的缺点是进程是重量级的。在仅对 var
进行两次调用的示例中创建池的时间超过了运行 var
所花费的时间本身。
为了说明这一点,让我们使用一个进程池并使用 asyncio 来运行对 var
的调用。并行并将其与仅按顺序运行事物进行比较。注意运行这个示例,我使用了来自 Pysheds 库 https://github.com/mdbartos/pysheds/tree/master/data 的图像- 如果您的图像更大,以下可能不成立。
import functools
import time
from concurrent.futures.process import ProcessPoolExecutor
import asyncio
a = 'diem.tif'
xs = 10, 20, 30, 40, 50
ys = 10, 20, 30, 40, 50
async def main():
loop = asyncio.get_event_loop()
pool_start = time.time()
with ProcessPoolExecutor() as pool:
task_one = loop.run_in_executor(pool, functools.partial(var, a))
task_two = loop.run_in_executor(pool, functools.partial(var, a))
results = await asyncio.gather(task_one, task_two)
pool_end = time.time()
print(f'Process pool took {pool_end-pool_start}')
serial_start = time.time()
result_one = var(a)
result_two = var(a)
serial_end = time.time()
print(f'Running in serial took {serial_end - serial_start}')
if __name__ == "__main__":
asyncio.run(main())
在我的机器(2.4 GHz 8 核 Intel Core i9)上运行上述内容,我得到以下输出:Process pool took 1.7581260204315186
Running in serial took 0.32335805892944336
在此示例中,进程池的速度要慢五倍以上!这是由于创建和管理多个进程的开销。也就是说,如果您需要调用 var
不止几次,进程池可能更有意义。让我们修改它以运行 var
100次并比较结果:async def main():
loop = asyncio.get_event_loop()
pool_start = time.time()
tasks = []
with ProcessPoolExecutor() as pool:
for _ in range(100):
tasks.append(loop.run_in_executor(pool, functools.partial(var, a)))
results = await asyncio.gather(*tasks)
pool_end = time.time()
print(f'Process pool took {pool_end-pool_start}')
serial_start = time.time()
for _ in range(100):
result = var(a)
serial_end = time.time()
print(f'Running in serial took {serial_end - serial_start}')
运行 100 次,我得到以下输出:Process pool took 3.442288875579834
Running in serial took 13.769982099533081
在这种情况下,在进程池中运行大约快 4 倍。您可能还希望尝试同时运行循环的每个迭代。您可以通过创建一个函数来执行此操作,该函数一次处理一个 x,y 坐标,然后在进程池中运行要检查的每个点:def process_poi(interest, x, y):
grid = Grid.from_raster(interest, data_name='map')
grid.catchment(data='map', x=x, y=y, out_name='catch')
variable = grid.view('catch', nodata=np.nan)
variable = np.array(variable)
return variable.mean()
async def var_loop_async(interest, pool, loop):
tasks = []
for (x,y) in zip(xs,ys):
function_call = functools.partial(process_poi, interest, x, y)
tasks.append(loop.run_in_executor(pool, function_call))
return await asyncio.gather(*tasks)
async def main():
loop = asyncio.get_event_loop()
pool_start = time.time()
tasks = []
with ProcessPoolExecutor() as pool:
for _ in range(100):
tasks.append(var_loop_async(a, pool, loop))
results = await asyncio.gather(*tasks)
pool_end = time.time()
print(f'Process pool took {pool_end-pool_start}')
serial_start = time.time()
在这种情况下,我得到 Process pool took 3.2950568199157715
- 所以并不比我们的第一个版本快,每次调用 var
一个进程.这可能是因为此时的限制因素是我们的 CPU 上有多少可用内核,将我们的工作分成更小的增量不会增加太多值(value)。也就是说,如果您希望跨两个图像检查 1000 个 x 和 y 坐标,那么最后一种方法可能会产生性能提升。
关于python - 在函数中执行循环多处理的最快方法?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/63188571/