apache-spark - Spark DataFrames: Is it more efficient to filter during a join or after?

Tags: apache-spark, dataframe, join, apache-spark-sql

I had some trouble finding an answer to this question, so I was hoping someone could help me.

Here is some context:

I have two DataFrames, df1 and df2:

val df1: DataFrame = List((1, 2, 3), (2, 3, 3)).toDF("col1", "col2", "col3")
val df2: DataFrame = List((1, 5, 6), (1, 2, 5)).toDF("col1", "col2_bis", "col3_bis")

What I want to do is:

join those DataFrames df1 and df2 on "col1", keeping only the rows where df1("col2") < df2("col2_bis")



So my question is: is it more efficient to do it like this:
df1.join(df2, df1("col1") === df2("col1") and df1("col2") < df2("col2_bis"), "inner")

or like this:
df1.join(df2, Seq("col1"), "inner").filter(col("col2") < col("col2_bis"))

The expected result is:
Array(Row(1, 2, 3, 5, 6)) with columns ("col1", "col2", "col3", "col2_bis", "col3_bis")

Do these two expressions resolve to the same execution plan, or is one of them more time-efficient than the other?

Thank you.

Best Answer

If you look at the query plans, they are identical; there is no difference in the join. The Catalyst optimizer performs these kinds of optimizations behind the scenes, so in both cases the filter ends up being evaluated as part of the join.

scala> val df2 = List((1, 5, 6), (1, 2, 5)).toDF("col1", "col2_bis", "col3_bis")
df2: org.apache.spark.sql.DataFrame = [col1: int, col2_bis: int ... 1 more field]

scala> val df1 = List((1, 2, 3), (2, 3, 3)).toDF("col1", "col2", "col3")
df1: org.apache.spark.sql.DataFrame = [col1: int, col2: int ... 1 more field]

scala> df1.join(df2, df1("col1") === df2("col1") and df1("col2") < df2("col2_bis"), "inner")
res0: org.apache.spark.sql.DataFrame = [col1: int, col2: int ... 4 more fields]

scala> df1.join(df2, Seq("col1"), "inner").filter(col("col2") < col("col2_bis"))
res1: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [col1: int, col2: int ... 3 more fields]

scala> res0.show
+----+----+----+----+--------+--------+
|col1|col2|col3|col1|col2_bis|col3_bis|
+----+----+----+----+--------+--------+
|   1|   2|   3|   1|       5|       6|
+----+----+----+----+--------+--------+

scala> res1.show
+----+----+----+--------+--------+
|col1|col2|col3|col2_bis|col3_bis|
+----+----+----+--------+--------+
|   1|   2|   3|       5|       6|
+----+----+----+--------+--------+

scala> res0.explain
== Physical Plan ==
*BroadcastHashJoin [col1#21], [col1#7], Inner, BuildRight, (col2#22 < col2_bis#8)
:- LocalTableScan [col1#21, col2#22, col3#23]
+- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, false] as bigint)))
   +- LocalTableScan [col1#7, col2_bis#8, col3_bis#9]

scala> res1.explain
== Physical Plan ==
*Project [col1#21, col2#22, col3#23, col2_bis#8, col3_bis#9]
+- *BroadcastHashJoin [col1#21], [col1#7], Inner, BuildRight, (col2#22 < col2_bis#8)
   :- LocalTableScan [col1#21, col2#22, col3#23]
   +- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, false] as bigint)))
      +- LocalTableScan [col1#7, col2_bis#8, col3_bis#9]
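
The only difference between the two physical plans is the extra Project node in res1, which simply drops the duplicate col1 column produced by the Seq("col1") join; the join itself and the pushed-down predicate (col2#22 < col2_bis#8) are identical. For anyone who wants to check this outside the shell, below is a minimal standalone sketch (the object name and local SparkSession setup are illustrative, not part of the original answer). explain(true) also prints the parsed, analyzed and optimized logical plans, so the predicate pushdown is visible even before the physical plan is chosen:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col

object JoinVsFilter extends App {
  // Illustrative local session; any existing SparkSession behaves the same way.
  val spark = SparkSession.builder().master("local[*]").appName("join-vs-filter").getOrCreate()
  import spark.implicits._

  val df1 = List((1, 2, 3), (2, 3, 3)).toDF("col1", "col2", "col3")
  val df2 = List((1, 5, 6), (1, 2, 5)).toDF("col1", "col2_bis", "col3_bis")

  // Variant 1: the filter is expressed directly in the join condition.
  val filterInJoin = df1.join(df2, df1("col1") === df2("col1") && df1("col2") < df2("col2_bis"), "inner")
  // Variant 2: equi-join on col1, then filter afterwards.
  val filterAfter = df1.join(df2, Seq("col1"), "inner").filter(col("col2") < col("col2_bis"))

  // Prints the parsed, analyzed and optimized logical plans plus the physical plan for each variant.
  filterInJoin.explain(true)
  filterAfter.explain(true)

  // The optimized logical plans can also be compared programmatically.
  println(filterInJoin.queryExecution.optimizedPlan)
  println(filterAfter.queryExecution.optimizedPlan)

  spark.stop()
}

(With larger inputs the physical join strategy may differ, e.g. a sort-merge join instead of the broadcast hash join shown above, but the filter is pushed into the join condition during logical optimization either way.)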

Regarding apache-spark - Spark DataFrames: Is it more efficient to filter during a join or after?, a similar question can be found on Stack Overflow: https://stackoverflow.com/questions/50905982/
