使用广播变量时,我面临一个奇怪的行为。每次我使用广播变量时,内容都会为每个节点复制一次,并且永远不会重用。
这是 spark-shell --master local[32] 中的示例:
(当然,这是无用且愚蠢的代码,但它确实显示了行为)
case class Test(a:String)
val test = Test("123")
val bc = sc.broadcast(test)
// On my 32 core machine, I get 33 copies of Test (expected)
// Yourkit profiler shows 33 instances of my object (32 are unreachable)
sc.parallelize((1 to 100)).map(x => bc.value.a).count
// Doing it again, Test copies are not reused and serialized again (now 65 copies, 64 are unreachable)
sc.parallelize((1 to 100)).map(x => bc.value.a).count
就我而言,我广播的变量有几百兆字节,由数百万个小对象(很少的哈希图和向量)组成。
每次我在使用它的 RDD 上运行操作时,都会浪费几 GB 的内存,垃圾收集器越来越成为瓶颈!
是设计为每次执行新的闭包重新广播变量还是它是一个错误,我应该重用我的副本?
为什么它们在使用后立即无法访问?
它特别适用于本地模式下的 Spark shell ?
注意:我使用的是 spark-1.3.1-hadoop2.6
更新 1:
根据这篇文章:http://apache-spark-user-list.1001560.n3.nabble.com/How-to-share-a-NonSerializable-variable-among-tasks-in-the-same-worker-node-td11048.html
单例对象不再适用于 Spark 1.2.x+
所以这种解决方法也行不通:
val bcModel = sc.broadcast(bigModel)
object ModelCache {
@transient lazy private val localModel = { bcModel.value }
def getModel = localModel
}
sc.parallelize((1 to 100)).map(x => ModelCache.getModel.someValue)
更新 2:
我也尝试重用累加器模式但没有成功:
class VoidAccumulatorParams extends AccumulatorParam[BigModel] {
override def addInPlace(r1: BigModel, r2: BigModel): BigModel= { r1 }
override def zero(initialValue: BigModel): BigModel= { initialValue }
}
val acc = sc.accumulator(bigModel, "bigModel")(new VoidAccumulableParams())
sc.parallelize((1 to 100)).map(x => acc.localValue.someValue)
更新 3:
看起来单例对象在使用 spark-submit 而不是 scala shell 运行作业时工作。
最佳答案
看看广播测试 ( BroadcastSuite.scala ) 看看会发生什么。
当您运行第一个作业时,您的对象将被序列化、切割成块并将块发送到执行程序(通过 BlockManager 机制)。他们从块中反序列化对象并将其用于处理任务。当它们完成时,它们会丢弃反序列化的对象,但 BlockManager 会缓存序列化数据的块。
对于第二个作业,不需要序列化和传输对象。它只是从缓存中反序列化并使用。
警告:一方面,这无助于避免过度的 GC。另一件事是,我试图通过使用可变状态( class Test(var a: String) extends Serializable
)并在运行之间对其进行变异来验证上述理论。令我惊讶的是,第二次运行看到了突变状态!所以我要么完全错了,要么只是在 local
中错了。模式。我希望有人能告诉哪个。 (如果我明天记得的话,我会尝试自己进一步测试。)
关于apache-spark - Apache Spark 广播变量未重用,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/30244880/