我是 scala spark 的新手,我有一个如下所示的 CSV 文件。
R001, A, 10, C, 30, D, 50, X, 90
R002, E, 40, F, 70, G, 80, H, 90, J, 25
R003, L, 30, M, 54, N, 67, O, 25, P, 85, Q, 100
如何将其转换为以行的第一个值作为键并跳过数值的 Pair RDD?
R001, A
R001, C
R001, D
R001, X
R002, E
R002, F
R002, G
R002, H
R002, J
R003, L
R003, M
R003, N
R003, O
R003, P
R003, Q
我试过了,它看起来不错,但我认为有很多更好的方法来做到这一点。
def isNumeric(str:String): Boolean = str.matches("[-+]?\\d+(\\.\\d+)?")
def process(field: Array[String]): Array[String] = {
val results = new Array[String](field.length)
for (i <- 1 to field.length-1) {
if(!isNumeric(field(i).trim) && field(0)!=null && field(i)!=null)
results(i)= field(0)+":"+field(i)
}
results
};
def skipNulls(input : String) : String = {
if(input!=null && input.trim!="" && input.contains(":")) {
var res = input.split(":")
res(0)+","+res(1)
} else {
"null"
}
};
val a= raw_csv.map(_.split(",")).flatMap(k => process(k))
val b= a.map(k => skipNulls(k))
val c = b.filter( x => x.contains("null")==false)
val d= c.toDF()
d.show
display(d)
最佳答案
看起来您对 Spark 部分的想法基本上是正确的(不过看起来您实际上并没有达到您想要的 RDD?我假设您知道这一点),问题是我们是否可以清理处理
和skipNulls
。因为您正在使用 flatMap
,所以我认为您应该能够过滤掉 process
中的坏点。
如果我没理解错的话,您是在引入空值(然后稍后删除它们),因为您有这个固定长度的数组。但是,我们可以将“键”和“值”分开,过滤到非数字值,然后组装我们想要的对,而不必创建这个数组。像这样的东西:
def process(line: String): List[(String, String)] = {
val key :: values = line.split(",").toList
values.filterNot(isNumeric).map(key -> _) // equivalent to .map(x => (key, x))
}
这结合了你的前几个步骤,所以我的版本将是 raw_csv.flatMap(process)
以到达 RDD[(String, String)]
。
将列表分开可以说是一个小魔术。以下内容也适用:
val elements = line.split(",")
val key = elements.head
val values = elements.tail
您可以将 isNumeric
重写为:
def isNumeric(s: String): Boolean = Try(s.toFloat).isSuccess
对于这比正则表达式好还是坏,我没有强烈的看法。 Try
在 scala.util
中。
关于scala - 从可变长度 CSV 到对 RDD 的 Spark 转换,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/49040536/