我有一个用例,我想计算 RDD 中与某些过滤器匹配的元素类型。
例如RDD.filter(F1) 和 RDD.filter(!F1)
我有两个选择
- 使用累加器:例如
LongAccumulator l1 = sparkContext.longAccumulator("Count1") LongAccumulator l2 = sparkContext.longAccumulator("Count2") RDD.forEachPartition(f -> { if(F1) l1.add(1) else l2.add(1) });
- 使用次数
RDD.filter(F1).count(); RDD.filter(!F1).count()
第一种方法的一个好处是我们只需要迭代一次数据(很有用,因为我的数据集是 10 TB)
如果使用累加器可以达到相同的效果,那么计数有什么用?
最佳答案
主要区别在于,如果您的代码在转换中失败,则累加器将为 updated而 count() 结果不是。
其他选择是使用纯 map-reduce:
val counts = rdd.map(x => (F1(x), 1)).reduceByKey(_ + _).collectAsMap()
网络成本也应该很低,因为只会发送很少的号码。它创建成对的 (is F1(x) true/false, 1) 然后对所有的求和 - 它会给你 F1(x) 和 !F1(x) 在计数映射中的项目数
关于apache-spark - Spark 蓄能器与计数,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/41261074/