python - 如何使用 mpi4py 并行化此 python 脚本?

标签 python parallel-processing mpi4py

如果有人问过这个问题,我深表歉意,但我已经阅读了一大堆文档,但仍然不确定如何做我想做的事情。

我想同时在多个内核上运行 Python 脚本。

我在一个目录中有 1800 个 .h5 文件,名称为“snapshots_s1.h5”、“snapshots_s2.h5”等,每个文件的大小约为 30MB。这个 Python 脚本:

  1. 从目录中一次读取一个 h5py 文件。
  2. 提取和操作 h5py 文件中的数据。
  3. 创建提取数据的图表。

一旦完成,脚本就会从目录中读取下一个 h5py 文件并遵循相同的过程。因此,在执行这项工作时,没有一个处理器需要与任何其他处理器通信。

脚本如下:

import h5py
import numpy as np
import matplotlib.pyplot as plt
import matplotlib.colors as colors
import cmocean
import os  

from mpi4py import MPI

de.logging_setup.rootlogger.setLevel('ERROR')

# Plot writes

count = 1
for filename in os.listdir('directory'):  ### [PERF] Applied to ~ 1800 .h5 files
    with h5py.File('directory/{}'.format(filename),'r') as file:

         ### Manipulate 'filename' data.  ### [PERF] Each fileI ~ 0.03 TB in size
         ...

         ### Plot 'filename' data.        ### [PERF] Some fileO is output here
         ...
count = count + 1

理想情况下,我想使用 mpi4py 来执行此操作(出于各种原因),尽管我对其他选项持开放态度,例如 multiprocessing.Pool(我实际上无法开始工作。我尝试按照 here 概述的方法进行操作).

所以,我的问题是:我需要在脚本中放入哪些命令才能使用 mpi4py 对其进行并行处理?或者,如果此选项不可行,我还能如何并行化脚本?

最佳答案

您应该使用 multiprocessingJavier 示例应该可行,但我想对其进行分解,以便您也能理解这些步骤。

通常,在使用池时,您会创建一个进程池,该进程空闲直到您将一些工作传递给它们。理想的实现方式是创建一个函数,每个进程将单独执行。

def worker(fn):
    with h5py.File(fn, 'r') as f:
        # process data..
        return result

就这么简单。每个进程都会运行这个,并将结果返回给父进程。

现在您有了执行工作的 worker 函数,让我们为它创建输入数据。它需要一个文件名,所以我们需要一个所有文件的列表

full_fns = [os.path.join('directory', filename) for filename in 
            os.listdir('directory')]

接下来初始化进程池。

import multiprocessing as mp
pool = mp.Pool(4)  # pass the amount of processes you want
results = pool.map(worker, full_fns)  

# pool takes a worker function and input data
# you usually need to wait for all the subprocesses done their work before 
using the data; so you don't work on partial data.

pool.join()
poo.close()

现在您可以通过results 访问您的数据。

for r in results:
    print r

在评论中让我知道这对你有何影响

关于python - 如何使用 mpi4py 并行化此 python 脚本?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/46728380/

相关文章:

ipython - 如何在 IPython 笔记本中使用 mpi4py?

Python:使用 "with"动态定义新函数

类中的 Python 装饰器

multithreading - 何时使用 Test&Set 或 Test&Test&Set?

java - apache-arrow 是否可以使用 Java API 在单独的线程中创建 vector 的一部分?

python - 使用 mpi4py 在脚本中调用子进程

python - Anaconda 导入 mpi4py 但不导入 mpi

python - 使用 python imap 和电子邮件包获取电子邮件的正文

python - 使用 django-filter 按名字或姓氏过滤

python - multiprocessing.Queue 的管道损坏错误