apache-spark - 在 Databricks 集群中使用 hdf 文件

标签 apache-spark pyspark databricks cluster-computing hdf

我正在尝试在 Databricks 环境中创建一个简单的 .hdf。我可以在驱动程序上创建文件,但是当使用 rdd.map() 执行相同的代码时,它会抛出以下异常。

Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 3 in stage 287.0 failed 4 times, most recent failure: Lost task 3.3 in stage 287.0 (TID 1080) (10.67.238.26 executor 1): org.apache.spark.api.python.PythonException: **'RuntimeError: Can't decrement id ref count (file write failed:** time = Tue Nov  1 11:38:44 2022
, filename = '/dbfs/mnt/demo.hdf', file descriptor = 7, errno = 95, error message = '**Operation not supported**', buf = 0x30ab998, total write size = 40, bytes this sub-write = 40, bytes actually written = 18446744073709551615, offset = 0)', from <command-1134257524229717>, line 13. Full traceback below:

我可以在 worker 上写入相同的文件,然后将文件复制回/dbfs/mnt。但是,我一直在寻找一种方法,通过它我可以直接通过工作节点写入/修改存储在 dbfs/mnt locaction 中的 .hdf 文件。

def create_hdf_file_tmp(x):
    import numpy as np
    import h5py, os, subprocess
    import pandas as pd
 
    dummy_data = [1,2,3,4,5]
    df_data = pd.DataFrame(dummy_data, columns=['Numbers'])

    with h5py.File('/dbfs/mnt/demo.hdf', 'w') as f:
        dset = f.create_dataset('default', data = df_data) # write to .hdf file
    
    return True

def read_hdf_file(file_name, dname):
    import numpy as np
    import h5py, os, subprocess
    import pandas as pd

    with h5py.File(file_name, 'r') as f:
        data = f[dname]
        print(data[:5])    
    return data


#driver code
rdd = spark.sparkContext.parallelize(['/dbfs/mnt/demo.hdf'])
result = rdd.map(lambda x: create_hdf_file_tmp(x)).collect()

以上是我尝试在具有 1 个驱动程序和 2 个工作节点的 Databricsk 笔记本中运行的最小代码。

最佳答案

根据错误消息:Operation not supported,最有可能的是,在写入 HDF 文件时,API 使用了 DBFS 不支持的随机写入之类的东西(请参阅 DBFS Local API limitations in the docs)。您需要将文件写入本地磁盘,然后将该文件移动到 DBFS 挂载。但它只会在驱动程序节点上工作......

关于apache-spark - 在 Databricks 集群中使用 hdf 文件,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/74275771/

相关文章:

apache-spark - Spark streaming - reduceByKeyAndWindow() 是否使用常量内存?

java - 无法使用 Java Spark API 解析文件

python - Pyspark 增加了几秒的时间

apache-spark - 使用 PySpark 将行收集为分组后的 Spark 数据帧数组

python - Pyspark:从密集向量列中获取新列中每一行的最大预测值

amazon-web-services - 使用 CLI 将逗号分隔的参数传递给 AWS EMR 中的 spark jar

python - 如何在 pyspark 中并行下载大量 URL 列表?

python - 如何在databricks集群上安装gdal?

json - 使用架构中的所有键(包括空列)将 spark 数据集写入 json

scala - Spark utf 8错误,非英文数据变成 `??????????`