python - 在pyspark中加载大于内存的hdf5文件

标签 python apache-spark hdf5 pyspark

我有一个以 HDF5 格式存储的大文件(比如 20 Gb)。该文件基本上是一组随时间演变的 3D 坐标(分子模拟轨迹)。这基本上是一个形状数组 (8000 (frames), 50000 (particles), 3 (coordinates))

在常规 python 中,我会简单地加载 hdf5 数据文件,使用 h5pypytables 并索引数据文件,就像它是一个 numpy 一样(库会延迟加载它的任何数据需要)。

但是,如果我尝试使用 SparkContext.parallelize 在 Spark 中加载此文件,它显然会阻塞内存:

sc.parallelize(data, 10)

我该如何处理这个问题?大型数组是否有首选数据格式?我可以使 rdd 不经过内存而写入磁盘吗?

最佳答案

Spark(和 Hadoop)不支持读取部分 HDF5 二进制文件。 (我怀疑这是因为 HDF5 是一种用于存储文档的容器格式,它允许为文档指定树状层次结构)。

但是如果您需要从本地磁盘读取文件——使用 Spark 是可行的,尤其是当您知道 HDF5 文件的内部结构时。

这是一个 example - 它假定您将运行本地 spark 作业,并且您事先知道您的 HDF5 数据集“/mydata”由 100 个 block 组成。

h5file_path="/absolute/path/to/file"

def readchunk(v):
    empty = h5.File(h5file_path)
    return empty['/mydata'][v,:]

foo = sc.parallelize(range(0,100)).map(lambda v: readchunk(v))
foo.count()

更进一步,您可以修改程序以使用 f5['/mydata'].shape[0] 检测 block 数

下一步是迭代多个数据集(您可以使用 f5.keys() 列出数据集)。

还有another article "From HDF5 Datasets to Apache Spark RDDs"描述了类似的方法。

同样的方法也适用于分布式集群,但效率不高。 h5py 要求文件位于本地文件系统中。因此,这可以通过多种方式实现:将文件复制到所有工作人员并将其保存在工作人员磁盘上的同一位置;或者将文件放入 HDFS 并使用 fusefs 挂载 HDFS - 这样工作人员就可以访问该文件。这两种方式都有一些低效,但对于临时任务来说应该足够好了。

这是优化后的版本,每个执行器只打开一次 h5:

h5file_path="/absolute/path/to/file"

_h5file = None    
def readchunk(v):
    # code below will be executed on executor - in another python process on remote server
    # original value for _h5file (None) is sent from driver
    # and on executor is updated to h5.File object when the `readchunk` is called for the first time
    global _h5file
    if _h5file is None:
         _h5file = h5.File(h5file_path)
    return _h5file['/mydata'][v,:]

foo = sc.parallelize(range(0,100)).map(lambda v: readchunk(v))
foo.count()

关于python - 在pyspark中加载大于内存的hdf5文件,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/31009951/

相关文章:

python - 提取 pandas 多索引数据帧的子集

python - 使用 Djoser、django-rest-framework-jwt 和 django-rest-framework 注册后获取 token

python - 在 Cloud9 中升级 SAM CLI

scala - Apache Spark,将 “CASE WHEN … ELSE …”计算列添加到现有DataFrame中

apache-spark - 如何设置SPARK_HOME变量?

hdf5 - 如何合并多个 .h5 文件?

python - 是否可以使用条件表达式进行多项赋值?

apache-spark - 如何将 Spark 实时流与另一个流在其整个生命周期中收集的所有数据一起加入?

python - 值错误: Invalid dataset identifier (invalid dataset identifier)

python - PyTables 和 HDF5 : Massive overhead for tree data