scala - 倾斜的窗口函数和 Hive 源分区?

标签 scala apache-spark hive pyspark time-series

我通过 Spark 读取的数据是具有以下统计信息的高度倾斜的 Hive 表。

(MIN、25TH、MEDIAN、75TH、MAX) 通过 Spark UI:

1506.0 B/0 232.4 KB/27288 247.3 KB/29025 371.0 KB/42669 269.0 MB/27197137

我认为当我执行一些 Window FuncsPivots 时,它会导致下游作业出现问题。

我尝试探索此参数以限制分区大小,但没有任何变化,分区在读取时仍然倾斜。

spark.conf.set("spark.sql.files.maxPartitionBytes")

此外,当我使用 Hive 表作为源缓存此 DF 时,它需要几分钟时间,甚至很可能由于倾斜而在 Spark UI 中导致一些 GC。

spark.sql.files.maxPartitionBytes 是否适用于 Hive 表或仅适用于文件?

处理这种倾斜的 Hive 源的最佳行动方案是什么?

stage barrier write to parquet 或 Salting 之类的东西是否适合解决这个问题?

我想避免在读取时使用 .repartition(),因为它会为作业的数据过山车添加另一层。

谢谢

============================================= ===

经过进一步研究,Window Function 似乎也在导致数据偏斜,这就是 Spark Job 挂起的地方。

我正在通过双重窗口函数执行一些时间序列填充(向前然后向后填充以估算所有null传感器读数)并且我尝试按照本文尝试使用 salt 方法来均匀分布...但是以下代码会生成所有 null 值,因此 salt 方法是不工作。

不确定为什么在 Window 之后我会得到 skews,因为在通过 .groupBy( ) ... 那么为什么需要 salt

+--------------------+-------+
|          measure   |  count|
+--------------------+-------+
|    v1              |5030265|
|      v2            |5009780|
|     v3             |5030526|
| v4                 |5030504|
...

盐柱 => https://medium.com/appsflyer/salting-your-spark-to-scale-e6f1c87dd18

nSaltBins = 300 # based off number of "measure" values
df_fill = df_fill.withColumn("salt", (F.rand() * nSaltBins).cast("int"))

# FILLS [FORWARD + BACKWARD]
window = Window.partitionBy('measure')\
               .orderBy('measure', 'date')\
               .rowsBetween(Window.unboundedPreceding, 0)

# FORWARD FILLING IMPUTER
ffill_imputer = F.last(df_fill['new_value'], ignorenulls=True)\
.over(window)
fill_measure_DF = df_fill.withColumn('value_impute_temp', ffill_imputer)\
.drop("value", "new_value")

window = Window.partitionBy('measure')\
               .orderBy('measure', 'date')\
               .rowsBetween(0,Window.unboundedFollowing)

# BACKWARD FILLING IMPUTER
bfill_imputer = F.first(df_fill['value_impute_temp'], ignorenulls=True)\
.over(window)
df_fill = df_fill.withColumn('value_impute_final', bfill_imputer)\
.drop("value_impute_temp")

最佳答案

在单个分区足够大以至于无法容纳在单个执行程序的内存中的情况下,加盐可能会有所帮助。即使所有 key 也均匀分布(如您的情况),也可能会发生这种情况。

您必须在用于创建窗口的 partitionBy 子句中包含 salt 列。

window = Window.partitionBy('measure', 'salt')\
               .orderBy('measure', 'date')\
               .rowsBetween(Window.unboundedPreceding, 0)

你必须再次创建另一个窗口,它将对中间结果进行操作

window1 = Window.partitionBy('measure')\
                   .orderBy('measure', 'date')\
                   .rowsBetween(Window.unboundedPreceding, 0)

关于scala - 倾斜的窗口函数和 Hive 源分区?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/56741071/

相关文章:

shell - 将配置单元查询输出写入HDFS文件

scala - 确保 Scala 中的 notnull 类?

scala继承和下限问题

java - Scala 参数传递给 Java (Play Framework)

python - PySpark 应用程序在 Yarn 集群模式下提交错误

hadoop - 使用参数在 Hive 中创建 View

scala - 使用 Squeryl 必须绑定(bind) session 错误

regex - 在 Pyspark 中使用正则表达式函数计算日期

hadoop - 将更新的jar文件复制到Spark上的每个从属节点

hadoop - 如何将HiveQL查询的结果输出到远程服务器目录?