java - Spark reduceByKey() 不会对最终总和进行洗牌

标签 java apache-spark

我有一个 RDD,其中包含某些对象的计数,然后我在其上应用 reduceByKey(),对所有元素求和(如字数统计示例中所示)。我已将 reduceByKey 转换的输出保存到文本文件中,并且获得了每个工作人员的总和:

(work at  LEFT null,9741)
(work at  LEFT null,10073)
(work at  LEFT null,10348)
(work at  LEFT null,10483)
(work at  LEFT null,10754)

难道不应该只是一项,而是所有项的总和吗?

如果需要更多详细信息,我会提供。

LE:我试图计算的对象是由定义的

public class Pattern {
     string pattern;
     PatternType type;
     Relation r;
}

最佳答案

在 Spark 中,PairRDDFunctions.reduceByKeyRDD[(K, V)]并使用定义的分区器对数据进行分区(导致随机播放)。如果没有提供这样的分区器,它将使用默认的 HashPartitioner决定将哪个键值对传递给哪个工作人员。如果您使用 Java 类作为 key ,但不会覆盖它的 hashCode方法,reduceByKey会根据Java的Object.hashCode来决定如何对数据进行分区。这意味着相同的 key 将被卸载给不同的工作人员,在那里它们将被部分减少。理想情况下,这不是您想要的。您想要的是具有相同 key 的所有对象都将通过同一个工作人员减少。然后,当每个工作人员减少自己的数据后对它们进行洗牌时,所有键的组合器将无法根据其哈希码匹配键,这解释了为什么您只看到部分减少的数据而不是总和单个键上的数据。

您需要做的是提供适当的hashCode equals执行。 Spark documentation 中对此进行了说明。 (感谢@VitaliyKotlyarenko):

Note: when using custom objects as the key in key-value pair operations, you must be sure that a custom equals() method is accompanied with a matching hashCode() method. For full details, see the contract outlined in the Object.hashCode() documentation

例如:

public class Pattern {
     string pattern;
     PatternType type;
     Relation r;

     @Override
     public int hashCode() {
        return 371 * pattern.hashCode();
     }

     @Override 
     public boolean equals(Object other) {
        if (this == other) return true;
        if (other == null || this.getClass() != other.getClass()) return false;

        Pattern pattern = (Pattern) other;
        return this.pattern.equals(pattern.pattern);
     }
}

关于java - Spark reduceByKey() 不会对最终总和进行洗牌,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/37228577/

相关文章:

scala - 对 RDD 的其余部分执行一个元素的操作

java - 从文件中读取字节后,大多数是正确的,除了 1 是错误的和负的

java - ./server_start.sh : line 41: kill: (21556) - No such process

apache-spark - DAG 调度程序与 Spark 的催化剂

java - 对字符串执行的最小循环(向前或向后)移位以使其成为回文

apache-spark - 在pyspark数据帧中的两个日期之间生成每月时间戳

apache-spark - 为什么reduceByKey后所有数据都在一个分区?

java - 具有自定义响应的 CXF 自定义验证拦截器不起作用

java - 使用相同的包名和 key 对 apk 进行签名

java - Osgi 中的 NoClassDefFoundError w/Felix, Ant