scala - 使用 Spark 从目录中读取多个文件

标签 scala hadoop apache-spark kaggle

我正在尝试解决这个问题 problem在 kaggle 使用 spark:

输入的层次结构是这样的:

drivers/{driver_id}/trip#.csv
e.g., drivers/1/1.csv
      drivers/1/2.csv
      drivers/2/1.csv

我想读取父目录"drivers",对于每个子目录,我想创建一个pairRDD,键为(sub_directory,file_name) 和值作为文件的内容

我检查了this链接并尝试使用

val text = sc.wholeTextFiles("drivers")
text.collect()

失败并出现错误:

java.lang.ArrayIndexOutOfBoundsException: 0
    at org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat$OneFileInfo.<init>(CombineFileInputFormat.java:591)
    at org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat.getMoreSplits(CombineFileInputFormat.java:283)
    at org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat.getSplits(CombineFileInputFormat.java:243)
    at org.apache.spark.rdd.WholeTextFileRDD.getPartitions(NewHadoopRDD.scala:267)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:219)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:217)
    at scala.Option.getOrElse(Option.scala:120)
    at org.apache.spark.rdd.RDD.partitions(RDD.scala:217)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1779)
    at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:885)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:148)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:109)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:286)
    at org.apache.spark.rdd.RDD.collect(RDD.scala:884)

但是当我运行下面的代码时,它起作用了。

val text =  sc.wholeTextFiles("drivers/1")
text.collect()

但我不想这样做,因为在这里我必须读取目录 drivers 并循环文件并为每个条目调用 wholeTextFiles

最佳答案

而不是使用

sc.textfile("path/*/**") or sc.wholeTextFiles("path/*")

您可以使用这段代码。因为 spark 在内部列出了文件夹和子文件夹的所有可能值,所以它可能会花费您处理大型数据集的时间。取而代之的是,您可以将联合用于相同的目的。

将包含位置的列表对象传递给以下代码,注意:sc 是 SQLContext 的对象

var df: DataFrame = null;
  for (file <- files) {
    val fileDf= sc.textFile(file)
    if (df!= null) {
      df= df.unionAll(fileDf)
    } else {
      df= fileDf
    }
  }

现在你得到了最终的统一 RDD,即 df

关于scala - 使用 Spark 从目录中读取多个文件,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/31051107/

相关文章:

hadoop - 访问 hdfs 文件夹

Hadoop 可以替代 SSIS、Informatica 等 ETL 工具吗?

hadoop - 在sqoop命令中获取预期的数值参数

apache-spark - 如何更改Spark ui端口?

scala - "this"关键字如何在 Scala 辅助构造函数中使用和工作?

java - 特性:Scala 和 Java 之间的互操作,从 Java 访问 val

scala - 将字符转换为字符串

scala - 基于docker容器的库以支持elastic4s

python - PySpark ML : OnevsRest strategy for LinearSVC

java - 远程连接Spark集群