apache-spark - 增加读取 parquet 文件的并行度 - Spark 优化自连接

标签 apache-spark optimization apache-spark-sql self-join

我想执行自连接以生成候选匹配对。目前,这不起作用,因为此操作太慢了。不幸的是,我无法广播数据帧,因为它们太大了。

首先我聚合元组的数量以减少数据:

val aggregated = df.groupBy("discrete_foo", "discrete_bar").agg(sum("value"))
aggregated.repartition(7, "discrete_foo", "discrete_bar").sortWithinPartitions("discreate_foo", "discrete_bar, "baz").write.parquet.option("compression", "gzip").mode("overwrite")

这工作得很好,而且速度很快。然后,我想执行自联接以生成候选人。
我已经观察到我需要生成更多的并行性:
--conf spark.sql.shuffle.partitions=4000 \
--conf spark.default.parallelism=4000 \ 

因此,设置了增加的默认值和随机播放并行度。此外,我尝试粗化两个离散值(即增加落入离散块的项目数),从而减少元组的数量。仍然没有运气。所以我还尝试通过重新分区来强制执行更多的任务:
val materializedAggregated= spark.read.parquet(s"path/to/file/aggregated_stuff"      )
  .repartition(4000)
val selfB = materializedAggregated
  .withColumnRenamed("baz", "other_batz")
  .withColumnRenamed("value", "other_value")

val candidates = materializedMultiSTW
  .join(selfB, Seq("discrete_foo", "discrete_bar"), "inner")
  .filter(col(FeedIdentifierColumns.imsiHash) =!= col("other_imsi_hash"))

然而,这也不起作用,而且速度太慢。我还能做些什么来使这个查询计算得更快?有什么我想念的吗?

下面您将看到在读取自联接数据时尝试增加并行度的各种失败尝试。

我什至设置:
--conf spark.sql.files.maxPartitionBytes=16777216 \

到 1/8,即 16 对 128MB,生成的任务数量仍然太少,即只有 250。

一些细节

执行计划:

enter image description here

即使没有这个手动重新分区,它也太慢了,我担心没有创建足够的分区:

enter image description here

处理的任务更少——这很可能会使它变慢:

enter image description here

如何确保此初始步骤具有更高的并行度?
分桶有帮助吗?但是当只读取一次混洗数据时 - 它不会真正产生加速 - 对吧?
编写聚合文件时的重新分区步骤如何?我应该在这里设置更高的数字吗?
到目前为止,即使省略它(并且基本上重新计算聚合两次) - 它也不会增加超过 260 个任务。

环境

我在 HDP 3.1 上使用 spark 2.3.x

最佳答案

无论 spark.sql.shuffle.partitions 的设置如何,内部联接的最大任务数将等于联接键的数量(即它们的基数)。和 spark.default.parallelism .

这是因为在 SortMergeJoin 中,数据将使用连接键的哈希进行打乱。来自每个不同连接键的所有数据都将转到单个执行程序。

因此,问题是您没有足够的垃圾箱 - 它们太粗糙了。您将看到的最大任务数将等于 bin 的数量。

如果您更细化数据,您应该会看到任务数量增加。

关于apache-spark - 增加读取 parquet 文件的并行度 - Spark 优化自连接,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/61073551/

相关文章:

postgresql - 你如何让驱动程序和执行程序通过 spark-submit 加载和识别 EMR 中的 postgres 驱动程序?

java - 值比较和值分配之间是否存在任何性能差异?

c++ - 英特尔 C++ 编译器了解执行了哪些优化

apache-spark - pyspark 预期构建 ClassDict 的参数为零(对于 pyspark.mllib.linalg.DenseVector)

apache-spark - 使用 Apache Spark 将数据持久化到 DynamoDB

python - 结合多个带有不同列的pyspark数据框

scala - Snakeyaml 和 Spark 导致无法构造对象

scala - Spark 2 选项数据集

sql - 查询内联与扁平化

scala - 如何使用带管道的多字符分隔符进行拆分?