java - 当索引位于自定义类上时,Spark join() 如何工作?

标签 java join apache-spark equals rdd

我在使用join方法时遇到问题在斯帕克.我将报告一个简短的代码示例以更好地解释问题。

我有一个自定义类,它充当 Calendar 类的包装器。

public class CalendarWrapper implements Serializable {

private static final long serialVersionUID = 5089608400685021565L;
private Calendar c;

public Calendar getC() {
    return c;
}

public CalendarWrapper(Calendar c) {
    super();
    this.c = c;
}

@Override 
public boolean equals(Object otherC) {

    if(otherC == null) {
        return false;
    }

    if(this == otherC) {
        return true;
    }

    if(this.c.equals(((CalendarWrapper)otherC).getC())) {
        return true;
    }
    else return false;
}
}

我使用此类来初始化为 JavaPairRDD<CalendarWrapper,V> ,然后我执行 join两者之中。以下代码显示了我的程序,其中我在每个 JavaPairRDD 中添加一条记录,其中两个 CalendarWrapper 的日期设置为相同。

public class JoinTest {

public static void main(String[] args) {
    System.out.println("Initializing Spark Context");
    SparkConf conf = new SparkConf().setAppName("Join test")
            .setMaster("local[1]").set("spark.executor.memory", "1g");
    JavaSparkContext sc = new JavaSparkContext(conf);

    int year = 2016;
    int month = 1;
    int day = 25;
    int hour = 11;
    int minute = 5;
    int second = 0;

    ArrayList<CalendarWrapper> cal1 = new ArrayList<>();

    Calendar c = Calendar.getInstance();
    c.clear();
    c.set(Calendar.YEAR, year);
    c.set(Calendar.MONTH, month);
    c.set(Calendar.DAY_OF_MONTH, day);
    c.set(Calendar.HOUR, hour);
    c.set(Calendar.MINUTE, minute);
    c.set(Calendar.SECOND, second);
    cal1.add(new CalendarWrapper(c));

    ArrayList<CalendarWrapper> cal2 = new ArrayList<>();

    Calendar c2 = Calendar.getInstance();
    c2.clear();
    c2.set(Calendar.YEAR, year);
    c2.set(Calendar.MONTH, month);
    c2.set(Calendar.DAY_OF_MONTH, day);
    c2.set(Calendar.HOUR, hour);
    c2.set(Calendar.MINUTE, minute);
    c2.set(Calendar.SECOND, second);
    cal2.add(new CalendarWrapper(c));

    if(c2.equals(c)) {
        System.out.println("Time equals");
    }

    JavaRDD<CalendarWrapper> cal1RDD = sc.parallelize(cal1);
    JavaRDD<CalendarWrapper> cal2RDD = sc.parallelize(cal2);

    JavaPairRDD<CalendarWrapper, String> cal1PairRDD = cal1RDD
            .mapToPair(x -> new Tuple2<CalendarWrapper, String>(x,
                    "test"));
    JavaPairRDD<CalendarWrapper, Boolean> cal2PairRDD = cal2RDD
            .mapToPair(x -> new Tuple2<CalendarWrapper, Boolean>(x, true));

    JavaPairRDD<CalendarWrapper, Tuple2<Boolean, String>> cal2cal1Join = cal2PairRDD
            .join(cal1PairRDD);
    System.out.println("Join size " + cal2cal1Join.count());
    sc.close();
}
}

现在,大约每十次运行就有九次 join 的结果是无效的,而在剩余的运行中它最终会得到预期的结果。我已经清除了Calendars为了不出现毫秒问题(我还进行了彻底的检查,以确保问题不是源自日历比较)。我还创建了JavaPairRDDStringBoolean为了检查这是否可能是问题(实际上不是 - 我在某处读到 join 是作为 union().groupBy().flatMap() 实现的,我猜测 union 是否会导致问题)。好处是,如果我使用 Calendar而不是CalendarWrapper ,一切总是工作正常。然后我尝试查看被覆盖的 equals 中发生了什么,显然系统会执行四次比较,每次将当前对象与 null 进行比较。

我尝试查看 Spark 代码,但有点困难。我搜索了有关join如何的信息工作,但没有任何运气。也许我正在做一些 super 愚蠢的事情,但现在似乎不起作用。

最佳答案

您遇到问题的原因可能是您在没有 hashCode 的情况下实现了自定义 equals。一般来说,如果您实现了其中一个,则应该始终实现另一个。

http://spark.apache.org/docs/latest/programming-guide.html#working-with-key-value-pairs

Note: when using custom objects as the key in key-value pair operations, you must be sure that a custom equals() method is accompanied with a matching hashCode() method. For full details, see the contract outlined in the Object.hashCode() documentation.

您希望哈希码具有确定性并基于日历中存储的时间。您可能可以直接调用 Calendar.hashCode()

关于java - 当索引位于自定义类上时,Spark join() 如何工作?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/35634167/

相关文章:

java - 如何从 HTML 中删除除 java 中特殊标记之外的所有内容?

mysql - 在另一个查询 MySQL 中使用来自一个查询的 SUM

java - 如何使用 JSF/MyFaces 创建基于用户角色的条件?

java - 是否可以在不使用 ETag 的情况下使用 aws-java-sdk-s3 进行分段上传?

sql - SQL合并中发生无意的交叉连接

php - 加入同一日期的总和

algorithm - 分布式互相关矩阵计算

apache-spark - 无法解析字段名称中的列名称

java - 在eclipse中编译spark项目时出现问题

java - 使用 ResponseBody 下载 spring 文件