这是一个简化的例子来表达我的担忧。此示例包含 3 个文件和 3 个对象,具体取决于 Spark 1.6.1。
//file globalObject.scala
import org.apache.spark.broadcast.Broadcast
object globalObject {
var br_value: Broadcast[Map[Int, Double]] = null
}
//file someFunc.scala
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
object someFunc {
def go(rdd: RDD[Int])(implicit sc: SparkContext): Array[Int] = {
rdd.map(i => {
val acc = globalObject.br_value.value
if(acc.contains(i)) {
i + 1
} else {
i
}
}).take(100)
}
}
//testMain.scala
import org.apache.spark.{SparkConf, SparkContext}
object testMain {
def bootStrap()(implicit sc:SparkContext): Unit = {
globalObject.br_value = sc.broadcast(Map(1->2, 2->3, 4->5))
}
def main(args: Array[String]): Unit = {
lazy val appName = getClass.getSimpleName.split("\\$").last
implicit val sc = new SparkContext(new SparkConf().setAppName(appName))
val datardd = sc.parallelize(Range(0, 200), 200)
.flatMap(i => Range(0, 1000))
bootStrap()
someFunc.go(datardd).foreach(println)
}
}
当我在集群上运行此代码时,出现以下错误:
ERROR executor.Executor: Exception in task 0.0 in stage 0.0 (TID 0)
java.lang.NullPointerException
at someFunc$$anonfun$go$1.apply$mcII$sp(someFunc.scala:7)
at someFunc$$anonfun$go$1.apply(someFunc.scala:6)
at someFunc$$anonfun$go$1.apply(someFunc.scala:6)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
at scala.collection.Iterator$$anon$10.next(Iterator.scala:312)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
显然,数据没有成功广播。这几天重构代码的时候遇到了这个问题。我希望不同的 scala 对象共享相同的广播变量。但现在它就在这里。现在很困惑,据我了解驱动程序使用指针来指示广播变量。调用广播变量不应限制在同一代码范围内。
如果我错了,请纠正我。在 scala 对象之间共享广播 var 的正确方法是什么?提前致谢。
最佳答案
map
中的代码被序列化并在每个节点上执行。 val acc = globalObject.br_value.value
使用节点的 globalObject.br_value
。但当然这仍然是null
;您只需将其分配给驱动程序。您可以通过将广播变量从 lambda 中拉出来使代码关闭:
val br_value = globalObject.br_value
rdd.map(i => {
val acc = br_value.value
if(acc.contains(i)) {
i + 1
} else {
i
}
}).take(100)
关于scala - Apache Spark : Why I can't use broadcast var defined in a global object,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/38928137/