scala - 如何使用 Spark 在 map() 中快速从 HDFS 读取文件

标签 scala apache-spark

我需要在每个 map() 中读取不同的文件,该文件在 HDFS 中

  val rdd=sc.parallelize(1 to 10000)
  val rdd2=rdd.map{x=>
    val hdfs = org.apache.hadoop.fs.FileSystem.get(new java.net.URI("hdfs://ITS-Hadoop10:9000/"), new org.apache.hadoop.conf.Configuration())
    val path=new Path("/user/zhc/"+x+"/")
    val t=hdfs.listStatus(path)
    val in =hdfs.open(t(0).getPath)
    val reader = new BufferedReader(new InputStreamReader(in))
    var l=reader.readLine()
  }
 rdd2.count

我的问题是这段代码

val hdfs = org.apache.hadoop.fs.FileSystem.get(new java.net.URI("hdfs://ITS-Hadoop10:9000/"), new org.apache.hadoop.conf.Configuration())

运行时间太长,每次 map() 都需要创建一个新的 FileSystem 值。我可以将这段代码放在 map() 函数之外,这样它就不必每次都创建 hdfs 了吗?或者如何在 map() 中快速读取文件?

我的代码在多台机器上运行。谢谢!

最佳答案

在您的情况下,我建议使用 wholeTextFiles 方法,该方法将返回 pairRdd,键是文件完整路径,值是文件的字符串内容。

val filesPariRDD = sc.wholeTextFiles("hdfs://ITS-Hadoop10:9000/")
val filesLineCount = filesPariRDD.map( x => (x._1, x._2.length ) ) //this will return a map of fileName , number of lines of each file. You could apply any other function on the file contents
filesLineCount.collect() 

编辑

如果您的文件位于同一目录下的目录中(如评论中所述),您可以使用某种正则表达式

val filesPariRDD = sc.wholeTextFiles("hdfs://ITS-Hadoop10:9000/*/")

希望这是清楚和有用的

关于scala - 如何使用 Spark 在 map() 中快速从 HDFS 读取文件,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/37108980/

相关文章:

apache-spark - 无法使用 pyspark 写入 hdfs

java - Spark : Ignoring or handling DataSet select errors

apache-spark - 为什么 Spark streaming 创建具有 0 个事件的批处理?

scala - 是否有可能克服 akka 接收中的类型删除?

Scala 2.10 - 编译器错误?

scala - Scala Spark 中未调用 RDD 的 Map 函数

javascript - 在 play 框架中将 javascript 变量转换为 scala

scala - 在 play/scala 中以 post 方法发送多部分表单数据

apache-spark - 应该如何配置spark sql来访问hive Metastore?

scala - 我有一个以 Map 作为列数据类型的表,如何将其分解以生成 2 列,一列用于映射,一列用于键?