将 Spark 数据帧写入文件后,我尝试使用如下代码重命名该文件:
val fs = FileSystem.get(spark.sparkContext.hadoopConfiguration)
val file = fs.globStatus(new Path(path + "/part*"))(0).getPath().getName()
fs.rename(new Path(path + "/" + file), new Path(path + "/" + fileName))
这在本地运行 Spark 效果很好...但是,当我在 Dataproc 上运行我的 jar 时,我收到如下错误:
Exception in thread "main" java.lang.IllegalArgumentException: Wrong bucket: prj-***, in path: gs://prj-*****/part*, expected bucket: dataproc-temp-***
似乎文件在作业结束之前可能不会保存到目标存储桶,因此很难重命名它们。当我读到一些看起来很有希望的内容时,我尝试使用 .option("mapreduce.fileoutputcommitter.algorithm.version", "2")
。
更新:
还是没有运气。似乎 spark.sparkContext.hadoopConfiguration
期望基本存储桶是 dataproc-temp-*
存储桶。下面是完整的堆栈跟踪:
Exception in thread "main" java.lang.IllegalArgumentException: Wrong bucket: prj-**, in path: gs://p**, expected bucket: dataproc-temp-u***
at com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem.checkPath(GoogleHadoopFileSystem.java:95)
at org.apache.hadoop.fs.FileSystem.makeQualified(FileSystem.java:667)
at com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystemBase.makeQualified(GoogleHadoopFileSystemBase.java:394)
at com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem.getGcsPath(GoogleHadoopFileSystem.java:149)
at com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystemBase.globStatus(GoogleHadoopFileSystemBase.java:1085)
at com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystemBase.globStatus(GoogleHadoopFileSystemBase.java:1059)
最佳答案
FileSystem.get(...)
返回的 HCFS 实例调用与特定的 FS(在本例中为 GCS 存储桶)相关联。默认情况下,Dataproc Serverless Spark 配置为使用 gs://daptaproc-temp-*/
通过 spark.hadoop.fs.defaultFS
将存储桶作为默认 HCFS Spark 属性。
要解决此问题,您需要使用 FileSystem#get(URI uri, Configuration conf)
创建 HCFS 实例调用:
val fs = FileSystem.get(path.toUri, spark.sparkContext.hadoopConfiguration)
关于apache-spark - 如何重命名在 Dataproc Serverless 上运行的 Spark 中的 GCS 文件?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/73042528/