apache-spark - SparkSession读取多个文件而不是使用模式

标签 apache-spark apache-spark-sql apache-spark-dataset apache-spark-2.0

我正在尝试使用 SparkSession 从 HDFS 上的文件夹中读取几个 CSV 文件(即我不想读取该文件夹中的所有文件)

运行时出现以下错误(代码在最后):

Path does not exist:
file:/home/cloudera/works/JavaKafkaSparkStream/input/input_2.csv,
/home/cloudera/works/JavaKafkaSparkStream/input/input_1.csv

我不想在阅读时使用该模式,例如 /home/temp/*.csv,原因是将来我有逻辑只选择文件夹中的一两个文件100 个 CSV 文件

请指教

    SparkSession sparkSession = SparkSession
            .builder()
            .appName(SparkCSVProcessors.class.getName())
            .master(master).getOrCreate();
    SparkContext context = sparkSession.sparkContext();
    context.setLogLevel("ERROR");

    Set<String> fileSet = Files.list(Paths.get("/home/cloudera/works/JavaKafkaSparkStream/input/"))
            .filter(name -> name.toString().endsWith(".csv"))
            .map(name -> name.toString())
            .collect(Collectors.toSet());

    SQLContext sqlCtx = sparkSession.sqlContext();

    Dataset<Row> rawDataset = sparkSession.read()
            .option("inferSchema", "true")
            .option("header", "true")
            .format("com.databricks.spark.csv")
            .option("delimiter", ",")
            //.load(String.join(" , ", fileSet));
            .load("/home/cloudera/works/JavaKafkaSparkStream/input/input_2.csv, " +
                    "/home/cloudera/works/JavaKafkaSparkStream/input/input_1.csv");

更新

我可以迭代文件并进行联合,如下所示。如果有更好的方法请推荐...

    Dataset<Row> unifiedDataset = null;

    for (String fileName : fileSet) {
        Dataset<Row> tempDataset = sparkSession.read()
                .option("inferSchema", "true")
                .option("header", "true")
                .format("csv")
                .option("delimiter", ",")
                .load(fileName);
        if (unifiedDataset != null) {
            unifiedDataset= unifiedDataset.unionAll(tempDataset);
        } else {
            unifiedDataset = tempDataset;
        }
    }

最佳答案

您的问题是您正在创建一个具有以下值的字符串:

"/home/cloudera/works/JavaKafkaSparkStream/input/input_2.csv, /home/cloudera/works/JavaKafkaSparkStream/input/input_1.csv"

而不是传递两个文件名作为参数,这应该通过以下方式完成:

.load("/home/cloudera/works/JavaKafkaSparkStream/input/input_2.csv",
"/home/cloudera/works/JavaKafkaSparkStream/input/input_1.csv");

逗号必须位于字符串之外,并且您应该有两个值,而不是一个字符串。

关于apache-spark - SparkSession读取多个文件而不是使用模式,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/45680356/

相关文章:

java - 是否可以使用 Spark Streaming 流式传输数据库表数据

python - 来自执行程序的 PySpark 日志记录

python - 如何从列表中选择多个不连续的列到python中的另一个数据框中

apache-spark - Spark 数据集 - 内部连接问题

apache-spark - 将 parquet 读入 spark 数据集忽略缺失的字段

scala - 如何从 Spark SQL DataFrame 中的 MapType 列获取键和值

apache-spark - 使用 deltalake python (不是 Spark)更新插入

python-3.x - 设置 Spark 配置

apache-spark - 从 spark 读取使用 CTE(With 子句)创建的 Hive View

scala - Spark获取列中数组中具有相同值的所有行