我正在尝试在 Flink 中执行 LeftOuterJoin。 我不会尝试自己实现 leftOuterJoin ,因为它已完成 与 CoGroupFunction 在这里:https://gist.github.com/mxm/c2e9c459a9d82c18d789
我正在尝试使用 FlatJoinFunction:
public static final class leftOuter implements FlatJoinFunction<Tuple3<String,String,String>, Tuple2<String,String>, Tuple2<String,String>>{
@Override
public void join(Tuple3<String, String, String> in1,
Tuple2<String, String> in2,
Collector<Tuple2<String, String>> out) throws Exception {
// TODO Auto-generated method stub
out.collect(new Tuple2<String,String>(in1.f0, in2.f1 == null ? "null" : in2.f1));
}
}
我将此函数称为:
input1.leftOuterJoin(input2).where(0)
.equalTo(1)
.with(new leftOuter());
不幸的是,我在 out.collect 行中遇到 NullPointerException。
预先感谢您的帮助!
最佳答案
这是左外连接的预期行为。
根据您的程序,左外连接在两种情况下调用 JoinFunction
:
- 如果两个输入,
input1
和input2
,都有具有相同连接键的记录,则为每个元素调用join()
该 key 的笛卡尔积。 - 如果左侧输入
input1
的记录的键不存在于右侧输入 (input2
) 中,则join()
会使用input1
键为每条记录调用,并为正确的输入调用null
。
您应该在 JoinFunction
中添加对 in2 == null
的检查。
关于java - Flink 中的 LeftOuterJoin(JAVA API),我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/40029076/