使用 Scala 和 Spark,我有以下构造:
val rdd1: RDD[String] = ...
val rdd2: RDD[(String, Any)] = ...
val rdd1pairs = rdd1.map(s => (s, s))
val result = rdd2.join(rdd1pairs)
.map { case (_: String, (e: Any, _)) => e }
将rdd1
映射到PairRDD
的目的是为了在后续步骤中与rdd2
进行连接。然而,我实际上只对 rdd2 的值感兴趣,因此最后一行中的映射步骤省略了键。实际上,出于效率原因,这是使用 Spark 的 join()
执行的 rdd2
和 rdd1
之间的交集。
我的问题涉及 rdd1pairs 的键:它们仅在第一个映射步骤中出于语法原因(以允许连接)而创建,随后在没有任何使用的情况下被丢弃。编译器如何处理这个问题?是否使用 String s
(如示例所示)对内存消耗有影响吗?我应该用 null
或 0
替换它以节省一点内存吗?编译器是否实际创建并存储这些对象(引用),还是注意到它们从未被使用?
最佳答案
在这种情况下,我认为影响结果的是 Spark 驱动程序将执行的操作,而不是编译器。 Spark 是否可以优化其执行管道以避免创建冗余重复的 s
。我不确定,但我认为 Spark 会在内存中创建 rdd1pairs。
您可以使用(String, Unit)
,而不是映射到(String, String)
:
rdd1.map(s => (s,()))
您所做的基本上是基于rdd1
的rdd2
过滤器。如果rdd1明显小于rdd2,另一种方法是将rdd1的数据表示为广播变量而不是RDD,并简单地过滤rdd2。这避免了任何洗牌或归约阶段,因此可能会更快,但只有当 rdd1 的数据足够小以适合每个节点时才有效。
编辑:
考虑使用 Unit 而不是 String 如何节省空间,请考虑以下示例:
object size extends App {
(1 to 1000000).map(i => ("foo"+i, ()))
val input = readLine("prompt> ")
}
和
object size extends App {
(1 to 1000000).map(i => ("foo"+i, "foo"+i))
val input = readLine("prompt> ")
}
按照此问题 How to check heap usage of a running JVM from the command line? 中所述使用 jstat
命令第一个版本使用的堆明显少于后者。
编辑2:
Unit
实际上是一个没有内容的单例对象,因此从逻辑上讲,它不需要任何序列化。类型定义包含 Unit
的事实告诉您能够反序列化具有 Unit 类型字段的结构所需的一切。
Spark 默认使用 Java 序列化。考虑以下因素:
object Main extends App {
import java.io.{ObjectOutputStream, FileOutputStream}
case class Foo (a: String, b:String)
case class Bar (a: String, b:String, c: Unit)
val str = "abcdef"
val foo = Foo("abcdef", "xyz")
val bar = Bar("abcdef", "xyz", ())
val fos = new FileOutputStream( "foo.obj" )
val fo = new ObjectOutputStream( fos )
val bos = new FileOutputStream( "bar.obj" )
val bo = new ObjectOutputStream( bos )
fo writeObject foo
bo writeObject bar
}
两个文件大小相同:
�� sr Main$Foo3�,�z \ L at Ljava/lang/String;L bq ~ xpt abcdeft xyz
和
�� sr Main$Bar+a!N��b L at Ljava/lang/String;L bq ~ xpt abcdeft xyz
关于performance - Scala 编译器如何处理未使用的变量值?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/32392498/