我有 2 个 RDD。
RDD1: ((String, String), Int)
RDD2: (String, Int)
例如:
RDD1
((A, X), 1)
((B, X), 2)
((A, Y), 2)
((C, Y), 3)
RDD2
(A, 6)
(B, 7)
(C, 8)
Output Expected
((A, X), 6)
((B, X), 14)
((A, Y), 12)
((C, Y), 24)
在 RDD1 中,(String, String) 组合是唯一的,而在 RDD2 中,每个字符串键都是唯一的。 RDD2 (6) 中 A 的得分乘以 RDD1 中其键中包含 A 的条目的所有得分值。
14 = 7 * 2
12 = 6 * 2
24 = 8 * 3
我写了以下内容,但在大小写上给我一个错误:
val finalRdd = countRdd.join(countfileRdd).map(case (k, (ls, rs)) => (k, (ls * rs)))
有人可以帮我解决这个问题吗?
最佳答案
您的第一个 RDD 与第二个 RDD 的键类型不同(元组 (A, X) 与 A)。您应该在加入之前对其进行转换:
val rdd1 = sc.parallelize(List((("A", "X"), 1), (("A", "Y"), 2)))
val rdd2 = sc.parallelize(List(("A", 6)))
val rdd1Transformed = rdd1.map {
case ((letter, coord), value) => (letter, (coord, value))
}
val result = rdd1Transformed
.join(rdd2)
.map {
case (letter, ((coord, v1), v2)) => ((letter, coord), v1 * v2)
}
result.collect()
res1: Array[((String, String), Int)] = Array(((A,X),6), ((A,Y),12))
关于java - 使用 Scala Apache Spark 合并 RDD,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/29880457/