apache-spark - Apache Spark 广播变量未重用

标签 apache-spark

使用广播变量时​​,我面临一个奇怪的行为。每次我使用广播变量时​​,内容都会为每个节点复制一次,并且永远不会重用。

这是 spark-shell --master local[32] 中的示例:
(当然,这是无用且愚蠢的代码,但它确实显示了行为)

case class Test(a:String)
val test = Test("123")
val bc = sc.broadcast(test)

// On my 32 core machine, I get 33 copies of Test (expected)
// Yourkit profiler shows 33 instances of my object (32 are unreachable)
sc.parallelize((1 to 100)).map(x => bc.value.a).count

// Doing it again, Test copies are not reused and serialized again (now 65 copies, 64 are unreachable)
sc.parallelize((1 to 100)).map(x => bc.value.a).count

就我而言,我广播的变量有几百兆字节,由数百万个小对象(很少的哈希图和向量)组成。

每次我在使用它的 RDD 上运行操作时,都会浪费几 GB 的内存,垃圾收集器越来越成为瓶颈!

是设计为每次执行新的闭包重新广播变量还是它是一个错误,我应该重用我的副本?

为什么它们在使用后立即无法访问?

它特别适用于本地模式下的 Spark shell ?

注意:我使用的是 spark-1.3.1-hadoop2.6

更新 1:
根据这篇文章:http://apache-spark-user-list.1001560.n3.nabble.com/How-to-share-a-NonSerializable-variable-among-tasks-in-the-same-worker-node-td11048.html
单例对象不再适用于 Spark 1.2.x+
所以这种解决方法也行不通:
val bcModel = sc.broadcast(bigModel)
object ModelCache {
  @transient lazy private val localModel = { bcModel.value }
  def getModel = localModel
}
sc.parallelize((1 to 100)).map(x => ModelCache.getModel.someValue)

更新 2:
我也尝试重用累加器模式但没有成功:
class VoidAccumulatorParams extends AccumulatorParam[BigModel] {
    override def addInPlace(r1: BigModel, r2: BigModel): BigModel= { r1 }
    override def zero(initialValue: BigModel): BigModel= { initialValue }
}
val acc = sc.accumulator(bigModel, "bigModel")(new VoidAccumulableParams())
sc.parallelize((1 to 100)).map(x => acc.localValue.someValue)

更新 3:
看起来单例对象在使用 spark-submit 而不是 scala shell 运行作业时工作。

最佳答案

看看广播测试 ( BroadcastSuite.scala ) 看看会发生什么。

当您运行第一个作业时,您的对象将被序列化、切割成块并将块发送到执行程序(通过 BlockManager 机制)。他们从块中反序列化对象并将其用于处理任务。当它们完成时,它们会丢弃反序列化的对象,但 BlockManager 会缓存序列化数据的块。

对于第二个作业,不需要序列化和传输对象。它只是从缓存中反序列化并使用。

警告:一方面,这无助于避免过度的 GC。另一件事是,我试图通过使用可变状态( class Test(var a: String) extends Serializable )并在运行之间对其进行变异来验证上述理论。令我惊讶的是,第二次运行看到了突变状态!所以我要么完全错了,要么只是在 local 中错了。模式。我希望有人能告诉哪个。 (如果我明天记得的话,我会尝试自己进一步测试。)

关于apache-spark - Apache Spark 广播变量未重用,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/30244880/

相关文章:

apache-spark - Spark Cassandra 使用区分大小写的名称写入 UDT 失败

hadoop - Spark:增加任务/分区的数量

scala - 将列的元素除以按另一列元素分组的(同一列的)元素总和

apache-spark - 如何在 spark-shell/pyspark 中打印出 RDD 的片段?

scala - Apache Spark : RDD[Char] but should be RDD[String] as result of flatmap

apache-spark - 如何在独立集群模式下为每个工作人员分配更多的执行程序?

postgresql - 在 Bluemix Apache-Spark 服务上运行的 Spark 应用程序中连接到 postgresql 数据库

Hadoop - 当有 Spark 作业正在运行时,Sqoop 作业卡在已接受状态

java - 根据 DataStax Enterprise 的运行时类路径构建 Spark 应用程序

mysql - 将大数据从 MySQL 加载到 Spark 中