apache-spark - 如何避免每次执行查询时进行查询准备(解析、计划和优化)?

标签 apache-spark apache-spark-sql spark-streaming

在我们的 Spark 流应用程序中,使用 60 秒批处理,我们在 DF 上创建一个临时表,然后针对它运行大约 80 个查询,例如:

sparkSession.sql("select ... from temp_view group by ...")

但考虑到这些查询相当繁重,大约有 300 个求和列,如果我们不必分析 sql 并为每个微批处理生成查询计划,那就太好了。

难道没有办法生成、缓存和重用查询计划吗?即使每次查询只节省 50 毫秒,每批也可以节省大约 4 秒。

我们在 CDH/YARN 上使用 Spark 2.2。谢谢。

最佳答案

我以前没有尝试过,但是“生成、缓存和重用查询计划”你应该简单地(重新)使用查询(它可能不一定是你通常使用的“形状”,但有一个可能适用于您的情况)。

(大声思考)

每个结构化查询(无论是 Dataset、DataFrame 还是 SQL)都会经历几个阶段,即解析、分析、逻辑优化、规划和物理优化。

结构化查询由其​​计划描述,优化的物理查询计划是您可以使用 Dataset.explain 看到的计划。 :

explain(): Unit Prints the physical plan to the console for debugging purposes.


scala> spark.version
res0: String = 2.3.1-SNAPSHOT

scala> :type q
org.apache.spark.sql.DataFrame

scala> q.explain
== Physical Plan ==
*(1) Project [id#0L, (id#0L * 2) AS x2#2L]
+- *(1) Range (0, 4, step=1, splits=8)

您不直接使用计划,但关键是您可以。另一个重要的一点是计划通常 对它们优化的数据集一无所知(我说通常是因为 Spark SQL 有一个基于成本的优化器,它与数据一起工作以提供尽可能优化的查询计划)。

每当您执行操作时,都会通过所谓的 进行查询。结构化查询执行管道 .每次执行一个 Action 时它都会进行“预处理”(即使那是相同的 Action )。这就是您可以缓存结果的原因,但这会将查询与数据永远联系起来(您想避免这种情况)。

话虽如此,我认为您可以在调用操作之前进行优化(并通过查询的“管道”抽取数据)。只需使用您可以使用 QueryExecution.rdd 生成的优化物理查询计划这将为您提供代表结构化查询的 RDD。使用该 RDD,您可以简单地 RDD.[theAction]每个批处理间隔将避免结构化查询经过成为 RDD 的所有阶段。
scala> q.rdd
res2: org.apache.spark.rdd.RDD[org.apache.spark.sql.Row] = MapPartitionsRDD[4] at rdd at <console>:26

您甚至可以使用 QueryExecution.toRdd 来“优化”RDD反而。
scala> q.queryExecution.toRdd
res4: org.apache.spark.rdd.RDD[org.apache.spark.sql.catalyst.InternalRow] = MapPartitionsRDD[7] at toRdd at <console>:26

但是(再次,大声思考)所有这些重用都是自动发生的,因为这些阶段是 lazy val s 所以只是......不,它不能工作......忽略最后一个“但是”并坚持重用底层 RDD 的想法 :) 它应该工作。

顺便说一句,这几乎就是 Spark Structured Streaming 过去使用微批处理执行每个批处理(间隔)的方式。不过,这在 2.3 中发生了变化。

关于apache-spark - 如何避免每次执行查询时进行查询准备(解析、计划和优化)?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/49583401/

相关文章:

scala - 更新 Spark DataFrame 中的一些行值

apache-spark - Spark 1.5.2 : NaN while calculating stddev

java - 我怎样才能只编译 Spark Core 和 Spark Streaming(这样我就可以获得 Streaming 的单元测试实用程序)?

scala - 如何将列从十六进制字符串转换为长字符串?

java - Spark fat jar 在 YARN 上运行多个版本

python - 如何使用 spark-submit 和 pyspark 运行 luigi 任务

python - 如何将 PySpark 的 FP-growth 与 RDD 结合使用?

scala - spark数据帧爆炸功能错误

mysql - Spark rdd通过查询mysql进行过滤

java - 停止 Spark 流