python - 使用多处理将数据写入hdf文件

标签 python multiprocessing hdf5

这似乎是一个简单的问题,但我无法理解它。

我有一个在双 for 循环中运行并将结果写入 HDF 文件的模拟。该程序的一个简单版本如下所示:

import tables as pt

a = range(10)
b = range(5)

def Simulation():
    hdf = pt.openFile('simulation.h5',mode='w')
    for ii in a:
        print(ii)
        hdf.createGroup('/','A%s'%ii)
        for i in b:
            hdf.createArray('/A%s'%ii,'B%s'%i,[ii,i])
        hdf.close()
    return
Simulation()

此代码完全符合我的要求,但由于该过程可能需要很长时间才能运行,所以我尝试使用多处理模块并使用以下代码:

import multiprocessing
import tables as pt

a = range(10)
b = range(5)

def Simulation(ii):
    hdf = pt.openFile('simulation.h5',mode='w')
    print(ii)
        hdf.createGroup('/','A%s'%ii)
        for i in b:
            hdf.createArray('/A%s'%ii,'B%s'%i,[ii,i])
        hdf.close()
    return

if __name__ == '__main__':
    jobs = []
    for ii in a:
        p = multiprocessing.Process(target=Simulation, args=(ii,))
        jobs.append(p)       
        p.start()

然而,这只会将最后一个模拟打印到 HDF 文件,它会以某种方式覆盖所有其他组。

最佳答案

每次以写入 (w) 模式打开文件时,都会创建一个新文件——因此,如果文件内容已经存在,它的内容就会丢失。只有最后一个文件句柄才能成功写入文件。即使您将其更改为追加模式,您也不应尝试从多个进程写入同一个文件——如果两个进程同时尝试写入,输出将变得乱码。

相反,让所有工作进程将输出放入队列中,并让单个专用进程(子进程或主进程)处理队列中的输出并写入文件:


import multiprocessing as mp
import tables as pt


num_arrays = 100
num_processes = mp.cpu_count()
num_simulations = 1000
sentinel = None


def Simulation(inqueue, output):
    for ii in iter(inqueue.get, sentinel):
        output.put(('createGroup', ('/', 'A%s' % ii)))
        for i in range(num_arrays):
            output.put(('createArray', ('/A%s' % ii, 'B%s' % i, [ii, i])))


def handle_output(output):
    hdf = pt.openFile('simulation.h5', mode='w')
    while True:
        args = output.get()
        if args:
            method, args = args
            getattr(hdf, method)(*args)
        else:
            break
    hdf.close()

if __name__ == '__main__':
    output = mp.Queue()
    inqueue = mp.Queue()
    jobs = []
    proc = mp.Process(target=handle_output, args=(output, ))
    proc.start()
    for i in range(num_processes):
        p = mp.Process(target=Simulation, args=(inqueue, output))
        jobs.append(p)
        p.start()
    for i in range(num_simulations):
        inqueue.put(i)
    for i in range(num_processes):
        # Send the sentinal to tell Simulation to end
        inqueue.put(sentinel)
    for p in jobs:
        p.join()
    output.put(None)
    proc.join()

为了比较,这是一个使用 mp.Pool 的版本:

import multiprocessing as mp
import tables as pt


num_arrays = 100
num_processes = mp.cpu_count()
num_simulations = 1000


def Simulation(ii):
    result = []
    result.append(('createGroup', ('/', 'A%s' % ii)))
    for i in range(num_arrays):
        result.append(('createArray', ('/A%s' % ii, 'B%s' % i, [ii, i])))
    return result


def handle_output(result):
    hdf = pt.openFile('simulation.h5', mode='a')
    for args in result:
        method, args = args
        getattr(hdf, method)(*args)
    hdf.close()


if __name__ == '__main__':
    # clear the file
    hdf = pt.openFile('simulation.h5', mode='w')
    hdf.close()
    pool = mp.Pool(num_processes)
    for i in range(num_simulations):
        pool.apply_async(Simulation, (i, ), callback=handle_output)
    pool.close()
    pool.join()

看起来更简单不是吗?但是,有一个显着差异。原始代码使用 output.put 将参数发送到在其自己的子进程中运行的 handle_outputhandle_output 将从 output 队列中获取 args 并立即处理它们。使用上面的 Pool 代码,Simulationresult 中累积了一大堆 args 并且 result 不会发送到 handle_output 直到 Simulation 返回。

如果Simulation 需要很长时间,那么在simulation.h5 中没有任何内容写入时会有很长的等待时间。

关于python - 使用多处理将数据写入hdf文件,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/15704010/

相关文章:

python - 在 C++ 中保存数据,从 Python 加载 - 推荐的数据格式

python - 如何使用两个 'for' 枚举列表理解?

python - 使用 Python 重叠 2 个 RGBA 图像

python - 无法使用 python 的多处理 Pool.apply_async() 腌制 <type 'instancemethod'>

Python 未实现错误 : pool objects cannot be passed between processes

python - 在 HDF5 (PyTables) 中存储 numpy 稀疏矩阵

c++ - HDF5:将线性数组保存到三维数据集

Python tkinter.filedialog askfolder 干扰 clr

python - 按 dtype 子集 Pandas 数据框

python - 如何更改多处理共享数组大小?