python - python脚本中的多处理函数

标签 python multiprocessing gis arcpy

我正在尝试在脚本中的函数上使用多重处理,我认为这是我的多重处理结构和语法的问题。这里的目标是循环遍历列表并从十三个不同的空间地理数据库复制出栅格/tiff 图像。输出和输入都保存在单独的地理数据库中,以加快进程并防止崩溃。这是我的示例脚本:

import arcpy, time, os
from arcpy import env
import multiprocessing
from multiprocessing import pool

def copy_rasters_3b(P6_GDBs, x, c):
        mosaic_gdb = os.path.join('{}/mosaic_{}.gdb/{}'.format(P6_GDBs, x, c))
        final_gdb = os.path.join('{}/final_{}.gdb/{}'.format(P6_GDBs, x, c))
        final_tiff = os.path.join('{}/{}.tif'.format(P6_GDBs, c))
        print "---Copying Rasters Started---"
        start_time = time.time()
        arcpy.CopyRaster_management(mosaic_gdb, final_gdb, "", "", "", "NONE", "NONE", "8_BIT_UNSIGNED", "NONE", "NONE", "", "NONE")
        arcpy.CopyRaster_management(mosaic_gdb, final_tiff, "", "", "", "NONE", "NONE", "8_BIT_UNSIGNED", "NONE", "NONE", "TIFF", "NONE")
        print("--- "+ c + " Complete %s seconds ---" % (time.time() - start_time))

### Main ###
def main():
    P6_DIR= "D:/P6_SecondRun"
    P6_GDBs= "D:/P6_GDBs"
    Categories =['CRP', 'FORE', 'INR', 'IR', 'MO', 'PAS', 'TCI', 'TCT', 'TG', 'WAT', 'WLF', 'WLO', 'WLT']
    rasters = defaultdict(list)

    # Environments
    arcpy.env.overwriteOutput = True
    arcpy.env.snapRaster = "D:/A__Snap/Phase6_Snap.tif"
    arcpy.env.outputCoordinateSystem = arcpy.SpatialReference(102039) #arcpy.SpatialReference(3857)

    pool = multiprocessing.Pool(processes=5)

    for c, x in zip(Categories, range(1,14)):
        pool.map(copy_rasters_3b(P6_GDBs, x, c), 14)
        pool.close()

############### EXECUTE MAIN() ###############
if __name__ == "__main__":
    main()

此脚本在后台启动五个进程,但在前两个实例后最终失败。我正在运行 ArcMap 10.4 x64 并使用 Python27-64 位。

最佳答案

首先,您应该使用一个函数和一个为该函数提供参数的迭代来调用pool.map(一次)。这样做的原因是实际的函数调用是在子进程中进行的。相反,您实际上是直接调用该函数并将其结果传递给 pool.map ,这是没有用的。

其次,您在循环内调用 pool.close,然后在下一次迭代中再次调用 pool.map。池对象的模式是将所有作业提交给它。然后在完成后调用 pool.close 等待所有作业完成。在这种情况下,所有作业都将通过单个 .map 方法调用来分配给它。

您的代码可能如下所示:

def copy_rasters_3b(arg_tuple):
    P6_GDBs, x, c = arg_tuple    # Break apart argument tuple
    ... # Rest same as before

pool = multiprocessing.Pool(processes=5)
pool.map(copy_rasters_3b, [(P6_GDBs, x, c) for x, c in enumerate(Categories)])
pool.close()

分解列表理解:Categories 是一个字符串列表,因此 enumerate(Categories) 生成一组编号的元组(N, member) 其中 N 是列表中的位置,member 是列表中的值。我们将这两者与每个结果元组中的常量值 P6_GDBs 组合起来。理解的最终结果是一个三元组列表。

map 将在每次调用时从其可迭代参数向函数提供单个元素。然而,该单个元素是三元组,因此这里的函数需要将一个参数分解为其组成部分。

最后,您应该删除以下导入行,因为它令人困惑且不必要。在这里,您导入名称 pool,但稍后(无需使用它),通过将新值分配给 pool 作为变量名称来覆盖它:

from multiprocessing import pool
[...]
pool = multiprocessing.Pool(processes=5)

关于python - python脚本中的多处理函数,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/41746042/

相关文章:

python - socket __exit__ 在 python 中关闭吗?

javascript - 检测交叉点 Turf.js,第一个和最后一个位置不相等

sql-server - Geoserver 无法识别空间列

python - Bokeh map 情节。纬度/经度到 x 和 y

python - 如何将时区偏移量添加到 pandas datetime?

python - 按总值(value)对堆积柱形图进行排序 - Python

python - Python for 循环中的多个索引迭代

python - 为什么 get() 在多处理中速度慢?

python - 在 gunicorn workers 之间共享一把锁

python - 将 asyncio 与多处理结合起来会出现什么样的问题(如果有的话)?