scala - 将数组(行)的 RDD 转换为行的 RDD?

标签 scala apache-spark dataframe rdd

我的文件中有这样的数据,我想使用 Spark 进行一些统计。

文件内容:

aaa|bbb|ccc
ddd|eee|fff|ggg

我需要为每一行分配一个 id。我将它们读取为 rdd 并使用 zipWithIndex()

那么它们应该是这样的:

(0, aaa|bbb|ccc)
(1, ddd|eee|fff|ggg)

我需要将每个字符串与 id 关联起来。我可以获取Array(Row)的RDD,但无法跳出数组。

我应该如何修改我的代码?

import org.apache.spark.sql.{Row, SparkSession}

val fileRDD = spark.sparkContext.textFile(filePath)
val fileWithIdRDD = fileRDD.zipWithIndex()
// make the line like this: (0, aaa), (0, bbb), (0, ccc)
// each line is a record of Array(Row)
fileWithIdRDD.map(x => {
  val id = x._1
  val str = x._2
  val strArr = str.split("\\|")
  val rowArr = strArr.map(y => {
    Row(id, y)
  }) 
  rowArr 
})

现在看起来像:

[(0, aaa), (0, bbb), (0, ccc)]
[(1, ddd), (1, eee), (1, fff), (1, ggg)]

但最后我想要:

(0, aaa)
(0, bbb) 
(0, ccc)
(1, ddd)
(1, eee)
(1, fff)
(1, ggg)

最佳答案

你只需要展平你的RDD

yourRDD.flatMap(array => array)

考虑您的代码(修复了内部映射内部以及 id 和 str 的分配中的一些错误)

fileWithIdRDD.map(x => {
  val id = x._1
  val str = x._2
  val strArr = str.split("\\|")
  val rowArr = strArr.map(y => {
    Row(id, y)
  }) 
  rowArr 
}).flatMap(array => array)

这里是简单的例子:

输入

fileWithIdRDD.collect
res30: Array[(Int, String)] = Array((0,aaa|bbb|ccc), (1,ddd|eee|fff|ggg))

执行

scala> fileWithIdRDD.map(x => {
      val id = x._1
      val str = x._2
      val strArr = str.split("\\|")
        val rowArr = strArr.map(y => {
          Row(id, y)
        })
      rowArr
      }).flatMap(array => array)


res31: org.apache.spark.rdd.RDD[org.apache.spark.sql.Row] = MapPartitionsRDD[17] at flatMap at <console>:35

输出

scala> res31.collect
res32: Array[org.apache.spark.sql.Row] = Array([0,aaa], [0,bbb], [0,ccc], [1,ddd], [1,eee], [1,fff], [1,ggg])

关于scala - 将数组(行)的 RDD 转换为行的 RDD?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/55702513/

相关文章:

r - 确定 R 中两列是否包含负值和正值

string - 字符串在ScalaTest Matchers中包含许多子字符串

scala - 如何在 Spark-Submit 应用程序中执行 S3-dist-cp 命令

scala - 扩展教程:com.twitter.scalding.InvalidSourceException:一条或多条路径中的数据丢失

java - 用于激发 StructType 的 Avro Schema

r - 从数据帧中的重复测量中检测不可能的数据输入错误

java - 在 Java 中使用不可变类的 Scala 扩展

java - Spark : Creating Object RDD from List<Object> RDD

java - 从另一个 Java 应用程序部署 Apache Spark 应用程序,最佳实践

r - 将函数应用于 R data.frame 中另一列的一系列值以使其保持矢量化的最佳方法是什么?