apache-spark - 收集 Spark 作业运行统计信息并将其保存到数据库的最佳方法是什么

标签 apache-spark apache-spark-sql

我的 Spark 程序有多个表连接(使用 SPARKSQL),我想收集处理每个连接所需的时间并将其保存到统计表中。目的是在一段时间内连续运行它并以非常细粒度的级别收集性能。

例如

val DF1=spark.sql("从A,B中选择x,y")

Val DF2 =spark.sql("从表1、表2中选择k、v")

最后,我加入 DF1 和 DF2,然后启动类似 saveAsTable 的操作。

我正在寻找的是弄清楚

1.计算 DF1 实际花费了多少时间

2.计算DF2和需要多少时间

3.将最终的连接持久化到 Hive/HDFS 需要多长时间

并将所有这些信息放入 RUN-STATISTICS 表/文件中。

感谢任何帮助,并提前致谢

最佳答案

Spark 使用延迟评估,允许引擎在非常精细的级别上优化 RDD 转换。

当你执行时

val DF1= spark.sql("select x,y from A,B ")

除了将变换添加到有向无环图之外,什么也没有发生。

只有当您执行某个操作(例如 DF1.count)时,驱动程序才会强制执行物理执行计划。这会尽可能地推迟到 RDD 转换链的下游。

因此询问是不正确的

1.How much time it really took to compute DF1

2.How much time to compute DF2 and

至少基于您提供的代码示例。您的代码没有“计算”val DF1。我们可能不知道 DF1 的处理花了多长时间,除非您以某种方式欺骗编译器单独处理每个数据帧。

构建问题的更好方法可能是“我的工作总体分为多少个阶段(任务),以及完成这些阶段(任务)需要多长时间”?

通过查看日志文件/Web GUI 时间线可以轻松回答这个问题(根据您的设置有不同的风格)

3.How much time to persist those final Joins to Hive / HDFS

公平的问题。查看 Ganglia

Cluster-wide monitoring tools, such as Ganglia, can provide insight into overall cluster utilization and resource bottlenecks. For instance, a Ganglia dashboard can quickly reveal whether a particular workload is disk bound, network bound, or CPU bound.

另一个技巧是我喜欢使用它定义必须以单独函数内的操作结束的每个转换序列,然后在“计时器函数” block 内的输入 RDD 上调用该函数。

例如,我的“计时器”就是这样定义的

def time[R](block: => R): R = {
  val t0 = System.nanoTime()
  val result = block  
  val t1 = System.nanoTime()
  println("Elapsed time: " + (t1 - t0)/1e9 + "s")
  result
}

并且可以用作

val df1 = Seq((1,"a"),(2,"b")).toDF("id","letter")

scala> time{df1.count}
Elapsed time: 1.306778691s
res1: Long = 2

但是,不要仅仅为了将 DAG 分解为更多阶段/广泛的依赖关系而调用不必要的操作。这可能会导致困惑或减慢执行速度。

资源:

https://spark.apache.org/docs/latest/monitoring.html

http://ganglia.sourceforge.net/

https://www.youtube.com/watch?v=49Hr5xZyTEA

关于apache-spark - 收集 Spark 作业运行统计信息并将其保存到数据库的最佳方法是什么,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/50050804/

相关文章:

scala - Spark DataFrame 过滤 : retain element belonging to a list

python - 将文件名添加到 wholeTextFiles 上的 RDD 行

scala - 如何将 Spark 流 DF 写入 Kafka 主题

apache-spark - pyspark 中的转换 DStream 在调用 pprint 时出错

java.io.FileNotFoundException : localhost/broadcast_1

apache-spark - 在 PySpark 的文字列上检测到 INNER 连接的笛卡尔积

python - PySpark Dataframe Groupby 和计算空值

hadoop - Spark on Yarn 与 Jdk8

apache-spark - 即使枢轴不是操作,Spark 枢轴也会调用作业

apache-spark - 在 pyspark 中查找并删除匹配的列值