scala - Scala Spark 中未调用 RDD 的 Map 函数

标签 scala apache-spark

当我调用 RDD 的映射函数时,未应用该函数。它对于 scala.collection.immutable.List 可以按预期工作,但对于 RDD 则不然。这是一些代码来说明:

val list = List ("a" , "d" , "c" , "d")
list.map(l => {
  println("mapping list")
})

val tm = sc.parallelize(list)
tm.map(m => {
  println("mapping RDD")
})

以上代码的结果是:

mapping list
mapping list
mapping list
mapping list

但是请注意“映射 RDD”不会打印到屏幕上。为什么会出现这种情况?

这是一个更大问题的一部分,我试图从 RDD 填充 HashMap:

  def getTestMap( dist: RDD[(String)]) = {

    var testMap = new java.util.HashMap[String , String]();

    dist.map(m => {
      println("populating map")
      testMap.put(m , m)

    })
    testMap
  }
val testM = getTestMap(tm)
println(testM.get("a"))

此代码打印 null

这是由于惰性评估造成的吗?

最佳答案

如果 map 是您正在执行的唯一操作,则延迟计算可能是其中的一部分。 Spark 不会安排执行,直到 action (在 Spark 术语中)是在 RDD 谱系上请求的。

当您执行一个操作时,println将会发生,但不是在您期望的驱动程序上发生,而是在执行该闭包的从属设备上发生。尝试查看 worker 的日志。

问题第二部分的 hashMap 群体上也发生了类似的情况。相同的代码将在每个分区、不同的工作线程上执行,并将被序列化回驱动程序。鉴于 Spark 已“清理”闭包,可能会从序列化闭包中删除 testMap,从而导致 null。请注意,如果只是由于 map 未执行,则 hashmap 应该为空,而不是 null。

如果你想将RDD的数据传输到另一个结构体,你需要在驱动程序中执行此操作。因此,您需要强制 Spark 将所有数据传递给驱动程序。这就是 rdd.collect() 的功能。

这应该适合您的情况。请注意,所有 RDD 数据都应适合驱动程序的内存:

import scala.collection.JavaConverters._
def getTestMap(dist: RDD[(String)]) =  dist.collect.map(m => (m , m)).toMap.asJava

关于scala - Scala Spark 中未调用 RDD 的 Map 函数,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/24388890/

相关文章:

java - 如何在 JavaPairRDD 上使用 max 方法

scala - 为什么Scala有SeqView而没有SetView?

http - 带有喷雾示例的分块响应处理

scala - 从 Spark 中具有不同架构的现有数据帧创建另一个数据帧

python - Pyspark 将 RowMatrix 转换为 DataFrame 或 RDD

scala - 在 IntelliJ 中找不到 reduceByKey 方法

scala - 如何更新现有 SparkSession 实例或在 spark-shell 中创建一个新实例?

scala - 喷json隐式UUID转换

database - 在 Play 框架中使用数据库

java - LaTeX 元音变音转义的正则表达式?