apache-spark - 如何拦截驱动程序上累加器的部分更新?

标签 apache-spark java-8 accumulator

Spark 1.5.1 + Java 1.8

我们正在使用 spark 将大量记录上传到数据库。

操作代码如下所示:

rdd.foreachPartition(new VoidFunction<Iterator<T>>() {

     @Override
     public void call(Iterator<T> iter) {
          //while there are more records perform the following every 1000 records
          //int[] recoords = statement.executeBatch();
          //accumulator.add(recoords.length);
     }
     // ...
} 

在驱动程序节点上有一个线程监视累加器值。但是,该值不会更新。它只会在应用程序结束时更新一次。即使累加器使用了惰性值设置,它也应该正确更新,因为我在驱动程序节点线程中定期读取该值。

我是否错误地使用了蓄能器?无论如何,我是否可以更持续地监控员工的进度?

最佳答案

您可以监视累加器值,但不能连续执行,即在任务完成后进行更新。

尽管累加器被称为共享变量,但实际上并不是共享的。每个任务都有自己的累加器,在任务完成后合并。这意味着在任务运行时无法更新全局值。

为了能够看到更新,执行程序的数量必须少于已处理分区的数量(对应于任务的数量)。这样做的原因是在将累加器更新发送到驱动程序时引入“障碍”。

例如:

import org.apache.spark.{SparkConf, SparkContext}

object App {
  def main(args: Array[String]) {
    val conf = new SparkConf().setMaster("local[4]")
    val sc = new SparkContext(conf)

    val accum = sc.accumulator(0, "An Accumulator")
    val rdd = sc.parallelize(1 to 1000, 20)

    import scala.concurrent.duration._
    import scala.language.postfixOps
    import rx.lang.scala._

    val o = Observable.interval(1000 millis).take(1000)
    val s = o.subscribe(_ => println(accum.value))
    rdd.foreach(x => {
      Thread.sleep(x + 200)
      accum += 1
    })
    s.unsubscribe
    sc.stop
  }
}

如您所见,每个任务的全局值仅更新一次。

如果您按照提供的示例创建命名累加器,您也可以使用 Spark UI 监控它的状态。只需打开阶段选项卡,导航到特定阶段并检查累加器部分。

Is there anyway I can more continuously monitor progress from my workers?



最可靠的方法是通过添加更多分区来增加粒度,但这并不便宜。

关于apache-spark - 如何拦截驱动程序上累加器的部分更新?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/35012301/

相关文章:

apache-spark - Spark DataFrame 缓存大型临时表

list - 有没有办法用java 8流更改日期格式?

循环运行时出现 Java 逻辑错误 : If/else counter incorrectly counting

具有删除旧样本能力的 C++ 累加器库

apache-spark - 如何将spark sql数据框的摘要写入excel文件

csv - Spark-如何在没有文件夹的情况下编写单个csv文件?

apache-spark - 如何解释 Spark Stage UI 中的输入大小/记录

java - 如何获取List中多个最大值的索引

python - 使用内部类输入类型调用 AWS lambda 函数

c++ - 是否可以将 boost 累加器与 vector 一起使用?