scala - Apache Spark、范围连接、数据倾斜和性能

标签 scala apache-spark apache-spark-sql

我有以下 Apache Spark SQL 连接谓词:

t1.field1 = t2.field1 and t2.start_date <= t1.event_date and t1.event_date < t2.end_date

数据:

t1 DataFrame have over 50 millions rows
t2 DataFrame have over 2 millions rows

t1 DataFrame 中的几乎所有 t1.field1 字段都具有相同的值(null)。

现在,Spark 集群在单个任务上挂起超过 10 分钟,以便执行此连接并且由于数据倾斜。此时只有一名 worker 和该 worker 的一项任务在工作。所有其他 9 名 worker 都处于闲置状态。如何改进此连接以便将负载从这个特定任务分配到整个 Spark 集群?

最佳答案

我假设您正在进行内部联接。

可以按照以下步骤来优化连接 - 1.加入前我们可以根据最小或最大的start_date、event_date、end_date过滤掉t1和t2。它将减少行数。

  1. 检查 t2 数据集的 field1 是否有空值,如果没有,可以在加入 t1 数据集之前根据 notNull 条件进行过滤。它将减小 t1 大小

  2. 如果您的作业只获得比可用执行器少的执行器,那么您的分区数量就会减少。只需对数据集进行重新分区,设置一个最佳数量,这样你就不会得到大量的分区,反之亦然。

  3. 您可以通过查看任务执行时间来检查分区是否正确发生(没有偏斜),它应该是相似的。

  4. 检查较小的数据集是否适合执行程序内存,可以使用 broadcast_join。

您可能喜欢阅读 - https://github.com/vaquarkhan/Apache-Kafka-poc-and-notes/wiki/Apache-Spark-Join-guidelines-and-Performance-tuning

关于scala - Apache Spark、范围连接、数据倾斜和性能,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/55460923/

相关文章:

scala - Binding.scala 路由重定向到登录页面

scala - 如何用部分数据从json填充案例类?

java - 电梯不喜欢我的 Web.xml

java - 使用 Spark DataFrame 发展模式

apache-spark - zeppelin 中的 Hello World 失败

csv - 如何在spark scala中加载包含多行记录的CSV文件?

scala - Spark和SparkSQL : How to imitate window function?

Scala:函数组合中的类型不匹配,发现 (Int, Int) => Seq[Int] require ? => 序列[整数]

java - 使用 cassandra 连接器在 apache Spark 2.0.2 上运行作业时无法初始化类 com.datastax.spark.connector.types.TypeConverter$

apache-spark - 通过减去字符串格式的两个datetime列来计算持续时间