python - 如何在 python 中将数据从单个 hdf5 文件安全地并行写入多个文件?

标签 python multiprocessing h5py

我正在尝试将我的数据(从 hdf5 格式的单个文件)写入多个文件,并且当任务以串行方式执行时它工作正常。现在想提高效率,使用multiprocessing模块修改代码,但是有时候输出会出错。这是我的代码的简化版本。

import multiprocessing as mp
import numpy as np
import math, h5py, time
N = 4  # number of processes to use
block_size = 300
data_sz = 678
dataFile = 'mydata.h5'

# fake some data
mydata = np.zeros((data_sz, 1))
for i in range(data_sz):
    mydata[i, 0] = i+1
h5file = h5py.File(dataFile, 'w')
h5file.create_dataset('train', data=mydata)

# fire multiple workers
pool = mp.Pool(processes=N)
total_part = int(math.ceil(1. * data_sz / block_size))
for i in range(total_part):
    pool.apply_async(data_write_func, args=(dataFile, i, ))
pool.close()
pool.join()

data_write_func() 的结构是:

def data_write_func(h5file_dir, i, block_size=block_size):
    hf = h5py.File(h5file_dir)
    fout = open('data_part_' + str(i), 'w')
    data_part = hf['train'][block_size*i : min(block_size*(i+1), data_sz)]  # np.ndarray
    for line in data_part:
        # do some processing, that takes a while...
        time.sleep(0.01)
        # then write out..
        fout.write(str(line[0]) + '\n')
    fout.close()

当我设置 N=1 时,效果很好。但是当我设置 N=2N=4 时,结果有时会变得困惑(不是每次!)。例如在 data_part_1 中,我希望输出为:

301,
302,
303,
...

但有时我得到的是

0,
0,
0,
...

有时我得到

379,
380,
381,
...

我是多处理模块的新手,觉得它很棘手。如有建议不胜感激!

最佳答案

按照 Andriy 的建议修复了 fout.writemydata=... 后,您的程序按预期运行,因为每个进程都写入自己的文件。这些进程不可能相互混合。

可能想做的是使用multiprocessing.map() 为你削减你的迭代(所以你不需要做block_size 东西),加上它保证结果按顺序完成。我修改了您的代码以使用多处理映射:

import multiprocessing
from functools import partial
import pprint

def data_write_func(line):
  i = multiprocessing.current_process()._identity[0]
  line = [i*2 for i in line]
  files[i-1].write(",".join((str(s) for s in line)) + "\n")

N = 4
mydata=[[x+1,x+2,x+3,x+4] for x in range(0,4000*N,4)] # fake some data
files = [open('data_part_'+str(i), 'w') for i in range(N)]

pool = multiprocessing.Pool(processes=N)
pool.map(data_write_func, mydata)
pool.close()
pool.join()

请注意:

  • i取自进程本身,不是1就是2
  • 因为现在 data_write_func 被每一行调用,文件打开需要在父进程中完成。另外:您不需要手动执行 close() 文件,操作系统会在您的 python 程序退出时为您执行此操作。

现在,我想您最终希望将所有输​​出放在一个 文件中,而不是放在单独的文件中。如果您的输出行在 Linux 上低于 4096 字节(或在 OSX 上低于 512 字节,对于其他操作系统,请参阅 here)您实际上可以安全地打开一个文件(在追加模式下)并让每个进程写入该文件文件,因为低于这些大小的写入由 Unix 保证是原子的。

更新:

"What if the data is stored in hdf5 file as dataset?"

根据 hdf5 doc this works out of the box since version 2.2.0 :

Parallel HDF5 is a configuration of the HDF5 library which lets you share open files across multiple parallel processes. It uses the MPI (Message Passing Interface) standard for interprocess communication

因此,如果您在代码中这样做:

h5file = h5py.File(dataFile, 'w')
dset = h5file.create_dataset('train', data=mydata)

然后您可以从您的进程中访问 dset 并读取/写入它,而无需采取任何额外措施。另见 this example from h5py using multiprocessing

关于python - 如何在 python 中将数据从单个 hdf5 文件安全地并行写入多个文件?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/48057744/

相关文章:

python - 在命令前添加 "sudo"使命令未知

python - 让 Selenium 导航到由 Flask 托管的页面时遇到问题

python - JSON 序列化对象在多处理调用时出错 - TypeError : XXX objects not callable error

python - 多线程 Python 文件系统爬虫

python - 无法在 Windows 10 上安装 H5PY 库 - Python

python - PyQt5 根据输入值自动绘制

python-3.x - 是否可以对 tkinter 进行多进程处理?

python - 使用 h5py 访问数据范围

python - 使用 h5py 将外部原始文件链接到 hdf5 文件

python - Django 在外出进行 paypal 付款时保持登录丢失