apache-spark - Spark-SQL 中的连接性能

标签 apache-spark apache-spark-sql

假设我们有一个健康的集群,对于我们的用例

two datasets with 1 Billlion + records



我们需要比较两个数据集并找出

duplicates in the original dataset



我正打算写一个

sql query with join on the columns that are to be checked for duplicates



我想知道会怎样

performance for this query and also the refinements



这可以在加入数据集之前在数据集(数据帧分区)中完成。

请务必提出您的意见。

最佳答案

I wanted to know how will be the performance



与什么相比?至于绝对数字,我认为这显然取决于您的数据和集群。

但是,在 Spark 2.0 中,性能改进非常显着。

and the refinements



Catalyst 优化器非常好(在 2.0 之后更是如此)。在它下面负责大部分优化,如列修剪、谓词下推等。(在 2.0 中还有代码生成,负责生成非常优化的代码,从而实现非常大的性能改进。)

无论您使用 DataFrames/Datasets API 还是 SQL,这些改进都可以全面使用。

作为 Spark 催化剂所做的查询优化类型的示例,假设您有两个数据帧 df1 和 df2 具有相同的架构(根据您的情况),并且您希望将它们连接到某些列上以仅获取交集并输出这些元组。

假设我的数据帧架构如下(调用 df.schema ):
StructType(
StructField(df.id,StringType,true), 
StructField(df.age,StringType,true), 
StructField(df.city,StringType,true), 
StructField(df.name,StringType,true))

也就是说,我们的数据集中有 id、age、city、name 列。

现在考虑到你想做的事情,你会做类似的事情
df1.join(
    df2, 
    $"df2.name"===$"df1.name"
       ).select("df1.id","df1.name", "df1.age", "df1.city" ).show

如果您查看上面的物理计划,您会注意到 Catalyst 优化器在幕后进行了许多优化:
== Physical Plan ==
*Project [df1.id#898, df1.name#904, df1.age#886, df1.city#892]
+- *BroadcastHashJoin [df1.name#904], [df2.name#880], Inner, BuildRight
   :- *Project [age#96 AS df1.age#886, city#97 AS df1.city#892, id#98 AS df1.id#898, name#99 AS df1.name#904]
   :  +- *Filter isnotnull(name#99)
   :     +- *Scan json [age#96,city#97,id#98,name#99] Format: JSON, PushedFilters: [IsNotNull(name)], ReadSchema: struct<age:string,city:string,id:string,name:string>
   +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, string, true]))
      +- *Project [name#99 AS df2.name#880]
         +- *Filter isnotnull(name#99)
            +- *Scan json [name#99] Format: JSON, PushedFilters: [IsNotNull(name)], ReadSchema: struct<name:string>

`

特别要注意的是,即使两个相同的数据帧被连接起来,它们的读取方式也不同——
  • 谓词下推:从查询中可以看出,Spark 很明显,对于 df2,您只需要 name列(而不是带有 id, age 等的整个记录​​)。如果将此信息下推到我的数据被读取的位置,不是很好吗?这将使我免于阅读我不打算使用的不必要的数据。这正是 Spark 所做的!对于 join 的一侧,Spark 只会读取 name柱子。这一行:+- *Scan json [name#99] Format: JSON, PushedFilters: [IsNotNull(name)], ReadSchema: struct<name:string>另一边df1但是,我们希望加入后结果中的所有四列。 Spark 再次计算出这一点,并在该侧读取所有四列。此行:+- *Scan json [age#96,city#97,id#98,name#99] Format: JSON, PushedFilters: [IsNotNull(name)], ReadSchema: struct<age:string,city:string,id:string,name:string>
  • 同样在阅读之后和加入之前,Spark 发现您正在加入 name列。所以在加入之前,它删除了名称为 null 的元组。 .此行:+- *Filter isnotnull(name#99) .

  • 这意味着 Spark 已经在为您完成所有这些繁重的工作,以便读取最少的数据并将其带入内存(从而减少 shuffle 和计算时间)。

    但是,对于您的特定情况,您可能想考虑是否可以进一步减少此数据读取 - 至少对于连接的一侧。如果 df2 中有许多行具有与 df1 匹配的相同键组合,该怎么办?首先在 df2 上做一个不同的你不会更好吗? IE。就像是:
    df1.join(
        df2.select("df2.name").distinct, 
        $"df2.name"===$"df1.name"
       ).select("df1.id","df1.name", "df1.age", "df1.city" )
    

    关于apache-spark - Spark-SQL 中的连接性能,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/39615205/

    相关文章:

    scala - Spark : Replace Null value in a Nested column

    scala - Spark 上的应用程序网络?

    dataframe - Spark Scala 中减去两个数据帧中的列以获得差异

    sql - 分组并过滤scala数据框中的最高值

    pyspark - 转换列并更新 DataFrame

    apache-spark - 如何为 PySpark 的窗口函数设置分区?

    hadoop - 使用接收器和 WAL 的 Spark Kafka 集成

    java - 在 Scala 中,如何创建开始日期和结束日期之间的每月日期的日期数组列?

    apache-spark - Spark >2 - 连接操作期间的自定义分区键

    scala - 创建RDD时spark报错RDD类型未找到