scala - 循环遍历文件行并通过 Spark 中的每次迭代执行函数

标签 scala hadoop apache-spark hdfs

我在 HDFS 中有一个名为 file1 的文件,其中包含以下几行:(每一行都是一个目录路径)

this/is/path1
this/is/path2
this/is/path3
.
.
.
this/is/path1000ormore

我有一个 Scala Spark 函数如下:

val resultset=sc.hadoopFile(inputpath,classOf[TextInputFormat],classOf[LongWritable],classOf[Text]).flatMap {
case (k, v) => if (k.get == 0) Seq(v.toString) else Seq.empty[String]
}

我想传递“file1”中的每一行来代替 hadoopFile 函数中的“inputpath”(需要是一个字符串),并获取每次迭代/循环的结果。 我该怎么做?

额外信息:

函数的实际作用: 上面的函数从指定的目录路径中获取第一个文件来代替“inputpath”,并给出文件的第一行。我想对存储在“file1”中的所有目录路径执行此操作,因此我正在寻找有关如何在循环/迭代中执行此操作的解决方案。

更新: 我试着把它放在这样的循环中:

val lines=Source.fromFile("/path/to/file1.txt").getLines.toList
for(i<-lines){
val firstLines=sc.hadoopFile(i,classOf[TextInputFormat],classOf[LongWritable],classOf[Text]).flatMap {
case (k, v) => if (k.get == 0) Seq(v.toString) else Seq.empty[String]
}

这运行了大约 10 分钟(文件 1 包含大约 34,000 行)并且没有出现任何错误。但是当我尝试使用以下命令查看几行输出时,

firstLines.take(3)

我收到一条错误消息:

error: not found: value firstLines
          firstLines
          ^

所以我认为循环没有成功运行,因此 firstLines 从未被创建,尽管我不知道问题可能是什么。有人可以提供解决方案吗?

最佳答案

可以分两步实现:

  1. 像往常一样从 HDFS 读取“file1”文本文件,获取所有元素;
  2. 对于 1) 中的每一项,应用“结果集”逻辑。

另外 2) 可以改进:1) 中的所有项目都可以在一个字符串中用逗号连接,字符串作为“inputpath”参数传递。您将拥有一个包含所有文件数据的 RDD。可以应用过滤器“k.get == 0”以获得最终结果。

首先可以这样实现:

val lines = Source.fromFile("file1.txt").getLines.toSeq.view

val resultDF = lines.map(current =>
  sc.hadoopFile(current, classOf[TextInputFormat], classOf[LongWritable], classOf[Text]).filter(_._1 == 0)
).reduce(_ union _)

resultDF.take(3).foreach(println)

关于scala - 循环遍历文件行并通过 Spark 中的每次迭代执行函数,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/47104704/

相关文章:

scala - 当实现只是抛出新错误时,Scala Array apply 方法如何返回索引处的值

Scala 3 list 替换

python - 将RDD保存为pyspark中的序列文件

sql-server - SparkSQL MS SQL Server,编译后获取消息 "No suitable driver"

scala - 案例分类和特征线性化

Scala 泛型 this.type

csv - 通过Ambari将大型csv文件加载到Hive

csv - 在 Python CSV 模块中将分隔符更改为 CTRL+A

hadoop - 在 Hadoop 上运行的 Oozie 作业出现问题 -/user/history/done_intermediate 上的权限

r - 安装 sparkR 时出错