json - 如何重命名 pyspark 生成的 JSON?

标签 json apache-spark pyspark

当我用 编写 JSON 文件时

dataframe.coalesce(1).write.format('json')

在 pyspark 上,我无法更改分区中的文件名

我这样写 JSON:

dataframe.coalesce(1).write.format('json').mode('overwrite').save('path')

但我无法更改分区中的文件名

我想要这样的路径:

/folder/my_name.json

其中“my_name.json”是一个 json 文件

最佳答案

在 Spark we can't control name of the file 写入目录。

首先将数据写入 HDFS directory 那么为了更改文件名,我们需要使用 HDFS api

<强> Example:

<强> In Pyspark:

l=[("a",1)]
ll=["id","sa"]
df=spark.createDataFrame(l,ll)

hdfs_dir = "/folder/" #your hdfs directory
new_filename="my_name.json" #new filename

df.coalesce(1).write.format("json").mode("overwrite").save(hdfs_dir)

fs = spark._jvm.org.apache.hadoop.fs.FileSystem.get(spark._jsc.hadoopConfiguration())

#list files in the directory

list_status = fs.listStatus(spark._jvm.org.apache.hadoop.fs.Path(hdfs_dir))

#filter name of the file starts with part-

file_name = [file.getPath().getName() for file in list_status if file.getPath().getName().startswith('part-')][0]

#rename the file

fs.rename(Path(hdfs_dir+''+file_name),Path(hdfs_dir+''+new_filename))

如果您想删除 success files 在目录中使用 fs.delete删除_Success文件。

<强> In Scala:

val df=Seq(("a",1)).toDF("id","sa")
df.show(false)

import org.apache.hadoop.fs._

val hdfs_dir = "/folder/"
val new_filename="new_json.json"

df.coalesce(1).write.mode("overwrite").format("json").save(hdfs_dir)

val fs=FileSystem.get(sc.hadoopConfiguration)
val f=fs.globStatus(new Path(s"${hdfs_dir}" + "*")).filter(x => x.getPath.getName.toString.startsWith("part-")).map(x => x.getPath.getName).mkString

fs.rename(new Path(s"${hdfs_dir}${f}"),new Path(s"${hdfs_dir}${new_filename}"))

fs.delete(new Path(s"${hdfs_dir}" + "_SUCCESS"))

关于json - 如何重命名 pyspark 生成的 JSON?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/58171365/

相关文章:

python - 解析 json 中的代码 (Python)

android - 发生异常时的 Robospice 缓存

apache-spark - 替换默认情况下导入的库Spark的类路径

apache-spark - pyspark 预期构建 ClassDict 的参数为零(对于 pyspark.mllib.linalg.DenseVector)

java - 如何从 MultilayerPerceptronClassifier 获取分类概率?

apache-spark - 导入 Pyspark Delta Lake 模块时找不到模块错误

java - settHeader ("content type"、 "") - 困惑

c# - Javascript JSON仅序列化第一级

apache-spark - Spark 2.0如何处理列可为空性?

python - PySpark:具有多个功能的多列上的 Groupby