python - 在函数中执行循环多处理的最快方法?

标签 python multithreading parallel-processing multiprocessing python-asyncio

1. 我有一个函数var .我想知道通过利用系统拥有的所有处理器、内核、线程和 RAM 内存通过多处理/并行处理来快速运行此函数中的循环的最佳方法。

import numpy
from pysheds.grid import Grid

xs = 82.1206, 72.4542, 65.0431, 83.8056, 35.6744
ys = 25.2111, 17.9458, 13.8844, 10.0833, 24.8306

a = r'/home/test/image1.tif'
b = r'/home/test/image2.tif'

def var(interest):
    
    variable_avg = []
    for (x,y) in zip(xs,ys):
        grid = Grid.from_raster(interest, data_name='map')

        grid.catchment(data='map', x=x, y=y, out_name='catch')

        variable = grid.view('catch', nodata=np.nan)
        variable = numpy.array(variable)
        variablemean = (variable).mean()
        variable_avg.append(variablemean)
    return(variable_avg)

2. 如果我能同时运行这两个函数就太好了var并为给定的函数的多个参数并行循环。
例如:var(a)var(b)同时。因为它消耗的时间比单独并行循环要少得多。
如果没有意义,请忽略 2。

最佳答案

TLDR:
您可以使用多处理库来运行您的var并行功能。但是,正如所写,您可能没有对 var 进行足够的调用。多处理由于其开销而具有性能优势。如果您需要做的只是运行这两个调用,那么串行运行可能是您将获得的最快速度。但是,如果您需要调用很多电话,多处理可以帮助您。
我们需要使用进程池来并行运行它,线程不会在这里工作,因为 Python 的全局解释器锁会阻止我们实现真正的并行性。进程池的缺点是进程是重量级的。在仅对 var 进行两次调用的示例中创建池的时间超过了运行 var 所花费的时间本身。
为了说明这一点,让我们使用一个进程池并使用 asyncio 来运行对 var 的调用。并行并将其与仅按顺序运行事物进行比较。注意运行这个示例,我使用了来自 Pysheds 库 https://github.com/mdbartos/pysheds/tree/master/data 的图像- 如果您的图像更大,以下可能不成立。

import functools
import time
from concurrent.futures.process import ProcessPoolExecutor
import asyncio

a = 'diem.tif'
xs = 10, 20, 30, 40, 50
ys = 10, 20, 30, 40, 50

async def main():
    loop = asyncio.get_event_loop()
    pool_start = time.time()
    with ProcessPoolExecutor() as pool:
        task_one = loop.run_in_executor(pool, functools.partial(var, a))
        task_two = loop.run_in_executor(pool, functools.partial(var, a))
        results = await asyncio.gather(task_one, task_two)
        pool_end = time.time()
        print(f'Process pool took {pool_end-pool_start}')

    serial_start = time.time()

    result_one = var(a)
    result_two = var(a)

    serial_end = time.time()
    print(f'Running in serial took {serial_end - serial_start}')


if __name__ == "__main__":
    asyncio.run(main())
在我的机器(2.4 GHz 8 核 Intel Core i9)上运行上述内容,我得到以下输出:
Process pool took 1.7581260204315186
Running in serial took 0.32335805892944336
在此示例中,进程池的速度要慢五倍以上!这是由于创建和管理多个进程的开销。也就是说,如果您需要调用 var不止几次,进程池可能更有意义。让我们修改它以运行 var 100次并比较结果:
async def main():
    loop = asyncio.get_event_loop()
    pool_start = time.time()
    tasks = []
    with ProcessPoolExecutor() as pool:
        for _ in range(100):
            tasks.append(loop.run_in_executor(pool, functools.partial(var, a)))
        results = await asyncio.gather(*tasks)
        pool_end = time.time()
        print(f'Process pool took {pool_end-pool_start}')

    serial_start = time.time()

    for _ in range(100):
        result = var(a)

    serial_end = time.time()
    print(f'Running in serial took {serial_end - serial_start}')
运行 100 次,我得到以下输出:
Process pool took 3.442288875579834
Running in serial took 13.769982099533081
在这种情况下,在进程池中运行大约快 4 倍。您可能还希望尝试同时运行循环的每个迭代。您可以通过创建一个函数来执行此操作,该函数一次处理一个 x,y 坐标,然后在进程池中运行要检查的每个点:
def process_poi(interest, x, y):
    grid = Grid.from_raster(interest, data_name='map')

    grid.catchment(data='map', x=x, y=y, out_name='catch')

    variable = grid.view('catch', nodata=np.nan)
    variable = np.array(variable)
    return variable.mean()

async def var_loop_async(interest, pool, loop):
    tasks = []
    for (x,y) in zip(xs,ys):
        function_call = functools.partial(process_poi, interest, x, y)
        tasks.append(loop.run_in_executor(pool, function_call))

    return await asyncio.gather(*tasks)

async def main():
    loop = asyncio.get_event_loop()
    pool_start = time.time()
    tasks = []
    with ProcessPoolExecutor() as pool:
        for _ in range(100):
            tasks.append(var_loop_async(a, pool, loop))
        results = await asyncio.gather(*tasks)
        pool_end = time.time()
        print(f'Process pool took {pool_end-pool_start}')

    serial_start = time.time() 
在这种情况下,我得到 Process pool took 3.2950568199157715 - 所以并不比我们的第一个版本快,每次调用 var 一个进程.这可能是因为此时的限制因素是我们的 CPU 上有多少可用内核,将我们的工作分成更小的增量不会增加太多值(value)。
也就是说,如果您希望跨两个图像检查 1000 个 x 和 y 坐标,那么最后一种方法可能会产生性能提升。

关于python - 在函数中执行循环多处理的最快方法?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/63188571/

相关文章:

windows - 挂起一个线程比让它等待有什么好处吗?

c# - 使用 C# Cross Threading 设置标签值

c# - 并行处理一个密集的IO函数

python - 为什么Python网站上的一些代码示例不使用 `class classname(object):`约定?

python - Pandas : Raise error when a line is incomplete

python - 为什么 df.cumsum() 给出 ValueError : Wrong number of items passed, 放置意味着 1

java - 如何将线程分配给某种数据类型?

python - 列表:查找第一个索引并计算列表列表中特定列表的出现次数

c - MPI - 在障碍后尝试打印广播变量,显示其他一些变量

.net - .NET 4 并行扩展是否包含无锁生产者/消费者队列的实现?