我正在尝试使用 SparkSession
从 HDFS 上的文件夹中读取几个 CSV 文件(即我不想读取该文件夹中的所有文件)
运行时出现以下错误(代码在最后):
Path does not exist:
file:/home/cloudera/works/JavaKafkaSparkStream/input/input_2.csv,
/home/cloudera/works/JavaKafkaSparkStream/input/input_1.csv
我不想在阅读时使用该模式,例如 /home/temp/*.csv
,原因是将来我有逻辑只选择文件夹中的一两个文件100 个 CSV 文件
请指教
SparkSession sparkSession = SparkSession
.builder()
.appName(SparkCSVProcessors.class.getName())
.master(master).getOrCreate();
SparkContext context = sparkSession.sparkContext();
context.setLogLevel("ERROR");
Set<String> fileSet = Files.list(Paths.get("/home/cloudera/works/JavaKafkaSparkStream/input/"))
.filter(name -> name.toString().endsWith(".csv"))
.map(name -> name.toString())
.collect(Collectors.toSet());
SQLContext sqlCtx = sparkSession.sqlContext();
Dataset<Row> rawDataset = sparkSession.read()
.option("inferSchema", "true")
.option("header", "true")
.format("com.databricks.spark.csv")
.option("delimiter", ",")
//.load(String.join(" , ", fileSet));
.load("/home/cloudera/works/JavaKafkaSparkStream/input/input_2.csv, " +
"/home/cloudera/works/JavaKafkaSparkStream/input/input_1.csv");
更新
我可以迭代文件并进行联合,如下所示。如果有更好的方法请推荐...
Dataset<Row> unifiedDataset = null;
for (String fileName : fileSet) {
Dataset<Row> tempDataset = sparkSession.read()
.option("inferSchema", "true")
.option("header", "true")
.format("csv")
.option("delimiter", ",")
.load(fileName);
if (unifiedDataset != null) {
unifiedDataset= unifiedDataset.unionAll(tempDataset);
} else {
unifiedDataset = tempDataset;
}
}
最佳答案
您的问题是您正在创建一个具有以下值的字符串:
"/home/cloudera/works/JavaKafkaSparkStream/input/input_2.csv, /home/cloudera/works/JavaKafkaSparkStream/input/input_1.csv"
而不是传递两个文件名作为参数,这应该通过以下方式完成:
.load("/home/cloudera/works/JavaKafkaSparkStream/input/input_2.csv",
"/home/cloudera/works/JavaKafkaSparkStream/input/input_1.csv");
逗号必须位于字符串之外,并且您应该有两个值,而不是一个字符串。
关于apache-spark - SparkSession读取多个文件而不是使用模式,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/45680356/