scala - Spark的最终任务比前199次需要100倍的时间,如何改进

标签 scala apache-spark hive left-join

使用数据框运行查询时,我看到一些性能问题。我在研究中已经看到,长时间运行的最终任务可能表明数据并未受到最佳干扰,但尚未找到解决此问题的详细过程。

我开始将两个表作为数据帧加载,然后将这些表合并到一个字段中。我试图添加distribution by(repartition)和sort by,以提高性能,但是仍然看到此单个长期运行的最终任务。这是我的代码的简单版本,请注意查询一和查询二实际上并不是那么简单,而是使用UDF计算一些值。

我已经尝试了spark.sql.shuffle的一些不同设置。我已经尝试了100次,但是失败了(说实话,我并没有真正调试它)。我尝试了300、4000和8000。性能随着每次提高而降低。我正在选择一天的数据,其中每个文件是一个小时。

val df1 = sqlContext.sql("Select * from Table1")
val df2 = sqlContext.sql("Select * from Table2")

val distributeDf1 = df1
    .repartition(df1("userId"))
    .sortWithinPartitions(df1("userId"))

val distributeDf2 = df2
    .repartition(df2("userId"))
    .sortWithinPartitions(df2("userId"))

distributeDf1.registerTempTable("df1")
distributeDf2.registerTempTable("df2")

val df3 = sqlContext
  .sql("""
    Select 
      df1.* 
    from 
      df1 
    left outer join df2 on 
      df1.userId = df2.userId""")

由于按userId进行分区似乎并不理想,因此我可以按时间戳进行分区。如果执行此操作,是否应该只执行日期+小时?如果我的唯一组合少于200个,我将有空的执行者吗?

最佳答案

Spark> = 3.0

从3.0开始,Spark提供了用于处理倾斜联接的内置优化-可以使用spark.sql.adaptive.optimizeSkewedJoin.enabled属性启用。

有关详细信息,请参见SPARK-29544

Spark <3.0

您显然对庞大的正确数据偏斜有问题。让我们看看statistics you've provided:

df1 = [mean=4.989209978967438, stddev=2255.654165352454, count=2400088] 
df2 = [mean=1.0, stddev=0.0, count=18408194]

平均数约为5,标准差超过2000,您将得到长尾

由于重新分区后某些键比其他键要频繁得多,因此某些执行者将比剩下的执行更多的工作。

此外,您的描述表明问题可能出在散列到同一分区的单个或几个键上。

因此,让我们首先确定异常值(伪代码):
val mean = 4.989209978967438 
val sd = 2255.654165352454

val df1 = sqlContext.sql("Select * from Table1")
val counts = df.groupBy("userId").count.cache

val frequent = counts
  .where($"count" > mean + 2 * sd)  // Adjust threshold based on actual dist.
  .alias("frequent")
  .join(df1, Seq("userId"))

其余的:
val infrequent = counts
  .where($"count" <= mean + 2 * sd)
  .alias("infrequent")
  .join(df1, Seq("userId"))

真的可以期待吗?如果不是,请尝试确定上游问题的根源。

如果期望,您可以尝试:
  • 广播较小的表:
    val df2 = sqlContext.sql("Select * from Table2")
    df2.join(broadcast(df1), Seq("userId"), "rightouter")
    
  • 拆分,统一(union)并仅频繁广播:
    df2.join(broadcast(frequent), Seq("userId"), "rightouter")
      .union(df2.join(infrequent, Seq("userId"), "rightouter"))
    
  • 带有一些随机数据的
  • 盐腌userId

  • 但是您不应该:
  • 重新分区所有数据并在本地排序(尽管单独在本地排序不应该成为问题)
  • 对完整数据执行标准哈希联接。
  • 关于scala - Spark的最终任务比前199次需要100倍的时间,如何改进,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/38517835/

    相关文章:

    java - 相同的 Scala 和 Java spark 函数产生不同的结果

    python - PySpark 应用程序失败,出现 java.lang.OutOfMemoryError : Java heap space

    scala - 从 Spark DataFrame 中的单个列派生多个列

    hadoop - hive 开始失败

    scala - Spark 可以将数据直接读取到嵌套的案例类中吗?

    Java 的 "forEach"到 Scala

    scala - 编解码器组合器 : Header contains magic number that is used to discriminate types

    python - 聚合后使用 spark 从 hive 表读取和写入

    sql - 配置单元-最早日期1年内选择行

    python - 如何将数据帧中的连接值插入 Pyspark 中的另一个数据帧?