scala - Scala Spark 中未调用 RDD 的 Map 函数

标签 scala apache-spark

当我调用 RDD 的映射函数时,未应用该函数。它对于 scala.collection.immutable.List 可以按预期工作,但对于 RDD 则不然。这是一些代码来说明:

val list = List ("a" , "d" , "c" , "d")
list.map(l => {
  println("mapping list")
})

val tm = sc.parallelize(list)
tm.map(m => {
  println("mapping RDD")
})

以上代码的结果是:

mapping list
mapping list
mapping list
mapping list

但是请注意“映射 RDD”不会打印到屏幕上。为什么会出现这种情况?

这是一个更大问题的一部分,我试图从 RDD 填充 HashMap:

  def getTestMap( dist: RDD[(String)]) = {

    var testMap = new java.util.HashMap[String , String]();

    dist.map(m => {
      println("populating map")
      testMap.put(m , m)

    })
    testMap
  }
val testM = getTestMap(tm)
println(testM.get("a"))

此代码打印 null

这是由于惰性评估造成的吗?

最佳答案

如果 map 是您正在执行的唯一操作,则延迟计算可能是其中的一部分。 Spark 不会安排执行,直到 action (在 Spark 术语中)是在 RDD 谱系上请求的。

当您执行一个操作时,println将会发生,但不是在您期望的驱动程序上发生,而是在执行该闭包的从属设备上发生。尝试查看 worker 的日志。

问题第二部分的 hashMap 群体上也发生了类似的情况。相同的代码将在每个分区、不同的工作线程上执行,并将被序列化回驱动程序。鉴于 Spark 已“清理”闭包,可能会从序列化闭包中删除 testMap,从而导致 null。请注意,如果只是由于 map 未执行,则 hashmap 应该为空,而不是 null。

如果你想将RDD的数据传输到另一个结构体,你需要在驱动程序中执行此操作。因此,您需要强制 Spark 将所有数据传递给驱动程序。这就是 rdd.collect() 的功能。

这应该适合您的情况。请注意,所有 RDD 数据都应适合驱动程序的内存:

import scala.collection.JavaConverters._
def getTestMap(dist: RDD[(String)]) =  dist.collect.map(m => (m , m)).toMap.asJava

关于scala - Scala Spark 中未调用 RDD 的 Map 函数,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/51743094/

相关文章:

reflection - 如何通过反射访问字段的值(Scala 2.8)

xml - 如何从大型 XML 文档中获取流式迭代器 [Node]?

scala - 对象spark不是包org的成员

apache-spark - Spark 窗口函数 - rangeBetween 日期

python - Spark - 将具有不同架构(列名和序列)的 DataFrame 合并/联合到具有主通用架构的 DataFrame

apache-spark - 如何计算一天从 Kafka 主题中获取的消息数?

c# - 厌倦了非语义测试来弥补动态类型-建议吗?

scala - 函数式根据 Scala 中的值创建列表

apache-spark - Spark for Python - 无法将字符串列转换为十进制/ double

scala - 语法function[T]在scala中是什么意思