java - 通过Spark覆盖HDFS文件/目录

标签 java apache-spark hdfs overwrite

问题

我在 HDFS 中保存了一个文件,我想做的就是运行我的 Spark 应用程序,计算结果 javaRDD 并使用 saveAsTextFile() 来存储HDFS 中的新"file"。

但是,如果文件已存在,Spark 的 saveAsTextFile() 将不起作用。它不会覆盖它。

我尝试过的

因此,我寻找了解决方案,发现一种可能的方法是通过 HDFS API 删除文件,然后再尝试保存新文件。

我添加了代码:

FileSystem hdfs = FileSystem.get(new Configuration());
Path newFolderPath = new Path("hdfs://node1:50050/hdfs/" +filename);

if(hdfs.exists(newFolderPath)){
    System.out.println("EXISTS");
    hdfs.delete(newFolderPath, true);
}

filerdd.saveAsTextFile("/hdfs/" + filename);

当我尝试运行 Spark 应用程序时,文件被删除,但我收到 FileNotFoundException

考虑到当有人尝试从路径读取文件并且该文件不存在时会发生此异常,这是没有意义的,因为删除文件后,没有代码尝试读取它。

我的代码的一部分

 JavaRDD<String> filerdd = sc.textFile("/hdfs/" + filename)    // load the file here
 ...
 ...
 // Transformations here
 filerdd = filerdd.map(....);
 ...
 ...

 // Delete old file here
 FileSystem hdfs = FileSystem.get(new Configuration());
 Path newFolderPath = new Path("hdfs://node1:50050/hdfs/" +filename);

 if(hdfs.exists(newFolderPath)){
    System.out.println("EXISTS");
    hdfs.delete(newFolderPath, true);
 }

 // Write new file here
 filerdd.saveAsTextFile("/hdfs/" + filename);

我试图在这里做最简单的事情,但我不知道为什么这不起作用。也许 filerdd 以某种方式连接到路径?

最佳答案

问题是您使用相同的路径进行输入和输出。 Spark的RDD会延迟执行。它在您调用 saveAsTextFile 时运行。此时,您已经删除了newFolderPath。所以 filerdd 会提示。

无论如何,您不应该使用相同的路径进行输入和输出。

关于java - 通过Spark覆盖HDFS文件/目录,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/37284156/

相关文章:

java - 如何将字符串数组转换为整数数组?

java - SQL查询错误导致异常

java - 使用ManyToMany关系时如何在Spring中获取对象外键列表?

scala - Apache Spark 抛出 java.lang.IllegalStateException : unread block data

java - OrientDB 注册 Hooks

hadoop - 如何阻止 HiveServer2 作为守护进程自动启动?

apache-spark - SPARK : one powerful machine Vs. 几台较小的机器

scala - 如何使 SparkSession 和 Spark SQL 隐式全局可用(在函数和对象中)?

java - 使用 java 反射运行时,HDFS Parquet 文件读取器抛出 DistributedFileSystem.class not found

hadoop - HDFS HA 可能性