scala - 如何使用spark sc.textFile获取文件名?

标签 scala apache-spark

我正在使用以下代码读取文件目录:

val data = sc.textFile("/mySource/dir1/*")

现在我的data rdd包含目录中所有文件的所有行(对吗?)

我现在想在每一行中添加一个包含源文件名的列,我该怎么做?

我尝试的其他选项是使用 WholeTextFile,但我不断出现内存不足的异常。 5台服务器24核24GB(执行器核心5执行器内存5G) 有什么想法吗?

最佳答案

您可以使用此代码。我已经用 Spark 1.4 和 1.5 对其进行了测试。

它从inputSplit获取文件名,并使用NewHadoopRDD的mapPartitionsWithInputSplit使用iterator将其添加到每一行

import org.apache.hadoop.mapreduce.lib.input.{FileSplit, TextInputFormat}
import org.apache.spark.rdd.{NewHadoopRDD}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.hadoop.io.LongWritable
import org.apache.hadoop.io.Text

val sc = new SparkContext(new SparkConf().setMaster("local"))

val fc = classOf[TextInputFormat]
val kc = classOf[LongWritable]
val vc = classOf[Text]

val path :String = "file:///home/user/test"
val text = sc.newAPIHadoopFile(path, fc ,kc, vc, sc.hadoopConfiguration)

val linesWithFileNames = text.asInstanceOf[NewHadoopRDD[LongWritable, Text]]
           .mapPartitionsWithInputSplit((inputSplit, iterator) => {
  val file = inputSplit.asInstanceOf[FileSplit]
  iterator.map(tup => (file.getPath, tup._2))
  }
)

linesWithFileNames.foreach(println)

关于scala - 如何使用spark sc.textFile获取文件名?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/34316262/

相关文章:

ssh - 从 EC2 实例 : ssh connection to host refused 中调用 spark-ec2

hadoop - Spark 和 HCatalog?

apache-spark - Spark-SQL : Access array elements storing within a cell in a data frame

scala - 如何在 play 2.1 中处理 JSON 解析中的可选字段

scala - Play 2.1-RC1 反向路由未编译

scala - 如何从代码向 Spark 提交作业?

java - 根据java中的时间戳按月对spark数据集进行分组

java - Java 中的 Spark 在进行 join 或 groupWith 时如何比较两个键?

scala - 将 RDD[T] 过滤为类型 T 的子类

for 构造中的 Scala 模式匹配