我在使用join
方法时遇到问题在斯帕克.我将报告一个简短的代码示例以更好地解释问题。
我有一个自定义类,它充当 Calendar 类的包装器。
public class CalendarWrapper implements Serializable {
private static final long serialVersionUID = 5089608400685021565L;
private Calendar c;
public Calendar getC() {
return c;
}
public CalendarWrapper(Calendar c) {
super();
this.c = c;
}
@Override
public boolean equals(Object otherC) {
if(otherC == null) {
return false;
}
if(this == otherC) {
return true;
}
if(this.c.equals(((CalendarWrapper)otherC).getC())) {
return true;
}
else return false;
}
}
我使用此类来初始化为 JavaPairRDD<CalendarWrapper,V>
,然后我执行 join
两者之中。以下代码显示了我的程序,其中我在每个 JavaPairRDD 中添加一条记录,其中两个 CalendarWrapper
的日期设置为相同。
public class JoinTest {
public static void main(String[] args) {
System.out.println("Initializing Spark Context");
SparkConf conf = new SparkConf().setAppName("Join test")
.setMaster("local[1]").set("spark.executor.memory", "1g");
JavaSparkContext sc = new JavaSparkContext(conf);
int year = 2016;
int month = 1;
int day = 25;
int hour = 11;
int minute = 5;
int second = 0;
ArrayList<CalendarWrapper> cal1 = new ArrayList<>();
Calendar c = Calendar.getInstance();
c.clear();
c.set(Calendar.YEAR, year);
c.set(Calendar.MONTH, month);
c.set(Calendar.DAY_OF_MONTH, day);
c.set(Calendar.HOUR, hour);
c.set(Calendar.MINUTE, minute);
c.set(Calendar.SECOND, second);
cal1.add(new CalendarWrapper(c));
ArrayList<CalendarWrapper> cal2 = new ArrayList<>();
Calendar c2 = Calendar.getInstance();
c2.clear();
c2.set(Calendar.YEAR, year);
c2.set(Calendar.MONTH, month);
c2.set(Calendar.DAY_OF_MONTH, day);
c2.set(Calendar.HOUR, hour);
c2.set(Calendar.MINUTE, minute);
c2.set(Calendar.SECOND, second);
cal2.add(new CalendarWrapper(c));
if(c2.equals(c)) {
System.out.println("Time equals");
}
JavaRDD<CalendarWrapper> cal1RDD = sc.parallelize(cal1);
JavaRDD<CalendarWrapper> cal2RDD = sc.parallelize(cal2);
JavaPairRDD<CalendarWrapper, String> cal1PairRDD = cal1RDD
.mapToPair(x -> new Tuple2<CalendarWrapper, String>(x,
"test"));
JavaPairRDD<CalendarWrapper, Boolean> cal2PairRDD = cal2RDD
.mapToPair(x -> new Tuple2<CalendarWrapper, Boolean>(x, true));
JavaPairRDD<CalendarWrapper, Tuple2<Boolean, String>> cal2cal1Join = cal2PairRDD
.join(cal1PairRDD);
System.out.println("Join size " + cal2cal1Join.count());
sc.close();
}
}
现在,大约每十次运行就有九次 join
的结果是无效的,而在剩余的运行中它最终会得到预期的结果。我已经清除了Calendars
为了不出现毫秒问题(我还进行了彻底的检查,以确保问题不是源自日历比较)。我还创建了JavaPairRDD
与 String
和Boolean
为了检查这是否可能是问题(实际上不是 - 我在某处读到 join
是作为 union().groupBy().flatMap()
实现的,我猜测 union
是否会导致问题)。好处是,如果我使用 Calendar
而不是CalendarWrapper
,一切总是工作正常。然后我尝试查看被覆盖的 equals
中发生了什么,显然系统会执行四次比较,每次将当前对象与 null
进行比较。
我尝试查看 Spark 代码,但有点困难。我搜索了有关join
如何的信息工作,但没有任何运气。也许我正在做一些 super 愚蠢的事情,但现在似乎不起作用。
最佳答案
您遇到问题的原因可能是您在没有 hashCode
的情况下实现了自定义 equals
。一般来说,如果您实现了其中一个,则应该始终实现另一个。
http://spark.apache.org/docs/latest/programming-guide.html#working-with-key-value-pairs
Note: when using custom objects as the key in key-value pair operations, you must be sure that a custom equals() method is accompanied with a matching hashCode() method. For full details, see the contract outlined in the Object.hashCode() documentation.
您希望哈希码具有确定性并基于日历
中存储的时间。您可能可以直接调用 Calendar.hashCode()
。
关于java - 当索引位于自定义类上时,Spark join() 如何工作?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/35634167/