java - Flink 中的 LeftOuterJoin(JAVA API)

标签 java mapreduce apache-flink bigdata

我正在尝试在 Flink 中执行 LeftOuterJoin。 我不会尝试自己实现 leftOuterJoin ,因为它已完成 与 CoGroupFunction 在这里:https://gist.github.com/mxm/c2e9c459a9d82c18d789

我正在尝试使用 FlatJoinFunction:

    public static final class leftOuter implements FlatJoinFunction<Tuple3<String,String,String>, Tuple2<String,String>, Tuple2<String,String>>{


    @Override
    public void join(Tuple3<String, String, String> in1,
            Tuple2<String, String> in2,
            Collector<Tuple2<String, String>> out) throws Exception {
        // TODO Auto-generated method stub
        out.collect(new Tuple2<String,String>(in1.f0, in2.f1 == null ? "null" : in2.f1));

    }

}

我将此函数称为:

        input1.leftOuterJoin(input2).where(0)
            .equalTo(1)
            .with(new leftOuter());

不幸的是,我在 out.collect 行中遇到 NullPointerException。

预先感谢您的帮助!

最佳答案

这是左外连接的预期行为。

根据您的程序,左外连接在两种情况下调用 JoinFunction:

  1. 如果两个输入,input1input2,都有具有相同连接键的记录,则为每个元素调用 join()该 key 的笛卡尔积。
  2. 如果左侧输入 input1 的记录的键不存在于右侧输入 (input2) 中,则 join() 会使用 input1 键为每条记录调用,并为正确的输入调用 null

您应该在 JoinFunction 中添加对 in2 == null 的检查。

关于java - Flink 中的 LeftOuterJoin(JAVA API),我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/40029076/

相关文章:

java - 使用 Java 客户端在 Apache YARN 上运行 MapReduce 应用程序

java - 为什么物体是看不见的?

用于计算参数值的 MapReduce

java - 需要通过Main.class运行suite.xml

hadoop - 如何在容器格式的 Hadoop 中使用 Snappy

scala - apache flink 的 union 类型混淆?

java - 如何使用服务器上的 -Dlog4j.configuration=file :/path/to/log4j. 属性运行我的 flink 作业

java - 使用 Apache Flink 进行动态模式评估

java - 所包含的声明是什么?

java - Akka Java 文件 IO 限制