使用数据框运行查询时,我看到一些性能问题。我在研究中已经看到,长时间运行的最终任务可能表明数据并未受到最佳干扰,但尚未找到解决此问题的详细过程。
我开始将两个表作为数据帧加载,然后将这些表合并到一个字段中。我试图添加distribution by(repartition)和sort by,以提高性能,但是仍然看到此单个长期运行的最终任务。这是我的代码的简单版本,请注意查询一和查询二实际上并不是那么简单,而是使用UDF计算一些值。
我已经尝试了spark.sql.shuffle
的一些不同设置。我已经尝试了100次,但是失败了(说实话,我并没有真正调试它)。我尝试了300、4000和8000。性能随着每次提高而降低。我正在选择一天的数据,其中每个文件是一个小时。
val df1 = sqlContext.sql("Select * from Table1")
val df2 = sqlContext.sql("Select * from Table2")
val distributeDf1 = df1
.repartition(df1("userId"))
.sortWithinPartitions(df1("userId"))
val distributeDf2 = df2
.repartition(df2("userId"))
.sortWithinPartitions(df2("userId"))
distributeDf1.registerTempTable("df1")
distributeDf2.registerTempTable("df2")
val df3 = sqlContext
.sql("""
Select
df1.*
from
df1
left outer join df2 on
df1.userId = df2.userId""")
由于按userId进行分区似乎并不理想,因此我可以按时间戳进行分区。如果执行此操作,是否应该只执行日期+小时?如果我的唯一组合少于200个,我将有空的执行者吗?
最佳答案
Spark> = 3.0
从3.0开始,Spark提供了用于处理倾斜联接的内置优化-可以使用spark.sql.adaptive.optimizeSkewedJoin.enabled
属性启用。
有关详细信息,请参见SPARK-29544。
Spark <3.0
您显然对庞大的正确数据偏斜有问题。让我们看看statistics you've provided:
df1 = [mean=4.989209978967438, stddev=2255.654165352454, count=2400088]
df2 = [mean=1.0, stddev=0.0, count=18408194]
平均数约为5,标准差超过2000,您将得到长尾。
由于重新分区后某些键比其他键要频繁得多,因此某些执行者将比剩下的执行更多的工作。
此外,您的描述表明问题可能出在散列到同一分区的单个或几个键上。
因此,让我们首先确定异常值(伪代码):
val mean = 4.989209978967438
val sd = 2255.654165352454
val df1 = sqlContext.sql("Select * from Table1")
val counts = df.groupBy("userId").count.cache
val frequent = counts
.where($"count" > mean + 2 * sd) // Adjust threshold based on actual dist.
.alias("frequent")
.join(df1, Seq("userId"))
其余的:
val infrequent = counts
.where($"count" <= mean + 2 * sd)
.alias("infrequent")
.join(df1, Seq("userId"))
真的可以期待吗?如果不是,请尝试确定上游问题的根源。
如果期望,您可以尝试:
val df2 = sqlContext.sql("Select * from Table2")
df2.join(broadcast(df1), Seq("userId"), "rightouter")
union
)并仅频繁广播:df2.join(broadcast(frequent), Seq("userId"), "rightouter")
.union(df2.join(infrequent, Seq("userId"), "rightouter"))
userId
但是您不应该:
关于scala - Spark的最终任务比前199次需要100倍的时间,如何改进,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/38517835/