scala - Apache Spark : Why I can't use broadcast var defined in a global object

标签 scala apache-spark

这是一个简化的例子来表达我的担忧。此示例包含 3 个文件和 3 个对象,具体取决于 Spark 1.6.1。

//file globalObject.scala 
import org.apache.spark.broadcast.Broadcast 

object globalObject { 
  var br_value: Broadcast[Map[Int, Double]] = null 
} 


//file someFunc.scala 
import org.apache.spark.SparkContext 
import org.apache.spark.rdd.RDD 

object someFunc { 
  def go(rdd: RDD[Int])(implicit sc: SparkContext): Array[Int] = { 
    rdd.map(i => { 
      val acc = globalObject.br_value.value 
      if(acc.contains(i)) { 
        i + 1 
      } else { 
        i 
      } 
    }).take(100) 
 } 
} 

//testMain.scala 
import org.apache.spark.{SparkConf, SparkContext} 

object testMain { 
  def bootStrap()(implicit sc:SparkContext): Unit = { 
    globalObject.br_value = sc.broadcast(Map(1->2, 2->3, 4->5)) 
  } 

  def main(args: Array[String]): Unit = { 
    lazy val appName = getClass.getSimpleName.split("\\$").last 
    implicit val sc = new SparkContext(new SparkConf().setAppName(appName)) 
    val datardd = sc.parallelize(Range(0, 200), 200) 
      .flatMap(i => Range(0, 1000)) 

    bootStrap() 
    someFunc.go(datardd).foreach(println) 

  } 
} 

当我在集群上运行此代码时,出现以下错误:

ERROR executor.Executor: Exception in task 0.0 in stage 0.0 (TID 0) 
java.lang.NullPointerException 
        at someFunc$$anonfun$go$1.apply$mcII$sp(someFunc.scala:7) 
        at someFunc$$anonfun$go$1.apply(someFunc.scala:6) 
        at someFunc$$anonfun$go$1.apply(someFunc.scala:6) 
        at scala.collection.Iterator$$anon$11.next(Iterator.scala:328) 
        at scala.collection.Iterator$$anon$10.next(Iterator.scala:312) 
        at scala.collection.Iterator$class.foreach(Iterator.scala:727) 
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1157) 
        at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48) 
        at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103) 

显然,数据没有成功广播。这几天重构代码的时候遇到了这个问题。我希望不同的 scala 对象共享相同的广播变量。但现在它就在这里。现在很困惑,据我了解驱动程序使用指针来指示广播变量。调用广播变量不应限制在同一代码范围内。

如果我错了,请纠正我。在 scala 对象之间共享广播 var 的正确方法是什么?提前致谢。

最佳答案

map中的代码被序列化并在每个节点上执行。 val acc = globalObject.br_value.value 使用节点的 globalObject.br_value。但当然这仍然是null;您只需将其分配给驱动程序。您可以通过将广播变量从 lambda 中拉出来使代码关闭:

val br_value = globalObject.br_value
rdd.map(i => { 
  val acc = br_value.value 
  if(acc.contains(i)) { 
    i + 1 
  } else { 
    i 
  } 
}).take(100) 

关于scala - Apache Spark : Why I can't use broadcast var defined in a global object,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/38928137/

相关文章:

具有默认方法与特征的 C# 接口(interface)

scala - 如何调整 Spark 应用程序以避免 OOM 异常

scala - 用Scala,Apache Spark编写的编译错误保存模型

scala - 使用链接提升传递数据

java - 将数据集中的映射类型映射到列

apache-spark - 类型错误 : Column is not iterable - How to iterate over ArrayType()?

python - PySpark 合并数据帧和计数值

scala - 使用 Scala 中的简单表达式链接日志记录

scala - 如何询问 Scala 是否存在所有类型参数实例化的证据?

java - Scala:函数有接口(interface)的返回类型,我需要实现类