java - 使用 Scala Apache Spark 合并 RDD

标签 java scala apache-spark

我有 2 个 RDD。

RDD1: ((String, String), Int)
RDD2: (String, Int)

例如:

    RDD1

    ((A, X), 1)
    ((B, X), 2)
    ((A, Y), 2)
    ((C, Y), 3)

    RDD2

    (A, 6)
    (B, 7)
    (C, 8)

Output Expected

    ((A, X), 6)
    ((B, X), 14)
    ((A, Y), 12)
    ((C, Y), 24)

在 RDD1 中,(String, String) 组合是唯一的,而在 RDD2 中,每个字符串键都是唯一的。 RDD2 (6) 中 A 的得分乘以 RDD1 中其键中包含 A 的条目的所有得分值。

14 = 7 * 2
12 = 6 * 2
24 = 8 * 3

我写了以下内容,但在大小写上给我一个错误:

val finalRdd = countRdd.join(countfileRdd).map(case (k, (ls, rs)) => (k, (ls * rs)))

有人可以帮我解决这个问题吗?

最佳答案

您的第一个 RDD 与第二个 RDD 的键类型不同(元组 (A, X) 与 A)。您应该在加入之前对其进行转换:

val rdd1  = sc.parallelize(List((("A", "X"), 1), (("A", "Y"), 2)))
val rdd2 = sc.parallelize(List(("A", 6)))
val rdd1Transformed = rdd1.map { 
   case ((letter, coord), value) => (letter, (coord, value)) 
}
val result = rdd1Transformed
  .join(rdd2)
  .map { 
    case (letter, ((coord, v1), v2)) => ((letter, coord), v1 * v2) 
  }
result.collect()
res1: Array[((String, String), Int)] = Array(((A,X),6), ((A,Y),12))

关于java - 使用 Scala Apache Spark 合并 RDD,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/29880457/

相关文章:

scala - 调试 session

apache-spark - Hadoop 3和spark.sql:与HiveWarehouseSession和spark.sql一起使用

scala - 在 Spark RDD 中寻找最大值

scala - Scala中:::(三元冒号)的用途是什么

scala - 重复函数调用,直到我们在 Scala 中得到非空的 Option 结果

hadoop - Apache Spark - Hive 内部连接、LIMIT 和自定义 UDF

java - 如何将intellij中的普通java项目转为JavaFx项目

java - 将子类转换为父类(super class)后调用方法的有趣行为

java - 无法使用 TIKA 提取文本

java - 缓存和持久化数据