这似乎是一个简单的问题,但我无法理解它。
我有一个在双 for 循环中运行并将结果写入 HDF 文件的模拟。该程序的一个简单版本如下所示:
import tables as pt
a = range(10)
b = range(5)
def Simulation():
hdf = pt.openFile('simulation.h5',mode='w')
for ii in a:
print(ii)
hdf.createGroup('/','A%s'%ii)
for i in b:
hdf.createArray('/A%s'%ii,'B%s'%i,[ii,i])
hdf.close()
return
Simulation()
此代码完全符合我的要求,但由于该过程可能需要很长时间才能运行,所以我尝试使用多处理模块并使用以下代码:
import multiprocessing
import tables as pt
a = range(10)
b = range(5)
def Simulation(ii):
hdf = pt.openFile('simulation.h5',mode='w')
print(ii)
hdf.createGroup('/','A%s'%ii)
for i in b:
hdf.createArray('/A%s'%ii,'B%s'%i,[ii,i])
hdf.close()
return
if __name__ == '__main__':
jobs = []
for ii in a:
p = multiprocessing.Process(target=Simulation, args=(ii,))
jobs.append(p)
p.start()
然而,这只会将最后一个模拟打印到 HDF 文件,它会以某种方式覆盖所有其他组。
最佳答案
每次以写入 (w
) 模式打开文件时,都会创建一个新文件——因此,如果文件内容已经存在,它的内容就会丢失。只有最后一个文件句柄才能成功写入文件。即使您将其更改为追加模式,您也不应尝试从多个进程写入同一个文件——如果两个进程同时尝试写入,输出将变得乱码。
相反,让所有工作进程将输出放入队列中,并让单个专用进程(子进程或主进程)处理队列中的输出并写入文件:
import multiprocessing as mp
import tables as pt
num_arrays = 100
num_processes = mp.cpu_count()
num_simulations = 1000
sentinel = None
def Simulation(inqueue, output):
for ii in iter(inqueue.get, sentinel):
output.put(('createGroup', ('/', 'A%s' % ii)))
for i in range(num_arrays):
output.put(('createArray', ('/A%s' % ii, 'B%s' % i, [ii, i])))
def handle_output(output):
hdf = pt.openFile('simulation.h5', mode='w')
while True:
args = output.get()
if args:
method, args = args
getattr(hdf, method)(*args)
else:
break
hdf.close()
if __name__ == '__main__':
output = mp.Queue()
inqueue = mp.Queue()
jobs = []
proc = mp.Process(target=handle_output, args=(output, ))
proc.start()
for i in range(num_processes):
p = mp.Process(target=Simulation, args=(inqueue, output))
jobs.append(p)
p.start()
for i in range(num_simulations):
inqueue.put(i)
for i in range(num_processes):
# Send the sentinal to tell Simulation to end
inqueue.put(sentinel)
for p in jobs:
p.join()
output.put(None)
proc.join()
为了比较,这是一个使用 mp.Pool
的版本:
import multiprocessing as mp
import tables as pt
num_arrays = 100
num_processes = mp.cpu_count()
num_simulations = 1000
def Simulation(ii):
result = []
result.append(('createGroup', ('/', 'A%s' % ii)))
for i in range(num_arrays):
result.append(('createArray', ('/A%s' % ii, 'B%s' % i, [ii, i])))
return result
def handle_output(result):
hdf = pt.openFile('simulation.h5', mode='a')
for args in result:
method, args = args
getattr(hdf, method)(*args)
hdf.close()
if __name__ == '__main__':
# clear the file
hdf = pt.openFile('simulation.h5', mode='w')
hdf.close()
pool = mp.Pool(num_processes)
for i in range(num_simulations):
pool.apply_async(Simulation, (i, ), callback=handle_output)
pool.close()
pool.join()
看起来更简单不是吗?但是,有一个显着差异。原始代码使用 output.put
将参数发送到在其自己的子进程中运行的 handle_output
。 handle_output
将从 output
队列中获取 args
并立即处理它们。使用上面的 Pool 代码,Simulation
在 result
中累积了一大堆 args
并且 result
不会发送到 handle_output
直到 Simulation
返回。
如果Simulation
需要很长时间,那么在simulation.h5
中没有任何内容写入时会有很长的等待时间。
关于python - 使用多处理将数据写入hdf文件,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/15704010/