假设我们有一个健康的集群,对于我们的用例
two datasets with 1 Billlion + records
我们需要比较两个数据集并找出
duplicates in the original dataset
我正打算写一个
sql query with join on the columns that are to be checked for duplicates
我想知道会怎样
performance for this query and also the refinements
这可以在加入数据集之前在数据集(数据帧分区)中完成。
请务必提出您的意见。
最佳答案
I wanted to know how will be the performance
与什么相比?至于绝对数字,我认为这显然取决于您的数据和集群。
但是,在 Spark 2.0 中,性能改进非常显着。
and the refinements
Catalyst 优化器非常好(在 2.0 之后更是如此)。在它下面负责大部分优化,如列修剪、谓词下推等。(在 2.0 中还有代码生成,负责生成非常优化的代码,从而实现非常大的性能改进。)
无论您使用 DataFrames/Datasets API 还是 SQL,这些改进都可以全面使用。
作为 Spark 催化剂所做的查询优化类型的示例,假设您有两个数据帧 df1 和 df2 具有相同的架构(根据您的情况),并且您希望将它们连接到某些列上以仅获取交集并输出这些元组。
假设我的数据帧架构如下(调用
df.schema
):StructType(
StructField(df.id,StringType,true),
StructField(df.age,StringType,true),
StructField(df.city,StringType,true),
StructField(df.name,StringType,true))
也就是说,我们的数据集中有 id、age、city、name 列。
现在考虑到你想做的事情,你会做类似的事情
df1.join(
df2,
$"df2.name"===$"df1.name"
).select("df1.id","df1.name", "df1.age", "df1.city" ).show
如果您查看上面的物理计划,您会注意到 Catalyst 优化器在幕后进行了许多优化:
== Physical Plan ==
*Project [df1.id#898, df1.name#904, df1.age#886, df1.city#892]
+- *BroadcastHashJoin [df1.name#904], [df2.name#880], Inner, BuildRight
:- *Project [age#96 AS df1.age#886, city#97 AS df1.city#892, id#98 AS df1.id#898, name#99 AS df1.name#904]
: +- *Filter isnotnull(name#99)
: +- *Scan json [age#96,city#97,id#98,name#99] Format: JSON, PushedFilters: [IsNotNull(name)], ReadSchema: struct<age:string,city:string,id:string,name:string>
+- BroadcastExchange HashedRelationBroadcastMode(List(input[0, string, true]))
+- *Project [name#99 AS df2.name#880]
+- *Filter isnotnull(name#99)
+- *Scan json [name#99] Format: JSON, PushedFilters: [IsNotNull(name)], ReadSchema: struct<name:string>
`
特别要注意的是,即使两个相同的数据帧被连接起来,它们的读取方式也不同——
name
列(而不是带有 id, age
等的整个记录)。如果将此信息下推到我的数据被读取的位置,不是很好吗?这将使我免于阅读我不打算使用的不必要的数据。这正是 Spark 所做的!对于 join 的一侧,Spark 只会读取 name
柱子。这一行:+- *Scan json [name#99] Format: JSON, PushedFilters: [IsNotNull(name)], ReadSchema: struct<name:string>
另一边df1
但是,我们希望加入后结果中的所有四列。 Spark 再次计算出这一点,并在该侧读取所有四列。此行:+- *Scan json [age#96,city#97,id#98,name#99] Format: JSON, PushedFilters: [IsNotNull(name)], ReadSchema: struct<age:string,city:string,id:string,name:string>
name
列。所以在加入之前,它删除了名称为 null
的元组。 .此行:+- *Filter isnotnull(name#99)
. 这意味着 Spark 已经在为您完成所有这些繁重的工作,以便读取最少的数据并将其带入内存(从而减少 shuffle 和计算时间)。
但是,对于您的特定情况,您可能想考虑是否可以进一步减少此数据读取 - 至少对于连接的一侧。如果 df2 中有许多行具有与 df1 匹配的相同键组合,该怎么办?首先在 df2 上做一个不同的你不会更好吗? IE。就像是:
df1.join(
df2.select("df2.name").distinct,
$"df2.name"===$"df1.name"
).select("df1.id","df1.name", "df1.age", "df1.city" )
关于apache-spark - Spark-SQL 中的连接性能,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/39615205/