我正在开发一个 Java 程序,专门用于在 HDFS 文件系统(位于 HDFS_IP
)上使用 Spark。
我的目标之一是检查 HDFS 上的路径 hdfs://HDFS_IP:HDFS_PORT/path/to/file.json
上是否存在文件。在本地调试我的程序时,我发现我无法使用以下代码访问此远程文件
private boolean existsOnHDFS(String path) {
Configuration conf = new Configuration();
FileSystem fs;
Boolean fileDoesExist = false ;
try {
fs = FileSystem.get(conf);
fileDoesExist = fs.exists(new Path(path)) ;
} catch (IOException e) {
e.printStackTrace();
}
return fileDoesExist ;
}
实际上,fs.exists
尝试在我的本地 FS 中查找文件 hdfs://HDFS_IP:HDFS_PORT/path/to/file.json
,而不是在HDFS。顺便说一句,让 hdfs://HDFS_IP:HDFS_PORT
前缀会使 fs.exists
崩溃,而抑制它则返回 false
因为 /path/to/file.json
本地不存在。
要使本地和从 Hadoop 集群执行 Java 程序时正常工作,fs
的适当配置是什么?
编辑:我最终放弃了并将错误修复传递给了我团队中的其他人。感谢那些试图帮助我的人!
最佳答案
问题是您向文件系统传递了一个空的conf文件。
您应该像这样创建文件系统:
FileSystem.get(spark.sparkContext().hadoopConfiguration());
当spark是SparkSession对象时。
正如你在FileSystem的代码中看到的:
/**
* Returns the configured filesystem implementation.
* @param conf the configuration to use
*/
public static FileSystem get(Configuration conf) throws IOException {
return get(getDefaultUri(conf), conf);
}
/** Get the default filesystem URI from a configuration.
* @param conf the configuration to use
* @return the uri of the default filesystem
*/
public static URI getDefaultUri(Configuration conf) {
return URI.create(fixName(conf.get(FS_DEFAULT_NAME_KEY, DEFAULT_FS)));
}
它根据作为参数传递的配置创建 URI,当 DEFAULT_FS 为:时,它会查找键 FS_DEFAULT_NAME_KEY(fs.defaultFS):
public static final String FS_DEFAULT_NAME_DEFAULT = "file:///";
关于java - 从本地spark-submit检查远程HDFS上是否存在文件,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/61205691/