apache-spark - Spark : RDD Left Outer Join Optimization for Duplicate Keys

标签 apache-spark join rdd

场景

我正在尝试编写一个 Spark 程序来高效地执行两个 RDD 之间的左外连接。需要注意的是,这些 RDD 可以有重复的键,这显然会导致整个程序效率低下。

我要实现的目标很简单:

  • 给定两个 RDD:rdd1rdd2(两者具有相同的结构:(k, v))
  • 使用 rdd1rdd2,生成另一个 RDD rdd3,其结构为:(k1, v1, List(v2. .))
  • k1v1 来自 rdd1 (相同的值,这将导致 rdd1 rdd3 长度相同)
  • List(v2..) 是一个列表,其值来自 rdd2
  • 要将 rdd2v 添加到 rdd3 的元组列表中,它的 k ( rdd2 中的 key 应与 rdd1
  • 中的 k 相匹配

我的尝试

我的方法是使用左外连接。所以,我想到了这样的事情:

rdd1.leftOuterJoin(rdd2).map{case(k, (v1, v2)) => ((k, v1), Array(v2))}
                        .reduceByKey(_ ++ _)

这实际上产生了我想要达到的结果。但是,当我使用大量数据时,程序变得非常慢。

一个例子

以防万一我的想法还不清楚,我有以下示例:

给定两个具有以下数据的 RDD:

rdd1:

key | value
-----------
 1  |  a
 1  |  b
 1  |  c
 2  |  a
 2  |  b
 3  |  c

rdd2:

key | value
-----------
 1  |  v
 1  |  w
 1  |  x
 1  |  y
 1  |  z
 2  |  v
 2  |  w
 2  |  x
 3  |  y
 4  |  z

结果 rdd3 应该是

key | value | list
------------------------
1   |   a   |  v,w,x,y,z
1   |   b   |  v,w,x,y,z
1   |   c   |  v,w,x,y,z
2   |   a   |  v,w,x
2   |   b   |  v,w,x
3   |   c   |  y

最佳答案

首先不要使用:

map { ... => (..., Array(...)) }.reduceByKey(_ ++ _)

这几乎是低效的。要使用 RDD 对这样的值进行分组,您真的应该使用 groupByKey

此外仅在 groupByKey 之后是非常浪费的。您在右侧两次执行相同的工作(按键分组)。直接使用 cogroup(这就是 RDD 连接的工作方式)和 flatMap

更有意义
val rdd1 = sc.parallelize(Seq(
  (1, "a"), (1, "b"), (1, "c"), (2, "a"), (2, "b"),(3, "c")
))

val rdd2 = sc.parallelize(Seq(
  (1, "v"), (1, "w"), (1, "x"), (1, "y"), (1, "z"), (2, "v"),
  (2, "w"), (2, "x"), (3, "y"),(4, "z")
))

val rdd = rdd1
  .cogroup(rdd2)
  .flatMapValues { case (left, right) => left.map((_, right)) }
  .map { case (k1, (k2, vs)) => ((k1, k2), vs) }

您还可以使用 DataSet API,在这种情况下往往效率更高

import org.apache.spark.sql.functions.collect_list

val df1 = rdd1.toDF("k", "v")
val df2 = rdd2.toDF("k", "v")


df2.groupBy("k")
 .agg(collect_list("v").as("list"))
 .join(rdd1.toDF("k", "v"), Seq("k"), "rightouter")
 .show

结果:

+---+---------------+---+                 
|  k|           list|  v|
+---+---------------+---+
|  1|[v, w, x, y, z]|  a|
|  1|[v, w, x, y, z]|  b|
|  1|[v, w, x, y, z]|  c|
|  3|            [y]|  c|
|  2|      [v, w, x]|  a|
|  2|      [v, w, x]|  b|
+---+---------------+---+

如果键集的交集很小,您可以先尝试通过应用过滤器来优化过程

val should_keep = {
  val f = df1.stat.bloomFilter("k", df1.count, 0.005)
  udf((x: Any) => f.mightContain(x))
}


df2.where(should_keep($"k")).groupBy("k")
 .agg(collect_list("v").as("list"))
 .join(rdd1.toDF("k", "v"), Seq("k"), "rightouter")
 .show
+---+---------------+---+
|  k|           list|  v|
+---+---------------+---+
|  1|[v, w, x, y, z]|  a|
|  1|[v, w, x, y, z]|  b|
|  1|[v, w, x, y, z]|  c|
|  3|            [y]|  c|
|  2|      [v, w, x]|  a|
|  2|      [v, w, x]|  b|
+---+---------------+---+

当使用 Dataset API 时,请务必调整 spark.sql.shuffle.partitions 以反射(reflect)您处理的数据量。

注意:

如果 rdd2 中的重复项数量很大,那么这些都无济于事。在这种情况下,整个问题的表述是无法捍卫的,您应该尝试重新表述它,同时考虑到下游过程的要求。

关于apache-spark - Spark : RDD Left Outer Join Optimization for Duplicate Keys,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/53314711/

相关文章:

apache-spark - PySpark:如何重新采样频率

pandas - AWS EMR 上的带有 pandas 和 pyarrow 的 pyspark 错误 : 'JavaPackage' object is not callable

mysql - Laravel 和 MySQL 有一个连接吗?

mysql - SQL 多重内连接

mysql - 我如何编写此语句才能使其保持为 ActiveRecord 调用?

apache-spark - 缓存的 RDD 存储在哪里(即以分布式方式或在单个节点上)?

float() 的 Python 无效文字

java - Spark 数据集使用 agg() 方法计算与条件匹配的行数(在 Java 中)

scala - 如何根据值列表过滤 RDD

scala - 在 Spark/Hadoop 中保存为自定义输出格式