I have a data.h5 file that is organized into multiple blocks; the whole file is several hundred GB. I need to work with a filtered subset of the file in memory, as a Pandas DataFrame.
The goal of the following routine is to distribute the filtering work across several processes and then concatenate the filtered results into the final DataFrame.
Since reading from the file takes a significant amount of time, I am trying to have each process read its own block concurrently as well.
import multiprocessing as mp, pandas as pd

store = pd.HDFStore('data.h5')
min_dset, max_dset = 0, len(store.keys()) - 1
dset_list = list(range(min_dset, max_dset))
frames = []

def read_and_return_subset(dset):
    # each process is intended to read its own chunk in a concurrent manner
    chunk = store.select('batch_{:03}'.format(dset))
    # and then process the chunk, do the filtering, and return the result
    output = chunk[chunk.some_condition == True]
    return output

with mp.Pool(processes=32) as pool:
    for frame in pool.map(read_and_return_subset, dset_list):
        frames.append(frame)

df = pd.concat(frames)
However, the code above triggers this error:
HDF5ExtError Traceback (most recent call last)
<ipython-input-174-867671c5a58f> in <module>()
53
54 with mp.Pool(processes=32) as pool:
---> 55 for frame in pool.map(read_and_return_subset, dset_list):
56 frames.append(frame)
57
/usr/lib/python3.5/multiprocessing/pool.py in map(self, func, iterable, chunksize)
258 in a list that is returned.
259 '''
--> 260 return self._map_async(func, iterable, mapstar, chunksize).get()
261
262 def starmap(self, func, iterable, chunksize=None):
/usr/lib/python3.5/multiprocessing/pool.py in get(self, timeout)
606 return self._value
607 else:
--> 608 raise self._value
609
610 def _set(self, i, obj):
HDF5ExtError: HDF5 error back trace
File "H5Dio.c", line 173, in H5Dread
can't read data
File "H5Dio.c", line 554, in H5D__read
can't read data
File "H5Dchunk.c", line 1856, in H5D__chunk_read
error looking up chunk address
File "H5Dchunk.c", line 2441, in H5D__chunk_lookup
can't query chunk address
File "H5Dbtree.c", line 998, in H5D__btree_idx_get_addr
can't get chunk info
File "H5B.c", line 340, in H5B_find
unable to load B-tree node
File "H5AC.c", line 1262, in H5AC_protect
H5C_protect() failed.
File "H5C.c", line 3574, in H5C_protect
can't load entry
File "H5C.c", line 7954, in H5C_load_entry
unable to load entry
File "H5Bcache.c", line 143, in H5B__load
wrong B-tree signature
End of HDF5 error back trace
Problems reading the array data.
It seems that Pandas/PyTables runs into trouble when trying to access the same file concurrently, even when it is only reading from it.
Is there a way to let each process read its own block concurrently?
Best answer
IIUC, you can index the columns that are used for filtering the data (chunk.some_condition == True in your sample code) and then read only the subset of data that satisfies the desired condition.
In order to do that, you need to:
- save the HDF5 file in table format - use the parameter format='table'
- index the columns that will be used for filtering - use the parameter data_columns=['col_name1', 'col_name2', etc.] (see the sketch after this list)
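For illustration, here is a minimal sketch of that one-time conversion, assuming the 'batch_*' key layout from the question, that the filter column is literally named some_condition, and a made-up destination file name:

import pandas as pd

# Hypothetical conversion: copy each block of the original file into a new
# file in 'table' format, indexing the column used for filtering.
with pd.HDFStore('data.h5') as src, \
     pd.HDFStore('data_table.h5', mode='w') as dst:
    for key in src.keys():
        chunk = src.select(key)                    # load one block
        dst.put(key, chunk,
                format='table',                    # queryable on-disk format
                data_columns=['some_condition'])   # index the filter column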
After that, you should be able to filter the data simply by reading:
store = pd.HDFStore(filename)
df = store.select('key_name', where="col1 in [11,13] & col2 == 'AAA'")
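As for the original error: the worker processes inherit the parent's open store handle across fork(), and HDF5 does not cope with that. A common workaround, shown here as a sketch under the question's assumptions (file name, key pattern, and column name are taken from the question, not verified), is to let each worker open its own read-only handle and push the filter into the where clause:

import multiprocessing as mp
import pandas as pd

def read_and_return_subset(dset):
    # each worker opens (and closes) its own read-only handle
    with pd.HDFStore('data.h5', mode='r') as store:
        return store.select('batch_{:03}'.format(dset),
                            where='some_condition == True')

if __name__ == '__main__':
    # enumerate the blocks once in the parent, before forking
    with pd.HDFStore('data.h5', mode='r') as store:
        dset_list = list(range(len(store.keys())))
    with mp.Pool(processes=32) as pool:
        frames = pool.map(read_and_return_subset, dset_list)
    df = pd.concat(frames)

Concurrent read-only access along these lines generally works, provided no process writes to the file while the readers are running; the where clause itself requires some_condition to be a data column, i.e. the table-format conversion described above.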
Regarding "pandas - Concurrent reading of an HDF5 file in Pandas", a similar question can be found on Stack Overflow: https://stackoverflow.com/questions/45722027/