scala - 如何列出 Spark Scala shell 中 HDFS 位置中的所有 csv 文件?

标签 scala hadoop apache-spark hdfs

这样做的目的是为了在 HDFS 的第二个位置操作和保存每个数据文件的副本。我会用

RddName.coalesce(1).saveAsTextFile(pathName)

将结果保存到HDFS。

这就是为什么我想单独处理每个文件,尽管我确信性能不会那么高效。但是,我还没有确定如何将 CSV 文件路径列表存储到字符串数组中,然后使用单独的 RDD 遍历每个路径。

让我们使用以下匿名示例作为 HDFS 源位置:

/data/email/click/date=2015-01-01/sent_20150101.csv
/data/email/click/date=2015-01-02/sent_20150102.csv
/data/email/click/date=2015-01-03/sent_20150103.csv

我知道如何使用 Hadoop FS Shell 列出文件路径:

HDFS DFS -ls /data/email/click/*/*.csv

我知道如何为所有数据创建一个 RDD:

val sentRdd = sc.textFile( "/data/email/click/*/*.csv" )

最佳答案

我还没有对它进行彻底的测试,但像这样的东西似乎有效:

import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.hadoop.fs.{FileSystem, Path, LocatedFileStatus, RemoteIterator}
import java.net.URI

val path: String = ???

val hconf = SparkHadoopUtil.get.newConfiguration(sc.getConf)
val hdfs = FileSystem.get(hconf)
val iter = hdfs.listFiles(new Path(path), false)

def listFiles(iter: RemoteIterator[LocatedFileStatus]) = {
  def go(iter: RemoteIterator[LocatedFileStatus], acc: List[URI]): List[URI] = {
    if (iter.hasNext) {
      val uri = iter.next.getPath.toUri
      go(iter, uri :: acc)
    } else {
      acc
    }
  }
  go(iter, List.empty[java.net.URI])
}

listFiles(iter).filter(_.toString.endsWith(".csv"))

关于scala - 如何列出 Spark Scala shell 中 HDFS 位置中的所有 csv 文件?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/32771089/

相关文章:

sql - Spark SQL分组: Add to group by or wrap in first() if you don't care which value you get.;

java - 使用 Spark 通过 where 子句读取 HBase 表

scala - Salat:如何调试堆栈跟踪以了解案例类中的哪个字段导致异常

scala - Scala 如何忽略 Java 的已检查异常?

java - 使用两个输入文件进行映射缩减,其中一个文件基于另一个文件进行处理

apache-spark - 如何在带有分隔符| @ |的spark sql中使用Split函数?

hadoop - 如何在不使用 Hadoop 的情况下读取 HDFS 上的 Snappy 压缩文件?

scala - NoClassDefFoundError:使用Spark读取s3数据时出现org/apache/hadoop/fs/StreamCapabilities

Scala 库可用于编译和测试配置

java - 如何使用spark和java在mysql中插入模型