java - Apache Spark 使用 Java 加入示例

标签 java join apache-spark option-type

我是 Apache Spark 的新手。我实际上想专注于基本的 Spark API 规范,并想了解和使用 Spark API 编写一些程序。 我已经使用 Apache Spark 编写了一个 java 程序来实现连接概念。

当我使用 Left Outer Join -- leftOuterJoin() 或 Right Outer Join -- rightOuterJoin() 时,这两种方法都返回一个包含特殊类型 Google Options 的 JavaPairRDD。但我不知道如何从 Optional 类型中提取原始值。

无论如何,我想知道我能否使用以我自己的格式返回数据的相同连接方法。我没有找到任何方法来做到这一点。意思是当我使用 Apache Spark 时,我无法以我自己的风格自定义代码,因为它们已经提供了所有预定义的东西。

请在下面找到代码

my 2 sample input datasets

customers_data.txt:
4000001,Kristina,Chung,55,Pilot
4000002,Paige,Chen,74,Teacher
4000003,Sherri,Melton,34,Firefighter

and

trasaction_data.txt
00000551,12-30-2011,4000001,092.88,Games,Dice & Dice Sets,Buffalo,New York,credit
00004811,11-10-2011,4000001,180.35,Outdoor Play Equipment,Water Tables,Brownsville,Texas,credit
00034388,09-11-2011,4000002,020.55,Team Sports,Beach Volleyball,Orange,California,cash
00008996,11-21-2011,4000003,121.04,Outdoor Recreation,Fishing,Colorado Springs,Colorado,credit
00009167,05-24-2011,4000003,194.94,Exercise & Fitness,Foam Rollers,El Paso,Texas,credit

这是我的 Java 代码

**SparkJoins.java:**

public class SparkJoins {

    @SuppressWarnings("serial")
    public static void main(String[] args) throws FileNotFoundException {
        JavaSparkContext sc = new JavaSparkContext(new SparkConf().setAppName("Spark Count").setMaster("local"));
        JavaRDD<String> customerInputFile = sc.textFile("C:/path/customers_data.txt");
        JavaPairRDD<String, String> customerPairs = customerInputFile.mapToPair(new PairFunction<String, String, String>() {
            public Tuple2<String, String> call(String s) {
                String[] customerSplit = s.split(",");
                return new Tuple2<String, String>(customerSplit[0], customerSplit[1]);
            }
        }).distinct();

        JavaRDD<String> transactionInputFile = sc.textFile("C:/path/transactions_data.txt");
        JavaPairRDD<String, String> transactionPairs = transactionInputFile.mapToPair(new PairFunction<String, String, String>() {
            public Tuple2<String, String> call(String s) {
                String[] transactionSplit = s.split(",");
                return new Tuple2<String, String>(transactionSplit[2], transactionSplit[3]+","+transactionSplit[1]);
            }
        });

        //Default Join operation (Inner join)
        JavaPairRDD<String, Tuple2<String, String>> joinsOutput = customerPairs.join(transactionPairs);
        System.out.println("Joins function Output: "+joinsOutput.collect());

        //Left Outer join operation
        JavaPairRDD<String, Iterable<Tuple2<String, Optional<String>>>> leftJoinOutput = customerPairs.leftOuterJoin(transactionPairs).groupByKey().sortByKey();
        System.out.println("LeftOuterJoins function Output: "+leftJoinOutput.collect());

        //Right Outer join operation
        JavaPairRDD<String, Iterable<Tuple2<Optional<String>, String>>> rightJoinOutput = customerPairs.rightOuterJoin(transactionPairs).groupByKey().sortByKey();
        System.out.println("RightOuterJoins function Output: "+rightJoinOutput.collect());

        sc.close();
    }
}

这是我得到的输出

Joins function Output: [(4000001,(Kristina,092.88,12-30-2011)), (4000001,(Kristina,180.35,11-10-2011)), (4000003,(Sherri,121.04,11-21-2011)), (4000003,(Sherri,194.94,05-24-2011)), (4000002,(Paige,020.55,09-11-2011))]

LeftOuterJoins function Output: [(4000001,[(Kristina,Optional.of(092.88,12-30-2011)), (Kristina,Optional.of(180.35,11-10-2011))]), (4000002,[(Paige,Optional.of(020.55,09-11-2011))]), (4000003,[(Sherri,Optional.of(121.04,11-21-2011)), (Sherri,Optional.of(194.94,05-24-2011))])]

RightOuterJoins function Output: [(4000001,[(Optional.of(Kristina),092.88,12-30-2011), (Optional.of(Kristina),180.35,11-10-2011)]), (4000002,[(Optional.of(Paige),020.55,09-11-2011)]), (4000003,[(Optional.of(Sherri),121.04,11-21-2011), (Optional.of(Sherri),194.94,05-24-2011)])]

我在Windows平台上运行这个程序

请观察上面的输出并帮助我从 Optional 类型中提取值

提前致谢

最佳答案

当您进行左外连接和右外连接时,您可能会得到空值。对!

因此 spark 返回 Optional 对象。获得该结果后,您可以将该结果映射到您自己的格式。

您可以使用 Optional 的 isPresent() 方法来映射您的数据。

例子如下:

 JavaPairRDD<String,String> firstRDD = ....
 JavaPairRDD<String,String> secondRDD =....
 // join both rdd using left outerjoin
 JavaPairRDD<String, Tuple2<String, Optional<Boolean>>> rddWithJoin = firstRDD.leftOuterJoin(secondRDD);


// mapping of join result
JavaPairRDD<String, String> mappedRDD = rddWithJoin
            .mapToPair(tuple -> {
                if (tuple._2()._2().isPresent()) {
                    //do your operation and return
                    return new Tuple2<String, String>(tuple._1(), tuple._2()._1());
                } else {
                    return new Tuple2<String, String>(tuple._1(), "not present");
                }
            });

关于java - Apache Spark 使用 Java 加入示例,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/28338694/

相关文章:

php - 如何用sum连接多个表

java - 在 Java Camera2 API 中使用 OpenCV

mysql - 为什么我的 LEFT JOIN 没有返回预期的结果集?

mysql - 连接和组合多个表 MYSQL

scala - 每个循环嵌套两个 DataFrame

scala - 如何计算数据框中每一列每个不同值的出现?

java - 是否有内置函数用于迭代处理 freemarker 字符串的结果?

java - 类 'javax.persistence.Convert' 必须位于所选库中

java - 提高矩阵/表聚合和搜索的性能

string - 将字符串表达式转换为实际工作实例表达式