我试图合并多个 JavaRDD,但我只合并了 2 个,有人可以帮忙吗?我已经为此苦苦挣扎了一段时间,但总的来说,我希望能够获得多个集合并使用 sqlContext 创建一个组并打印出所有结果。
这里是我的代码
JavaRDD<AppLog> logs = mapCollection(sc, "mongodb://hadoopUser:Pocup1ne9@localhost:27017/hbdata.ppa_logs").union(
mapCollection(sc, "mongodb://hadoopUser:Pocup1ne9@localhost:27017/hbdata.fav_logs").union(
mapCollection(sc, "mongodb://hadoopUser:Pocup1ne9@localhost:27017/hbdata.pps_logs").union(
mapCollection(sc, "mongodb://hadoopUser:Pocup1ne9@localhost:27017/hbdata.dd_logs").union(
mapCollection(sc, "mongodb://hadoopUser:Pocup1ne9@localhost:27017/hbdata.ppt_logs")
)
)
)
);
public JavaRDD<AppLog> mapCollection(JavaSparkContext sc ,String uri){
Configuration mongodbConfig = new Configuration();
mongodbConfig.set("mongo.job.input.format", "com.mongodb.hadoop.MongoInputFormat");
mongodbConfig.set("mongo.input.uri", uri);
JavaPairRDD<Object, BSONObject> documents = sc.newAPIHadoopRDD(
mongodbConfig, // Configuration
MongoInputFormat.class, // InputFormat: read from a live cluster.
Object.class, // Key class
BSONObject.class // Value class
);
return documents.map(
new Function<Tuple2<Object, BSONObject>, AppLog>() {
public AppLog call(final Tuple2<Object, BSONObject> tuple) {
AppLog log = new AppLog();
BSONObject header =
(BSONObject) tuple._2();
log.setTarget((String) header.get("target"));
log.setAction((String) header.get("action"));
return log;
}
}
);
}
//打印集合 SQLContext sqlContext = new org.apache.spark.sql.SQLContext(sc);
DataFrame logsSchema = sqlContext.createDataFrame(logs, AppLog.class);
logsSchema.registerTempTable("logs");
DataFrame groupedMessages = sqlContext.sql(
"select * from logs");
// "select target, action, Count(*) from logs group by target, action");
// "SELECT to, body FROM messages WHERE to = \"eric.bass@enron.com\"");
groupedMessages.show();
logsSchema.printSchema();
最佳答案
如果你想合并多个 JavaRDDs
,只需使用 sc.union(rdd1,rdd2,..)
代替 rdd1.union(rdd2).union (rdd3)
。
关于java - 合并多个 JavaRDD,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/40024690/