当我调用 RDD 的映射函数时,未应用该函数。它对于 scala.collection.immutable.List 可以按预期工作,但对于 RDD 则不然。这是一些代码来说明:
val list = List ("a" , "d" , "c" , "d")
list.map(l => {
println("mapping list")
})
val tm = sc.parallelize(list)
tm.map(m => {
println("mapping RDD")
})
以上代码的结果是:
mapping list
mapping list
mapping list
mapping list
但是请注意“映射 RDD”不会打印到屏幕上。为什么会出现这种情况?
这是一个更大问题的一部分,我试图从 RDD 填充 HashMap:
def getTestMap( dist: RDD[(String)]) = {
var testMap = new java.util.HashMap[String , String]();
dist.map(m => {
println("populating map")
testMap.put(m , m)
})
testMap
}
val testM = getTestMap(tm)
println(testM.get("a"))
此代码打印 null
这是由于惰性评估造成的吗?
最佳答案
如果 map
是您正在执行的唯一操作,则延迟计算可能是其中的一部分。 Spark 不会安排执行,直到 action (在 Spark 术语中)是在 RDD 谱系上请求的。
当您执行一个操作时,println
将会发生,但不是在您期望的驱动程序上发生,而是在执行该闭包的从属设备上发生。尝试查看 worker 的日志。
问题第二部分的 hashMap
群体上也发生了类似的情况。相同的代码将在每个分区、不同的工作线程上执行,并将被序列化回驱动程序。鉴于 Spark 已“清理”闭包,可能会从序列化闭包中删除 testMap
,从而导致 null
。请注意,如果只是由于 map
未执行,则 hashmap 应该为空,而不是 null。
如果你想将RDD的数据传输到另一个结构体,你需要在驱动程序中执行此操作。因此,您需要强制 Spark 将所有数据传递给驱动程序。这就是 rdd.collect() 的功能。
这应该适合您的情况。请注意,所有 RDD 数据都应适合驱动程序的内存:
import scala.collection.JavaConverters._
def getTestMap(dist: RDD[(String)]) = dist.collect.map(m => (m , m)).toMap.asJava
关于scala - Scala Spark 中未调用 RDD 的 Map 函数,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/51743094/