java - 为什么 groupByKey 与自定义类一起使用时不能给出正确的组?

标签 java apache-spark rdd

我为我的数据定义了一个自定义类 Person 并使用了 groupByKey 操作,如下所示:

public class Person implements Serializable {
    private static final long serialVersionUID = 1L;
    private int personId;
    private String name;
    private String address;
    public Person(int personId, String name, String address) {
        this.personId = personId;
        this.name = name;
        this.address = address;
    }
    public int getPersonId() {  return personId;}
    public void setPersonId(int personId) { this.personId = personId;}
    public String getName() {   return name;}
    public void setName(String name) {  this.name = name;}
    public String getAddress() {    return address;}
    public void setAddress(String address) {    this.address = address;}
}
List<Person> personList = new ArrayList<Person>();
personList.add(new Person(111, "abc", "test1"));
personList.add(new Person(222, "def", "test2"));
personList.add(new Person(333, "fhg", "test3"));
personList.add(new Person(111, "jkl", "test4"));
personList.add(new Person(555, "mno", "test5"));
personList.add(new Person(444, "pqr", "test6"));
personList.add(new Person(111, "xyz", "test7"));

JavaRDD<Person> initialRDD = sc.parallelize(personList, 4);

JavaPairRDD<Person, Iterable<Person>> groupedBy = 
    initialRDD.cartesian(initialRDD).groupByKey();

但是使用以下命令的结果不会根据键进行任何分组。

groupedBy.foreach(x -> System.out.println(x._1.getPersonId()));

结果为:222 111 555 444 555 111 222 111 333 222 444 111 111 111 44​​4 111 333 111 111 222 555 111 333 333 444 111 111 555

我期望结果只是唯一的键。我对Spark中的groupByKey函数的理解有误吗?

最佳答案

groupByKey 与其他 byKey 操作相同,依赖于 hashCodeequals 的有意义的实现。由于您没有提供自己的实现,Person 将使用默认的实现,这在这种情况下毫无用处。

尝试例如:

@Override
public int hashCode() {
    return this.personId;
}

@Override
public boolean equals(Object o) {
    return this.hashCode() == o.hashCode();
}

关于java - 为什么 groupByKey 与自定义类一起使用时不能给出正确的组?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/43986250/

相关文章:

java - 实现类列表中的接口(interface)列表

scala - 减少 Scala 程序中 Spark 的日志输出

scala - 使用 Spark Scala 为每个分区添加一个常量值

python - pyspark/python 3.6(类型错误 : 'int' object is not subscriptable) list/tuples

python - 如何测试 DataFrame 中的列是否存在且不为空

java - 为什么我们有 contains(Object o) 而不是 contains(E e)?

java - 用于实例化内部类的奇怪语法

java - HTTP FAILED : java.net.SocketTimeoutException:新 Android gradle 超时

scala - 如何将 RDD[Row] 转换回 DataFrame

docker - spark-submit如何在群集模式下传递--driver-class-path?