我的 spark 版本是 1.2.0,场景是这样的:
有两个RDD,分别是RDD_A和RDD_B,其数据结构都是RDD[(spid, the_same_spid)]。 RDD_A 有 20,000 行,而 RDD_B 有 3,000,000,000 行。我打算计算其“spid”存在于 RDD_A 中的 RDD_B 的行数。
我的第一个实现相当主流,在 RDD_A 上应用 RDD_B 的 join
方法:
val currentDay = args(0)
val conf = new SparkConf().setAppName("Spark-MonitorPlus-LogStatistic")
val sc = new SparkContext(conf)
//---RDD A transforming to RDD[(spid, spid)]---
val spidRdds = sc.textFile("/diablo/task/spid-date/" + currentDay + "-spid-media").map(line =>
line.split(",")(0).trim).map(spid => (spid, spid)).partitionBy(new HashPartitioner(32));
val logRdds: RDD[(LongWritable, Text)] = MzFileUtils.getFileRdds(sc, currentDay, "")
val logMapRdds = MzFileUtils.mapToMzlog(logRdds)
//---RDD B transforming to RDD[(spid, spid)]---
val tongYuanRdd = logMapRdds.filter(kvs => kvs("plt") == "0" && kvs("tp") == "imp").map(kvs => kvs("p").trim).map(spid => (spid, spid)).partitionBy(new HashPartitioner(32));
//---join---
val filteredTongYuanRdd = tongYuanRdd.join(spidRdds);
println("Total TongYuan Imp: " + filteredTongYuanRdd.count())
但是,与配置单元的结果相比,结果不正确(大于)。将 join
方法从 reduce-side join 更改为 map-side join 如下所示,结果与配置单元的结果相同:
val conf = new SparkConf().setAppName("Spark-MonitorPlus-LogStatistic")
val sc = new SparkContext(conf)
//---RDD A transforming to RDD[(spid, spid)]---
val spidRdds = sc.textFile("/diablo/task/spid-date/" + currentDay + "-spid-media").map(line =>
line.split(",")(0).trim).map(spid => (spid, spid)).partitionBy(new HashPartitioner(32));
val logRdds: RDD[(LongWritable, Text)] = MzFileUtils.getFileRdds(sc, currentDay, "")
val logMapRdds = MzFileUtils.mapToMzlog(logRdds)
//---RDD B transforming to RDD[(spid, spid)]---
val tongYuanRdd = logMapRdds.filter(kvs => kvs("plt") == "0" && kvs("tp") == "imp").map(kvs => kvs("p").trim).map(spid => (spid, spid)).partitionBy(new HashPartitioner(32));
//---join---
val globalSpids = sc.broadcast(spidRdds.collectAsMap());
val filteredTongYuanRdd = tongYuanRdd.mapPartitions({
iter =>
val m = globalSpids.value
for {
(spid, spid_cp) <- iter
if m.contains(spid)
} yield spid
}, preservesPartitioning = true);
println("Total TongYuan Imp: " + filteredTongYuanRdd.count())
如您所见,以上两个代码片段之间的唯一区别是“join”部分。
那么,对于解决这个问题有什么建议吗?提前致谢!
最佳答案
Spark 的连接不强制键的唯一性,当键重复时,实际上输出该键的叉积。使用 cogroup
并仅在每个键的 k/v 对上输出,或者仅映射到 id,然后使用 intersection
即可。
关于hadoop - 加入 Spark 输出错误的结果,而 map-side join 是正确的,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/31128867/