scala - 如何使用 reduceByKey 将值添加到 Scala Spark 中的 Set 中?

标签 scala mapreduce apache-spark

在我将我的 RDD 映射到

((_id_1, section_id_1), (_id_1, section_id_2), (_id_2, section_3), (_id_2, section_4))

我要reduceByKey
((_id_1, Set(section_id_1, section_id_2), (_id_2, Set(section_3, section_4)))
val collectionReduce = collection_filtered.map(item => {
      val extras = item._2.get("extras")
      var section_id = ""
      var extras_id = ""
      if (extras != null) {
        val extras_parse = extras.asInstanceOf[BSONObject]
        section_id = extras_parse.get("guid").toString
        extras_id = extras_parse.get("id").toString
      }
      (extras_id, Set {section_id})
    }).groupByKey().collect()

我的输出是
((_id_1, (Set(section_1), Set(section_2))), (_id_2, (Set(section_3), Set(section_4))))

我该如何解决?

最佳答案

您可以使用 reduceByKey只需使用 ++合并列表。

val rdd = sc.parallelize((1, Set("A")) :: (2, Set("B")) :: (2, Set("C")) :: Nil)
val reducedRdd = rdd.reduceByKey(_ ++ _)
reducedRdd.collect()
// Array((1,Set(A)), (2,Set(B, C)))

在你的情况下:
collection_filtered.map(item => {
  // ...
  (extras_id, Set(section_id))
}).reduceByKey(_ ++ _).collect()

关于scala - 如何使用 reduceByKey 将值添加到 Scala Spark 中的 Set 中?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/31557260/

相关文章:

scala - 在 Scala 中最多只能做 4 个并发 future

scala - 使用 foldRight 反转列表的优雅方法?

scala - 使用 Slick 处理大表失败并出现 OutOfMemoryError

hadoop - hadoop 中的二进制类型是什么?

scala - 如何使用 Spark DataFrames 和 Cassandra 设置命名策略

hadoop - 这些是操作 yarn 上 Spark 的正确组件吗?

design-patterns - 具有相同参数化类型的 Scala mixin

javascript - 在 Riak 中使用二级索引作为 map reduce 函数的输入

java - 如何指定tab作为hadoop输入文本文件的记录分隔符?

scala - 在 spark 中遍历每一列并找到最大长度