我通过 Spark 读取的数据是具有以下统计信息的高度倾斜的 Hive 表。
(MIN、25TH、MEDIAN、75TH、MAX) 通过 Spark UI:
1506.0 B/0 232.4 KB/27288 247.3 KB/29025 371.0 KB/42669 269.0 MB/27197137
我认为当我执行一些 Window Funcs
和 Pivots
时,它会导致下游作业出现问题。
我尝试探索此参数以限制分区大小,但没有任何变化,分区在读取时仍然倾斜。
spark.conf.set("spark.sql.files.maxPartitionBytes")
此外,当我使用 Hive 表作为源缓存此 DF 时,它需要几分钟时间,甚至很可能由于倾斜而在 Spark UI 中导致一些 GC。
此 spark.sql.files.maxPartitionBytes
是否适用于 Hive 表或仅适用于文件?
处理这种倾斜的 Hive 源的最佳行动方案是什么?
stage barrier write to parquet 或 Salting 之类的东西是否适合解决这个问题?
我想避免在读取时使用 .repartition()
,因为它会为作业的数据过山车添加另一层。
谢谢
============================================= ===
经过进一步研究,Window Function
似乎也在导致数据偏斜,这就是 Spark Job
挂起的地方。
我正在通过双重窗口函数
执行一些时间序列
填充(向前然后向后填充以估算所有null
传感器读数)并且我尝试按照本文尝试使用 salt
方法来均匀分布...但是以下代码会生成所有 null
值,因此 salt
方法是不工作。
不确定为什么在 Window
之后我会得到 skews
,因为在通过 .groupBy( )
... 那么为什么需要 salt
?
+--------------------+-------+
| measure | count|
+--------------------+-------+
| v1 |5030265|
| v2 |5009780|
| v3 |5030526|
| v4 |5030504|
...
盐柱 => https://medium.com/appsflyer/salting-your-spark-to-scale-e6f1c87dd18
nSaltBins = 300 # based off number of "measure" values
df_fill = df_fill.withColumn("salt", (F.rand() * nSaltBins).cast("int"))
# FILLS [FORWARD + BACKWARD]
window = Window.partitionBy('measure')\
.orderBy('measure', 'date')\
.rowsBetween(Window.unboundedPreceding, 0)
# FORWARD FILLING IMPUTER
ffill_imputer = F.last(df_fill['new_value'], ignorenulls=True)\
.over(window)
fill_measure_DF = df_fill.withColumn('value_impute_temp', ffill_imputer)\
.drop("value", "new_value")
window = Window.partitionBy('measure')\
.orderBy('measure', 'date')\
.rowsBetween(0,Window.unboundedFollowing)
# BACKWARD FILLING IMPUTER
bfill_imputer = F.first(df_fill['value_impute_temp'], ignorenulls=True)\
.over(window)
df_fill = df_fill.withColumn('value_impute_final', bfill_imputer)\
.drop("value_impute_temp")
最佳答案
在单个分区足够大以至于无法容纳在单个执行程序的内存中的情况下,加盐可能会有所帮助。即使所有 key 也均匀分布(如您的情况),也可能会发生这种情况。
您必须在用于创建窗口的 partitionBy 子句中包含 salt 列。
window = Window.partitionBy('measure', 'salt')\
.orderBy('measure', 'date')\
.rowsBetween(Window.unboundedPreceding, 0)
你必须再次创建另一个窗口,它将对中间结果进行操作
window1 = Window.partitionBy('measure')\
.orderBy('measure', 'date')\
.rowsBetween(Window.unboundedPreceding, 0)
关于scala - 倾斜的窗口函数和 Hive 源分区?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/56741071/