apache-spark - 如何重命名在 Dataproc Serverless 上运行的 Spark 中的 GCS 文件?

标签 apache-spark hadoop google-cloud-dataproc google-cloud-dataproc-serverless

将 Spark 数据帧写入文件后,我尝试使用如下代码重命名该文件:

val fs = FileSystem.get(spark.sparkContext.hadoopConfiguration)
val file = fs.globStatus(new Path(path + "/part*"))(0).getPath().getName()
fs.rename(new Path(path + "/" + file), new Path(path + "/" + fileName))

这在本地运行 Spark 效果很好...但是,当我在 Dataproc 上运行我的 jar 时,我收到如下错误:

Exception in thread "main" java.lang.IllegalArgumentException: Wrong bucket: prj-***, in path: gs://prj-*****/part*, expected bucket: dataproc-temp-***

似乎文件在作业结束之前可能不会保存到目标存储桶,因此很难重命名它们。当我读到一些看起来很有希望的内容时,我尝试使用 .option("mapreduce.fileoutputcommitter.algorithm.version", "2")

更新: 还是没有运气。似乎 spark.sparkContext.hadoopConfiguration 期望基本存储桶是 dataproc-temp-* 存储桶。下面是完整的堆栈跟踪:

Exception in thread "main" java.lang.IllegalArgumentException: Wrong bucket: prj-**, in path: gs://p**, expected bucket: dataproc-temp-u***
    at com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem.checkPath(GoogleHadoopFileSystem.java:95)
    at org.apache.hadoop.fs.FileSystem.makeQualified(FileSystem.java:667)
    at com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystemBase.makeQualified(GoogleHadoopFileSystemBase.java:394)
    at com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem.getGcsPath(GoogleHadoopFileSystem.java:149)
    at com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystemBase.globStatus(GoogleHadoopFileSystemBase.java:1085)
    at com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystemBase.globStatus(GoogleHadoopFileSystemBase.java:1059)

最佳答案

FileSystem.get(...) 返回的 HCFS 实例调用与特定的 FS(在本例中为 GCS 存储桶)相关联。默认情况下,Dataproc Serverless Spark 配置为使用 gs://daptaproc-temp-*/通过 spark.hadoop.fs.defaultFS 将存储桶作为默认 HCFS Spark 属性。

要解决此问题,您需要使用 FileSystem#get(URI uri, Configuration conf) 创建 HCFS 实例调用:

val fs = FileSystem.get(path.toUri, spark.sparkContext.hadoopConfiguration)

关于apache-spark - 如何重命名在 Dataproc Serverless 上运行的 Spark 中的 GCS 文件?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/73042528/

相关文章:

python - 如何将 python 包传递给 spark 作业并使用参数从包中调用主文件

java - s3开始使用Apache Spark返回内容长度错误的结尾,同时在较旧的EC2实例上可以正常工作

Java 使用 Apache Spark 指定架构从 json 文件读取

scala - 使用 Spark 分析推特数据

hadoop - 使用 HCFS 读取 JSON 换行文件

apache-spark - 如何获得 Spark 作业以使用 Google Cloud DataProc 集群上的所有可用资源?

java - MongoSpark 保存重复键错误 E11000

unix - 使用 Unix 的 split|grep|cat 与 Hadoop 进行搜索

hadoop - mapreduce 的多表输入

apache-spark - 在 Cloud Dataproc 中调整工作程序节点大小后如何更新 spark 配置