我有以下代码,其中 rddMap
是 org.apache.spark.rdd.RDD[(String, (String, String))]
和 myHashMap
是 scala.collection.mutable.HashMap
。
我执行了 .saveAsTextFile("temp_out")
以强制评估 rddMap.map
。
然而,即使 println("t "+ t)
正在打印东西,稍后 myHashMap
仍然只有一个我手动放在开头的元素 ("test1", ("10", "20"))
.
rddMap
中的所有内容都不会放入 myHashMap
中。
片段代码:
val myHashMap = new HashMap[String, (String, String)]
myHashMap.put("test1", ("10", "20"))
rddMap.map { t =>
println(" t " + t)
myHashMap.put(t._1, t._2)
}.saveAsTextFile("temp_out")
println(rddMap.count)
println(myHashMap.toString)
为什么我不能将 rddMap 中的元素放到我的 myHashMap
中?
最佳答案
这是您想要完成的工作示例。
val rddMap = sc.parallelize(Map("A" -> ("v", "v"), "B" -> ("d","d")).toSeq)
// Collects all the data in the RDD and converts the data to a Map
val myMap = rddMap.collect().toMap
myMap.foreach(println)
输出:
(A,(v,v))
(B,(d,d))
这是与您发布的代码类似的代码
rddMap.map { t=>
println("t" + t)
newHashMap.put(t._1, t._2)
println(newHashMap.toString)
}.collect
这是 Spark shell 对上述代码的输出
t(A,(v,v))
Map(A -> (v,v), test1 -> (10,20))
t(B,(d,d))
Map(test1 -> (10,20), B -> (d,d))
在我看来,Spark 似乎复制了您的 HashMap 并将元素添加到copied 映射中。
关于scala - Spark : Cannot add RDD elements into a mutable HashMap inside a closure,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/31016057/