场景
我正在尝试编写一个 Spark 程序来高效地执行两个 RDD 之间的左外连接。需要注意的是,这些 RDD 可以有重复的键,这显然会导致整个程序效率低下。
我要实现的目标很简单:
- 给定两个 RDD:
rdd1
和rdd2
(两者具有相同的结构:(k, v)
) - 使用
rdd1
和rdd2
,生成另一个 RDDrdd3
,其结构为:(k1, v1, List(v2. .))
k1
和v1
来自rdd1
(相同的值,这将导致rdd1
和rdd3
长度相同)List(v2..)
是一个列表,其值来自rdd2
- 要将
rdd2
的v
添加到rdd3
的元组列表中,它的k
(rdd2
中的 key 应与rdd1
中的
k
相匹配
我的尝试
我的方法是使用左外连接。所以,我想到了这样的事情:
rdd1.leftOuterJoin(rdd2).map{case(k, (v1, v2)) => ((k, v1), Array(v2))}
.reduceByKey(_ ++ _)
这实际上产生了我想要达到的结果。但是,当我使用大量数据时,程序变得非常慢。
一个例子
以防万一我的想法还不清楚,我有以下示例:
给定两个具有以下数据的 RDD:
rdd1
:
key | value
-----------
1 | a
1 | b
1 | c
2 | a
2 | b
3 | c
rdd2
:
key | value
-----------
1 | v
1 | w
1 | x
1 | y
1 | z
2 | v
2 | w
2 | x
3 | y
4 | z
结果 rdd3
应该是
key | value | list
------------------------
1 | a | v,w,x,y,z
1 | b | v,w,x,y,z
1 | c | v,w,x,y,z
2 | a | v,w,x
2 | b | v,w,x
3 | c | y
最佳答案
首先不要使用:
map { ... => (..., Array(...)) }.reduceByKey(_ ++ _)
这几乎是低效的。要使用 RDD 对这样的值进行分组,您真的应该使用 groupByKey
。
此外仅在 groupByKey
之后是非常浪费的。您在右侧两次执行相同的工作(按键分组)。直接使用 cogroup
(这就是 RDD 连接的工作方式)和 flatMap
val rdd1 = sc.parallelize(Seq(
(1, "a"), (1, "b"), (1, "c"), (2, "a"), (2, "b"),(3, "c")
))
val rdd2 = sc.parallelize(Seq(
(1, "v"), (1, "w"), (1, "x"), (1, "y"), (1, "z"), (2, "v"),
(2, "w"), (2, "x"), (3, "y"),(4, "z")
))
val rdd = rdd1
.cogroup(rdd2)
.flatMapValues { case (left, right) => left.map((_, right)) }
.map { case (k1, (k2, vs)) => ((k1, k2), vs) }
您还可以使用 DataSet
API,在这种情况下往往效率更高
import org.apache.spark.sql.functions.collect_list
val df1 = rdd1.toDF("k", "v")
val df2 = rdd2.toDF("k", "v")
df2.groupBy("k")
.agg(collect_list("v").as("list"))
.join(rdd1.toDF("k", "v"), Seq("k"), "rightouter")
.show
结果:
+---+---------------+---+
| k| list| v|
+---+---------------+---+
| 1|[v, w, x, y, z]| a|
| 1|[v, w, x, y, z]| b|
| 1|[v, w, x, y, z]| c|
| 3| [y]| c|
| 2| [v, w, x]| a|
| 2| [v, w, x]| b|
+---+---------------+---+
如果键集的交集很小,您可以先尝试通过应用过滤器来优化过程
val should_keep = {
val f = df1.stat.bloomFilter("k", df1.count, 0.005)
udf((x: Any) => f.mightContain(x))
}
df2.where(should_keep($"k")).groupBy("k")
.agg(collect_list("v").as("list"))
.join(rdd1.toDF("k", "v"), Seq("k"), "rightouter")
.show
+---+---------------+---+
| k| list| v|
+---+---------------+---+
| 1|[v, w, x, y, z]| a|
| 1|[v, w, x, y, z]| b|
| 1|[v, w, x, y, z]| c|
| 3| [y]| c|
| 2| [v, w, x]| a|
| 2| [v, w, x]| b|
+---+---------------+---+
当使用 Dataset
API 时,请务必调整 spark.sql.shuffle.partitions
以反射(reflect)您处理的数据量。
注意:
如果 rdd2
中的重复项数量很大,那么这些都无济于事。在这种情况下,整个问题的表述是无法捍卫的,您应该尝试重新表述它,同时考虑到下游过程的要求。
关于apache-spark - Spark : RDD Left Outer Join Optimization for Duplicate Keys,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/53314711/