apache-spark - Spark 蓄能器与计数

标签 apache-spark

我有一个用例,我想计算 RDD 中与某些过滤器匹配的元素类型。

例如RDD.filter(F1) 和 RDD.filter(!F1)

我有两个选择

  1. 使用累加器:例如
LongAccumulator l1 = sparkContext.longAccumulator("Count1")
LongAccumulator l2 = sparkContext.longAccumulator("Count2")
RDD.forEachPartition(f -> {
    if(F1) l1.add(1)
    else l2.add(1)
});
  1. 使用次数

RDD.filter(F1).count(); RDD.filter(!F1).count()

第一种方法的一个好处是我们只需要迭代一次数据(很有用,因为我的数据集是 10 TB)

如果使用累加器可以达到相同的效果,那么计数有什么用?

最佳答案

主要区别在于,如果您的代码在转换中失败,则累加器将为 updated而 count() 结果不是。

其他选择是使用纯 map-reduce:

val counts = rdd.map(x => (F1(x), 1)).reduceByKey(_ + _).collectAsMap()

网络成本也应该很低,因为只会发送很少的号码。它创建成对的 (is F1(x) true/false, 1) 然后对所有的求和 - 它会给你 F1(x) 和 !F1(x) 在计数映射中的项目数

关于apache-spark - Spark 蓄能器与计数,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/41261074/

相关文章:

apache-spark - ALS模型增量训练

apache-spark - Spark应用程序杀死执行程序

bash - Hadoop Yarn 上的 Spark 安装

apache-spark - 在EMR从站上运行命令?

java - 在 Spark Streaming 中使用 count() 作为整数

apache-spark - Spark Streaming 应用程序的核心用法

java - MapReduce到Spark

scala - Apache Spark 中的大型 RDD [MatrixEntry] 超出了 GC 开销限制

scala - 在 spark 中通过验证转换 Dataframe 列

apache-spark - Spark 流 : select record with max timestamp for each id in dataframe (pyspark)