apache-spark - 星火广播变量

标签 apache-spark

我试过下面的代码

val t1 = sc.parallelize(0 until 10)
val t2 = sc.broadcast(2)
val t3 = t1.filter(_ % t2.value == 0).persist()
t3.count()
t2.destroy()
t3.count()

它在第二个 t3.count() 中提示“在它被销毁后尝试使用 Broadcast”,这让我很困惑。如果我理解正确,我们在 t3 上调用 persist,因此在第一个 t3.count()t3 之后存储在内存中。如果是这样,t3 不需要在第二个 t3.count() 中重新计算,销毁 t2 应该是安全的。但似乎事实并非如此。我想知道这里发生了什么。

最佳答案

问题:它提示在第二个 t3.count() 中“试图在它被销毁后使用广播”,这让我感到困惑。如果我理解正确,我们在 t3 上调用 persist,因此在第一个 t3.count() 之后,t3 存储在内存中。如果是这样,则不需要在第二个 t3.count() 中重新计算 t3,销毁 t2 应该是安全的。但事实似乎并非如此。


  • 使用带有 spark 2.4.0 的 spark-shell,我也遇到同样的错误。

*但令人惊讶的是,intellij local maven scala project (with Spark 2.4.5 and Spark 2.2.2 ) with use Case of cache/persist 我不明白 异常(exception)。 spark 中可能存在问题,也可能是其他原因。*

案例 1:不使用缓存/persist 调用 destroy

  val t1 = sc.parallelize(0 until 10)
  val t2 = sc.broadcast(2)
  val t3 = t1.filter(_ % t2.value == 0)
  println(t3.count())
  t2.destroy()
  println(t3.count())

因为它不是cachepersisted 你会得到下面的结果 结果:

org.apache.spark.SparkException: Attempted to use Broadcast(0) after it was destroyed (destroy at BroadCastCheck.scala:20) 
    at org.apache.spark.broadcast.Broadcast.assertValid(Broadcast.scala:144)

案例 2:使用缓存/persist 调用销毁
使用 cache/persist 的用例:不会重新计算数据帧 t3。因此 destroy

后没有错误
 val t1 = sc.parallelize(0 until 10)
  val t2 = sc.broadcast(2)
  val t3 = t1.filter(_ % t2.value == 0).cache // or persist as well  
  println(t3.count())
  t2.destroy()
  println(t3.count())

结果:

5

5

关于apache-spark - 星火广播变量,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/61191907/

相关文章:

python - 如何将 pyspark 中的一列映射到多列?

Scala - 使用 "endsWith"过滤数据帧

scala - Spark 结构化流处理以前的文件

dataframe - 从 pyspark 中的列表列表中提取列

scala - Spark/Scala 并行写入 redis

r - Sparklyr 的 spark_apply 函数似乎在单个执行程序上运行并且在中等大型数据集上失败

apache-spark - Spark SQL 替换 MySQL 的 GROUP_CONCAT 聚合函数

scala - spark-submit with scala package++ operator 返回 java.lang.NoSuchMethodError : scala. Predef$.refArrayOps

java - 启动 java Spark-streaming 应用程序时出现异常

apache-spark - Spark sql中如何使用outer apply