scala - 从可变长度 CSV 到对 RDD 的 Spark 转换

标签 scala apache-spark rdd

我是 scala spark 的新手,我有一个如下所示的 CSV 文件。

R001, A, 10, C, 30, D, 50, X, 90
R002, E, 40, F, 70, G, 80, H, 90, J, 25 
R003, L, 30, M, 54, N, 67, O, 25, P, 85, Q, 100

如何将其转换为以行的第一个值作为键并跳过数值的 Pair RDD?

R001, A
R001, C
R001, D
R001, X
R002, E
R002, F
R002, G
R002, H
R002, J
R003, L
R003, M
R003, N
R003, O
R003, P
R003, Q

我试过了,它看起来不错,但我认为有很多更好的方法来做到这一点。

def isNumeric(str:String): Boolean = str.matches("[-+]?\\d+(\\.\\d+)?")

def process(field: Array[String]): Array[String] = { 
 val results = new Array[String](field.length)  
 for (i <- 1 to field.length-1) {
    if(!isNumeric(field(i).trim) && field(0)!=null && field(i)!=null)
     results(i)= field(0)+":"+field(i)
 }  
  results  
};

 def skipNulls(input : String) : String = {  
  if(input!=null && input.trim!="" && input.contains(":")) {
  var res = input.split(":")    
  res(0)+","+res(1)
  }  else {
    "null"
  }
};


val a= raw_csv.map(_.split(",")).flatMap(k => process(k))
val b= a.map(k => skipNulls(k))
val c = b.filter( x => x.contains("null")==false)
val d= c.toDF()
d.show
display(d)

最佳答案

看起来您对 Spark 部分的想法基本上是正确的(不过看起来您实际上并没有达到您想要的 RDD?我假设您知道这一点),问题是我们是否可以清理处理skipNulls。因为您正在使用 flatMap,所以我认为您应该能够过滤掉 process 中的坏点。

如果我没理解错的话,您是在引入空值(然后稍后删除它们),因为您有这个固定长度的数组。但是,我们可以将“键”和“值”分开,过滤到非数字值,然后组装我们想要的对,而不必创建这个数组。像这样的东西:

def process(line: String): List[(String, String)] = {
  val key :: values = line.split(",").toList
  values.filterNot(isNumeric).map(key -> _) // equivalent to .map(x => (key, x))
}

这结合了你的前几个步骤,所以我的版本将是 raw_csv.flatMap(process) 以到达 RDD[(String, String)]

将列表分开可以说是一个小魔术。以下内容也适用:

val elements = line.split(",")
val key = elements.head
val values = elements.tail

您可以将 isNumeric 重写为:

def isNumeric(s: String): Boolean = Try(s.toFloat).isSuccess

对于这比正则表达式好还是坏,我没有强烈的看法。 Tryscala.util 中。

关于scala - 从可变长度 CSV 到对 RDD 的 Spark 转换,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/49040536/

相关文章:

apache-spark - Parquet 如何处理 SparseVector 列?

scala - Spark : Replicate each row but with change in one column value

scala - Apache Spark 溢出到磁盘

scala - 联合类型作为特征的类型参数的绑定(bind)(scala)

python - 如何在 PySpark 中分别对多个列进行透视

scala - 如何使用 Scala Stream 类读取大型 CSV 文件?

apache-spark - 如何从 DataFrame apache spark 中找到最大值 Alphabet?

scala - Spark RDD未从Elasticsearch获取所有源字段

scala - 如何从 Int 或 Byte 初始化枚举?

scala - 在 Scala 中交换不同类型的元组