python - 使用 ProcessPoolExecutor 进行并行处理

标签 python process-pool

我有一个巨大的元素列表,必须以某种方式处理这些元素。 我知道可以通过多处理的 Process 来完成:

pr1 = Process(calculation_function, (args, ))
pr1.start()
pr1.join()

因此我可以创建 10 个进程并将参数按 10 分割传递给 args。然后工作就完成了。

但我不想手动创建它并手动计算它。相反,我想使用 ProcessPoolExecutor我这样做是这样的:

executor = ProcessPoolExecutor(max_workers=10)
executor.map(calculation, (list_to_process,))

计算是我完成这项工作的函数。

def calculation(list_to_process):
    for element in list_to_process:
        # .... doing the job

list_to_process 是我要处理的列表。

但是运行此代码后,循环迭代仅进行一次。 我以为

executor = ProcessPoolExecutor(max_workers=10)
executor.map(calculation, (list_to_process,))

与此相同 10 次:

pr1 = Process(calculation, (list_to_process, ))
pr1.start()
pr1.join()

但是好像是错误的。

如何通过ProcessPoolExecutor实现真正的多处理?

最佳答案

计算函数中删除for循环。现在您正在使用 ProcessPoolExecutor.mapmap() 调用您的循环,不同之处在于列表中的每个元素都是发送到不同的进程。例如

def calculation(item):
    print('[pid:%s] performing calculation on %s' % (os.getpid(), item))
    time.sleep(5)
    print('[pid:%s] done!' % os.getpid())
    return item ** 2

executor = ProcessPoolExecutor(max_workers=5)
list_to_process = range(10)
result = executor.map(calculation, list_to_process)

您将在终端中看到如下内容:

[pid:23988] performing calculation on 0
[pid:10360] performing calculation on 1
[pid:13348] performing calculation on 2
[pid:24032] performing calculation on 3
[pid:18028] performing calculation on 4
[pid:23988] done!
[pid:23988] performing calculation on 5
[pid:10360] done!
[pid:13348] done!
[pid:10360] performing calculation on 6
[pid:13348] performing calculation on 7
[pid:18028] done!
[pid:24032] done!
[pid:18028] performing calculation on 8
[pid:24032] performing calculation on 9
[pid:23988] done!
[pid:10360] done!
[pid:13348] done!
[pid:18028] done!
[pid:24032] done!

尽管事件的顺序实际上是随机的。返回值(至少在我的 Python 版本中)实际上是 itertools.chain出于某种原因反对。但这是一个实现细节。您可以将结果作为列表返回,例如:

>>> list(result)
[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]

在示例代码中,您传递了一个单元素元组 (list_to_process,),这样只需将完整列表传递给一个进程即可。

关于python - 使用 ProcessPoolExecutor 进行并行处理,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/46863932/

相关文章:

python - Pandas 将 'NA' 转换为 NaN

python - 无法弄清楚我的方法有什么问题

python - 尝试创建一个 View 以将 booleanfield 设置为 true 但不更新

python - 为什么 cv2 dilate 实际上不影响我的图像?

python - 为什么 ProcessPoolExecutor 和 Pool 会因 super() 调用而崩溃?

python - 如何从 multiprocessing.Pool.map 的worker_funtion内部为数组赋值?

python - 多处理池不适用于嵌套函数

python - 多进程池中apply_async的问题

python - 如何同时计算一个巨大文件中的词频?

python - Numpy where 可广播条件