apache-spark - createCombiner,mergeValue,mergeCombiner如何在Spark的CombineByKey中工作(使用Scala)

标签 apache-spark

我试图了解combineByKeys中的每个步骤如何工作。

有人可以帮我了解下面的RDD吗?

val rdd = sc.parallelize(List(
  ("A", 3), ("A", 9), ("A", 12), ("A", 0), ("A", 5),("B", 4), 
  ("B", 10), ("B", 11), ("B", 20), ("B", 25),("C", 32), ("C", 91),
   ("C", 122), ("C", 3), ("C", 55)), 2)

rdd.combineByKey(
    (x:Int) => (x, 1),
    (acc:(Int, Int), x) => (acc._1 + x, acc._2 + 1),
    (acc1:(Int, Int), acc2:(Int, Int)) => (acc1._1 + acc2._1, acc1._2 + acc2._2))

最佳答案

首先,让我们分解一下过程:

首先,createCombiner为键在分区上的首次匹配创建初始值(合并器)(如果未找到)--> (firstValueEncountered, 1)。因此,这仅仅是用第一个值和键计数器为1初始化元组。

然后,仅当已经为该分区上的找到的键mergeValue -->创建了组合器(在我们的例子中为元组)时,才会触发(existingTuple._1 + subSequentValue, existingTuple._2 + 1)。这将现有元组的值(在第一个插槽中)与新遇到的值相加,并获取现有元组的计数器(在第二个插槽中)并对其进行递增。所以,我们是

然后,mergeCombiner接收在每个分区上创建的组合器(元组),并将它们合并在一起--> (tupleFromPartition._1 + tupleFromPartition2._1, tupleFromPartition1._2 + tupleFromPartition2._2)。这只是将来自每个元组的值加在一起,并将计数器加在一起成为一个元组。

然后,让我们将数据的子集分解为多个分区,然后观察它们的作用:

("A", 3), ("A", 9), ("A", 12),("B", 4), ("B", 10), ("B", 11)

分区1
A=3 --> createCombiner(3) ==> accum[A] = (3, 1)
A=9 --> mergeValue(accum[A], 9) ==> accum[A] = (3 + 9, 1 + 1)
B=11 --> createCombiner(11) ==> accum[B] = (11, 1)

分区2
A=12 --> createCombiner(12) ==> accum[A] = (12, 1)
B=4 --> createCombiner(4) ==> accum[B] = (4, 1)
B=10 --> mergeValue(accum[B], 10) ==> accum[B] = (4 + 10, 1 + 1)

合并分区
A ==> mergeCombiner((12, 2), (12, 1)) ==> (12 + 12, 2 + 1)
B ==> mergeCombiner((11, 1), (14, 2)) ==> (11 + 14, 1 + 2)

因此,您应该返回类似以下的数组:
Array((A, (24, 3)), (B, (25, 3)))

关于apache-spark - createCombiner,mergeValue,mergeCombiner如何在Spark的CombineByKey中工作(使用Scala),我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/29246756/

相关文章:

scala - 如何将 RDD 数据保存到 json 文件中,而不是文件夹中

python - spark 查找最大值和关联的键

scala - (run-main-0) java.lang.NoSuchMethodError

python - 将 docker 容器内的 spark-submit 发送到 YARN 集群

java - 如果存储在键上的值匹配,如何合并 Spark 中的两个 RDD

r - 将 Sparklyr 的 <dbl [2]> 结果拆分为 Spark 对象

java - Parquet .io.ParquetDecodingException : Can not read value at 0 in block -1 in file

java - 尝试通过 Java SDK 将记录从 Spark DataFrame 写入 Dynamodb 时任务不可序列化

amazon-web-services - 将 S3 数据文件夹转换为分桶和分区数据存储

java - 根据第一个数据框 Java 中的列创建具有新列值的新 Spark DataFrame