java - Spark 蓄能器

标签 java scala apache-spark

我是 Spark 中累加器的新手。我创建了一个累加器,它将数据框中所有列的总和和计数信息收集到一个 Map 中。 哪个没有按预期运行,所以我有一些疑问。

当我在本地模式下运行此类(粘贴在下面)时,我可以看到累加器得到更新但最终值仍然为空。出于调试目的,我在 add() 中添加了一个打印语句。

Q1) 为什么在添加累加器时最终的累加器没有更新?

作为引用,我研究了 CollectionsAccumulator,他们在其中使用了 Java Collections 中的 SynchronizedList。

Q2) 更新累加器是否需要同步/并发集合?

Q3) 哪个集合最适合这样的目的?

我附上了我的执行流程以及用于分析的 spark ui 快照。

谢谢。

执行:

输入数据帧 -

+-------+-------+
|Column1|Column2|
+-------+-------+
|1      |2      |
|3      |4      |
+-------+-------+

输出 -

添加 - map (列 1 -> map (总和 -> 1,计数 -> 1),列 2 -> map (总和 -> 2,计数 -> 1))

添加 - map (列 1 -> map (总和 -> 4,计数 -> 2),列 2 -> map (总和 -> 6,计数 -> 2))

TestRowAccumulator(id: 1, name: Some(Test Accumulator for Sum&Count), value: Map())

SPARK UI 快照 -

SPARK UI -

类:

class TestRowAccumulator extends AccumulatorV2[Row,Map[String,Map[String,Int]]]{

  private var colMetrics: Map[String, Map[String, Int]] = Map[String , Map[String , Int]]()


  override def isZero: Boolean = this.colMetrics.isEmpty

  override def copy(): AccumulatorV2[Row, Map[String,Map[String,Int]]] = {
    val racc = new TestRowAccumulator
    racc.colMetrics = colMetrics
    racc
  }

  override def reset(): Unit = {
    colMetrics = Map[String,Map[String,Int]]()
  }

  override def add(v: Row): Unit = {

    v.schema.foreach(field => {
      val name: String = field.name
      val value: Int = v.getAs[Int](name)
      if(!colMetrics.contains(name))
        {
          colMetrics = colMetrics ++ Map(name -> Map("sum" -> value , "count" -> 1 ))
        }else
        {
          val metric = colMetrics(name)
          val sum = metric("sum") + value
          val count = metric("count") + 1

          colMetrics = colMetrics ++ Map(name -> Map("sum" -> sum , "count" -> count))
        }
    })
  }

  override def merge(other: AccumulatorV2[Row, Map[String,Map[String,Int]]]): Unit = {
    other match {
      case t:TestRowAccumulator => {
        colMetrics.map(col => {
          val map2: Map[String, Int] = t.colMetrics.getOrElse(col._1 , Map())
          val map1: Map[String, Int] = col._2
          map1 ++ map2.map{ case (k,v) => k -> (v + map1.getOrElse(k,0)) }
        } )
      }
      case _ => throw new UnsupportedOperationException(s"Cannot merge ${this.getClass.getName} with ${other.getClass.getName}")
    }
  }

  override def value: Map[String, Map[String, Int]] = {
    colMetrics
  }
}

最佳答案

经过一些调试,我发现正在调用合并函数。 它有错误的代码,所以可累积的值为 Map()

累加器的执行流程(本地模式): 添加 添加 合并

一旦我更正了合并功能,累加器就会按预期工作

关于java - Spark 蓄能器,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/54652786/

相关文章:

java - 在 RxJava/RxAndroid 中为 Activity 生命周期处理订阅的正确方法是什么?

java - 关于有符号字节的二进制转换为十六进制的问题

scala - 为什么有些集合类除了类方法之外还有一个 "empty"实例方法?

scala - 使用 pregel graphx 激发一对一最短路径

java - 如何在 Tomcat 8 中使用 Bean Validation 2.0 (JSR 380) 功能?

java - 从 Web 访问文件系统的 Java Applet 替代方案

Scala: View 的应用

scala - 测试 sbt 没有正确读取 java.library.path

apache-spark - FileNotFoundException - delta_log 中缺少 checkpoint.parquet

api - 如何在 Spark 中检查 HDFS 目录是否为空