pandas - Concurrent reads of an HDF5 file in Pandas

Tags: pandas concurrency pytables h5py

I have a data.h5 file organized into multiple blocks; the whole file is several hundred GB. I need to work with a filtered subset of the file in memory, as a Pandas DataFrame.

The goal of the following routine is to distribute the filtering work across several processes and then concatenate the filtered results into the final DataFrame.

Since reading from the file takes a significant amount of time, I am trying to have each process read its own block concurrently as well.

import multiprocessing as mp, pandas as pd

store = pd.HDFStore('data.h5')
min_dset, max_dset = 0, len(store.keys()) - 1
dset_list = list(range(min_dset, max_dset))

frames = []

def read_and_return_subset(dset):
    # each process is intended to read its own chunk in a concurrent manner
    chunk = store.select('batch_{:03}'.format(dset))

    # and then process the chunk, do the filtering, and return the result
    output = chunk[chunk.some_condition == True]
    return output


with mp.Pool(processes=32) as pool:
    for frame in pool.map(read_and_return_subset, dset_list):
        frames.append(frame)

df = pd.concat(frames)

However, the code above triggers this error:

HDF5ExtError                              Traceback (most recent call last)
<ipython-input-174-867671c5a58f> in <module>()
     53 
     54     with mp.Pool(processes=32) as pool:
---> 55         for frame in pool.map(read_and_return_subset, dset_list):
     56             frames.append(frame)
     57 

/usr/lib/python3.5/multiprocessing/pool.py in map(self, func, iterable, chunksize)
    258         in a list that is returned.
    259         '''
--> 260         return self._map_async(func, iterable, mapstar, chunksize).get()
    261 
    262     def starmap(self, func, iterable, chunksize=None):

/usr/lib/python3.5/multiprocessing/pool.py in get(self, timeout)
    606             return self._value
    607         else:
--> 608             raise self._value
    609 
    610     def _set(self, i, obj):

HDF5ExtError: HDF5 error back trace

  File "H5Dio.c", line 173, in H5Dread
    can't read data
  File "H5Dio.c", line 554, in H5D__read
    can't read data
  File "H5Dchunk.c", line 1856, in H5D__chunk_read
    error looking up chunk address
  File "H5Dchunk.c", line 2441, in H5D__chunk_lookup
    can't query chunk address
  File "H5Dbtree.c", line 998, in H5D__btree_idx_get_addr
    can't get chunk info
  File "H5B.c", line 340, in H5B_find
    unable to load B-tree node
  File "H5AC.c", line 1262, in H5AC_protect
    H5C_protect() failed.
  File "H5C.c", line 3574, in H5C_protect
    can't load entry
  File "H5C.c", line 7954, in H5C_load_entry
    unable to load entry
  File "H5Bcache.c", line 143, in H5B__load
    wrong B-tree signature

End of HDF5 error back trace

Problems reading the array data.

It seems that Pandas/PyTables runs into trouble when trying to access the same file concurrently, even though it is only reading.


Is there a way to let each process read its own block concurrently?

Best answer

IIUC, you can index the columns used for filtering the data (chunk.some_condition == True in the sample code) and then read only the subset of data that satisfies the desired condition.

In order to do that, you need to:

  1. Save the HDF5 file in table format - use the parameter format='table'.
  2. Index the columns that will be used for filtering - use the parameter data_columns=['col_name1', 'col_name2', ...] (see the sketch just below).
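For illustration, here is a minimal sketch of writing such a store; the file name, key, and the columns col1/col2 are hypothetical, chosen to match the select call further below:

import pandas as pd

df = pd.DataFrame({'col1': [11, 12, 13],
                   'col2': ['AAA', 'BBB', 'AAA']})

# format='table' enables on-disk querying; data_columns creates
# indexes on the columns we will later filter on
df.to_hdf('data.h5', key='key_name', format='table',
          data_columns=['col1', 'col2'])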

After that, you should be able to filter the data simply by reading:

store = pd.HDFStore(filename)
# read only the rows matching the condition; col1 and col2 must have
# been listed in data_columns when the table was written
df = store.select('key_name', where="col1 in [11,13] & col2 == 'AAA'")
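As a side note, the error in the question comes from sharing a single open HDFStore handle across forked worker processes. If parallel reads are still wanted on top of the indexed selects above, a common workaround is to have each worker open the file independently in read-only mode. Below is a minimal, untested sketch of that idea (the some_condition column and the batch_NNN key scheme are taken from the question); whether concurrent read-only opens are safe also depends on the HDF5 build, so treat this as a sketch rather than a guarantee:

import multiprocessing as mp

import pandas as pd

H5_PATH = 'data.h5'  # path from the question

def read_and_filter(dset):
    # each worker opens its own read-only handle instead of
    # inheriting the parent's open HDFStore across fork
    with pd.HDFStore(H5_PATH, mode='r') as store:
        chunk = store.select('batch_{:03}'.format(dset))
    return chunk[chunk.some_condition]

if __name__ == '__main__':
    with pd.HDFStore(H5_PATH, mode='r') as store:
        n_dsets = len(store.keys())

    with mp.Pool(processes=8) as pool:
        frames = pool.map(read_and_filter, range(n_dsets))

    df = pd.concat(frames)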

For the question "pandas - Concurrent reads of an HDF5 file in Pandas", we found a similar question on Stack Overflow: https://stackoverflow.com/questions/45722027/
