hadoop - 加入 Spark 输出错误的结果,而 map-side join 是正确的

标签 hadoop join apache-spark

我的 spark 版本是 1.2.0,场景是这样的:

有两个RDD,分别是RDD_A和RDD_B,其数据结构都是RDD[(spid, the_same_spid)]。 RDD_A 有 20,000 行,而 RDD_B 有 3,000,000,000 行。我打算计算其“spid”存在于 RDD_A 中的 RDD_B 的行数。

我的第一个实现相当主流,在 RDD_A 上应用 RDD_B 的 join 方法:

val currentDay = args(0)

val conf = new SparkConf().setAppName("Spark-MonitorPlus-LogStatistic")
val sc = new SparkContext(conf)

//---RDD A transforming to RDD[(spid, spid)]---
val spidRdds = sc.textFile("/diablo/task/spid-date/" + currentDay + "-spid-media").map(line =>
line.split(",")(0).trim).map(spid => (spid, spid)).partitionBy(new HashPartitioner(32));
val logRdds: RDD[(LongWritable, Text)] = MzFileUtils.getFileRdds(sc, currentDay, "")
val logMapRdds = MzFileUtils.mapToMzlog(logRdds)

//---RDD B transforming to RDD[(spid, spid)]---
val tongYuanRdd = logMapRdds.filter(kvs => kvs("plt") == "0" && kvs("tp") == "imp").map(kvs => kvs("p").trim).map(spid => (spid, spid)).partitionBy(new HashPartitioner(32));

//---join---
val filteredTongYuanRdd = tongYuanRdd.join(spidRdds);
println("Total TongYuan Imp: " + filteredTongYuanRdd.count())

但是,与配置单元的结果相比,结果不正确(大于)。将 join 方法从 reduce-side join 更改为 map-side join 如下所示,结果与配置单元的结果相同:

val conf = new SparkConf().setAppName("Spark-MonitorPlus-LogStatistic")
val sc = new SparkContext(conf)

//---RDD A transforming to RDD[(spid, spid)]---
val spidRdds = sc.textFile("/diablo/task/spid-date/" + currentDay + "-spid-media").map(line =>
line.split(",")(0).trim).map(spid => (spid, spid)).partitionBy(new HashPartitioner(32));
val logRdds: RDD[(LongWritable, Text)] = MzFileUtils.getFileRdds(sc, currentDay, "")
val logMapRdds = MzFileUtils.mapToMzlog(logRdds)

//---RDD B transforming to RDD[(spid, spid)]---
val tongYuanRdd = logMapRdds.filter(kvs => kvs("plt") == "0" && kvs("tp") == "imp").map(kvs => kvs("p").trim).map(spid => (spid, spid)).partitionBy(new HashPartitioner(32));

//---join---
val globalSpids = sc.broadcast(spidRdds.collectAsMap());
val filteredTongYuanRdd = tongYuanRdd.mapPartitions({
  iter =>
    val m = globalSpids.value
    for {
      (spid, spid_cp) <- iter
      if m.contains(spid)
    } yield spid
}, preservesPartitioning = true);
println("Total TongYuan Imp: " + filteredTongYuanRdd.count())

如您所见,以上两个代码片段之间的唯一区别是“join”部分。

那么,对于解决这个问题有什么建议吗?提前致谢!

最佳答案

Spark 的连接不强制键的唯一性,当键重复时,实际上输出该键的叉积。使用 cogroup 并仅在每个键的 k/v 对上输出,或者仅映射到 id,然后使用 intersection 即可。

关于hadoop - 加入 Spark 输出错误的结果,而 map-side join 是正确的,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/31128867/

相关文章:

hadoop - 是否有任何 API 可以在给定文件路径的情况下按顺序获取文件的 blockIds?

java - hadoop程序编译时连接log4j报错

java - Hadoop 框架中使用的属性的完整列表

java - spark standalone cluster slave无法将slave连接到master

hadoop - Spark 集群 - 在 hadoop 上读/写

java - Spark javardd 方法collect() 和collectAsync() 之间有什么区别?

hadoop - [SPARK]:java.lang.IllegalArgumentException:java.net.UnknownHostException:水管工

mysql - 如果在 UNION ALL 查询中找不到任何行,则返回默认值

java - 连接满足两个表中多个列的多个表

python - 合并 3 个列表中的 3 个对应项