scala - 如何使用循环在 Spark-Scala 的 HDFS 中迭代多个文本文件?

标签 scala hadoop apache-spark hdfs

我在集群中工作。我需要为 HDFS 中包含的每个文本文件运行相同的 spark 操作。但是我想在不从 shell 命令行为每个文件提交 spark job shell-command 的情况下这样做,因为文件数是 90。 我该怎么做?

我的一个文件的代码结构如下:

object SparkGraphGen{
def main(args: Array[String]){
      val conf = new SparkConf()
                .setMaster("yarn")
                .setAppName("dataset")
      val sc = new SparkContext(conf)
      val sqlContext = new org.apache.spark.sql.SQLContext(sc)
      import sqlContext.implicits._
      val peopleRDD = sc.textFile("file1.csv")
      ...
      do stuff
      ...
      sc.stop()
      }}

最佳答案

更新:

  1. foreach 循环怎么样:

    val sc = new SparkContext(conf)
    //val files = new File("Data\\files\\").listFiles.map(_.getAbsolutePath).toList 
    val files = new File("Data\\files\\").listFiles.map(_.getName).toList           
    files.foreach { file =>  
        //val lines = sc.textFile(file)
        val lines = sc.textFile("Data\\files\\" + file)
        println("total lines in file " + file + "  " + lines.count())   
        //do more stuf... for each file
        lines.saveAsTextFile("Data\\output\\" + file + "_output")
            }   
    sc.stop()
    

    输出:

    total lines in file C:\Users\rpatel\workspaces\Spark\Data\files\file1.txt  4
    total lines in file C:\Users\rpatel\workspaces\Spark\Data\files\file2.txt  4
    

  1. 你也可以在shell脚本中写同样的for循环

    #!/bin/bash
    
    for file in $(hadoop fs -ls /hdfs/path/to/files/|awk -F '|' '{print $NF}')
    do
      #run spark for each file
      spark-submit <options> $file /path/output/$file
    done
    

或者一次性处理所有文件....

您可以将所有文件放在一个目录中,只将完整的目录路径传递给 spark 上下文,spark 将处理该目录中的所有文件:

val peopleRDD = sc.textFile("/path/to/csv_files/")

您还可以组合 RDD,例如:

    val file1RDD = sc.textFile("file1.csv") 
    val file2RDD = sc.textFile("file2.csv")
    val allFileRDD = file1RDD ++ file2RDD // ++ nRDD

但是对于 90 个文件,我会将所有文件放在一个目录中并使用目录路径在一个作业中处理所有文件... Test

关于scala - 如何使用循环在 Spark-Scala 的 HDFS 中迭代多个文本文件?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/41770619/

相关文章:

scala - 在scala中的另一个线程上执行一个简单的任务

scala - 如何将解析器与不同类型的 Elem 结合使用

java - JavaSparkContext.wholeTextFiles 的数据集 API 模拟

scala - 带数组的 Spark 塞

scala - DataFrame 到 RDD[(String, String)] 的转换

Scala语法如何创建嵌套案例类的实例

scala - 使用 FIRRTL 注释连接多位线和引脚

hadoop - Hive 返回非特定错误 : FAILED: SemanticException java. lang.reflect.UndeclaredThrowableException

hadoop - 如何使用 MultipleTextOutputFormat 类将默认输出文件重命名为一些有意义的名称?

Hadoop:无法在 core-site.xml 中将默认文件系统设置为 HDFS