我在集群中工作。我需要为 HDFS 中包含的每个文本文件运行相同的 spark 操作。但是我想在不从 shell 命令行为每个文件提交 spark job shell-command 的情况下这样做,因为文件数是 90。 我该怎么做?
我的一个文件的代码结构如下:
object SparkGraphGen{
def main(args: Array[String]){
val conf = new SparkConf()
.setMaster("yarn")
.setAppName("dataset")
val sc = new SparkContext(conf)
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
import sqlContext.implicits._
val peopleRDD = sc.textFile("file1.csv")
...
do stuff
...
sc.stop()
}}
最佳答案
更新:
foreach
循环怎么样:val sc = new SparkContext(conf) //val files = new File("Data\\files\\").listFiles.map(_.getAbsolutePath).toList val files = new File("Data\\files\\").listFiles.map(_.getName).toList files.foreach { file => //val lines = sc.textFile(file) val lines = sc.textFile("Data\\files\\" + file) println("total lines in file " + file + " " + lines.count()) //do more stuf... for each file lines.saveAsTextFile("Data\\output\\" + file + "_output") } sc.stop()
输出:
total lines in file C:\Users\rpatel\workspaces\Spark\Data\files\file1.txt 4 total lines in file C:\Users\rpatel\workspaces\Spark\Data\files\file2.txt 4
你也可以在shell脚本中写同样的for循环
#!/bin/bash for file in $(hadoop fs -ls /hdfs/path/to/files/|awk -F '|' '{print $NF}') do #run spark for each file spark-submit <options> $file /path/output/$file done
或者一次性处理所有文件....
您可以将所有文件放在一个目录中,只将完整的目录路径传递给 spark 上下文,spark 将处理该目录中的所有文件:
val peopleRDD = sc.textFile("/path/to/csv_files/")
您还可以组合 RDD,例如:
val file1RDD = sc.textFile("file1.csv")
val file2RDD = sc.textFile("file2.csv")
val allFileRDD = file1RDD ++ file2RDD // ++ nRDD
关于scala - 如何使用循环在 Spark-Scala 的 HDFS 中迭代多个文本文件?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/41770619/