java - 如何更新worker节点上的SPARK广播值?

标签 java apache-spark broadcast

我有一个从数据库获得的广播值。我在driver master上定义了广播

val stampsBroadcast = ssc.sparkContext.broadcast(stampListMap)

此值 (stampsBroadcast.value) 用于工作节点执行器。一旦执行者完成任务(将新 key 添加到数据库)。我需要更新广播值以添加这个新 key 。

我尝试使用:

stampsBroadcast.unpersist(false)
ssc.sparkContext.broadcast(NewstampsBroadcastValue)

但似乎我无法在工作节点上使用ssc。如果我在driver master上重新广播,如何从worker节点获取新数据?

最佳答案

您无法从工作节点创建广播变量。

就您而言,基本上您需要 Accumulators 。在驱动程序上定义累加器。在工作节点上,您可以更新累加器值。您可以再次获取驱动程序上的更新值。

注意:您无法检索工作节点上累加器的值。工作节点只能更新值。

下面是 Spark 文档的示例:

// creating the accumulator on driver
scala> val accum = sc.longAccumulator("My Accumulator")
accum: org.apache.spark.util.LongAccumulator = LongAccumulator(id: 0, name: Some(My Accumulator), value: 0)

// updating the accumulator on worker nodes
scala> sc.parallelize(Array(1, 2, 3, 4)).foreach(x => accum.add(x))
...
10/09/29 18:41:08 INFO SparkContext: Tasks finished in 0.317106 s

// fetaching the value
scala> accum.value
res2: Long = 10

关于java - 如何更新worker节点上的SPARK广播值?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/48393506/

相关文章:

hadoop - 不同的Spark输入和输出格式

javascript - chrome.socket 如何用于广播或多播?

android - 如何将 Activity 带到前台(如果不存在则创建)?

scala - 使用一对 (K,Collection[V]) 时 Spark : RDD. saveAsTextFile

apache-spark - Spark查找日期分区列的最大值

java - Liferay 导航 Hook

java - 单击按钮时在 Java 中设置标签

c# - 查找或构建进程间广播通信 channel

java - 如何在一个pom文件中生成两个xmlbeans

java - 你如何将本地库链接到 IntelliJ 中的 jar?