java - 合并多个 JavaRDD

标签 java apache-spark

我试图合并多个 JavaRDD,但我只合并了 2 个,有人可以帮忙吗?我已经为此苦苦挣扎了一段时间,但总的来说,我希望能够获得多个集合并使用 sqlContext 创建一个组并打印出所有结果。

这里是我的代码

  JavaRDD<AppLog> logs =  mapCollection(sc, "mongodb://hadoopUser:Pocup1ne9@localhost:27017/hbdata.ppa_logs").union(
                              mapCollection(sc, "mongodb://hadoopUser:Pocup1ne9@localhost:27017/hbdata.fav_logs").union(
                                mapCollection(sc, "mongodb://hadoopUser:Pocup1ne9@localhost:27017/hbdata.pps_logs").union(
                                  mapCollection(sc, "mongodb://hadoopUser:Pocup1ne9@localhost:27017/hbdata.dd_logs").union(
                                    mapCollection(sc, "mongodb://hadoopUser:Pocup1ne9@localhost:27017/hbdata.ppt_logs")
                                  )
                                )
                              )
                          );


public JavaRDD<AppLog> mapCollection(JavaSparkContext sc ,String uri){

  Configuration mongodbConfig = new Configuration();
  mongodbConfig.set("mongo.job.input.format", "com.mongodb.hadoop.MongoInputFormat");
  mongodbConfig.set("mongo.input.uri", uri);

  JavaPairRDD<Object, BSONObject> documents = sc.newAPIHadoopRDD(
      mongodbConfig,            // Configuration
      MongoInputFormat.class,   // InputFormat: read from a live cluster.
      Object.class,             // Key class
      BSONObject.class          // Value class
    );

    return documents.map(

      new Function<Tuple2<Object, BSONObject>, AppLog>() {

          public AppLog call(final Tuple2<Object, BSONObject> tuple) {
              AppLog log = new AppLog();
              BSONObject header =
                (BSONObject) tuple._2();

              log.setTarget((String) header.get("target"));
              log.setAction((String) header.get("action"));

              return log;
          }
      }
    );
}

//打印集合 SQLContext sqlContext = new org.apache.spark.sql.SQLContext(sc);

    DataFrame logsSchema = sqlContext.createDataFrame(logs, AppLog.class);
    logsSchema.registerTempTable("logs");

    DataFrame groupedMessages = sqlContext.sql(
      "select * from logs");
      // "select target, action, Count(*) from logs group by target, action");

      // "SELECT to, body FROM messages WHERE to = \"eric.bass@enron.com\"");



    groupedMessages.show();

    logsSchema.printSchema();

最佳答案

如果你想合并多个 JavaRDDs ,只需使用 sc.union(rdd1,rdd2,..) 代替 rdd1.union(rdd2).union (rdd3)

同时检查这个 RDD.union vs SparkContex.union

关于java - 合并多个 JavaRDD,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/40024690/

相关文章:

java - 如何使 SimpleDateFormat 与夏令时一起工作?

java - Android 上下文问题 - 广播接收器(警报管理器)中应使用什么上下文?

mongodb - Spark - 如何在map()中创建新的RDD? (对于 Executor,SparkContext 为 null)

python - TCP 上的 Spark 流式传输

scala - 将重试添加到 future 序列中,以便在 Scala 中并行运行 Databricks 笔记本

java - 安全:090759 in weblogic managed logs

java - 如何使用 Java + Selenium WebDriver 导出/导入 cookie

java - 如何在 Avro 中追踪 "<init>()V"故障的根源?

python - 如何使用sqlContext计算累计和

apache-spark - 不同端口上的 Spark Thrift 服务器