我的 Spark 程序有多个表连接(使用 SPARKSQL),我想收集处理每个连接所需的时间并将其保存到统计表中。目的是在一段时间内连续运行它并以非常细粒度的级别收集性能。
例如
val DF1=spark.sql("从A,B中选择x,y")
Val DF2 =spark.sql("从表1、表2中选择k、v")
最后,我加入 DF1 和 DF2,然后启动类似 saveAsTable 的操作。
我正在寻找的是弄清楚
1.计算 DF1 实际花费了多少时间
2.计算DF2和需要多少时间
3.将最终的连接持久化到 Hive/HDFS 需要多长时间
并将所有这些信息放入 RUN-STATISTICS 表/文件中。
感谢任何帮助,并提前致谢
最佳答案
Spark 使用延迟评估,允许引擎在非常精细的级别上优化 RDD 转换。
当你执行时
val DF1= spark.sql("select x,y from A,B ")
除了将变换添加到有向无环图之外,什么也没有发生。
只有当您执行某个操作(例如 DF1.count)时,驱动程序才会强制执行物理执行计划。这会尽可能地推迟到 RDD 转换链的下游。
因此询问是不正确的
1.How much time it really took to compute DF1
2.How much time to compute DF2 and
至少基于您提供的代码示例。您的代码没有“计算”val DF1。我们可能不知道 DF1 的处理花了多长时间,除非您以某种方式欺骗编译器单独处理每个数据帧。
构建问题的更好方法可能是“我的工作总体分为多少个阶段(任务),以及完成这些阶段(任务)需要多长时间”?
通过查看日志文件/Web GUI 时间线可以轻松回答这个问题(根据您的设置有不同的风格)
3.How much time to persist those final Joins to Hive / HDFS
公平的问题。查看 Ganglia
Cluster-wide monitoring tools, such as Ganglia, can provide insight into overall cluster utilization and resource bottlenecks. For instance, a Ganglia dashboard can quickly reveal whether a particular workload is disk bound, network bound, or CPU bound.
另一个技巧是我喜欢使用它定义必须以单独函数内的操作结束的每个转换序列,然后在“计时器函数” block 内的输入 RDD 上调用该函数。
例如,我的“计时器”就是这样定义的
def time[R](block: => R): R = {
val t0 = System.nanoTime()
val result = block
val t1 = System.nanoTime()
println("Elapsed time: " + (t1 - t0)/1e9 + "s")
result
}
并且可以用作
val df1 = Seq((1,"a"),(2,"b")).toDF("id","letter")
scala> time{df1.count}
Elapsed time: 1.306778691s
res1: Long = 2
但是,不要仅仅为了将 DAG 分解为更多阶段/广泛的依赖关系而调用不必要的操作。这可能会导致困惑或减慢执行速度。
资源:
https://spark.apache.org/docs/latest/monitoring.html
关于apache-spark - 收集 Spark 作业运行统计信息并将其保存到数据库的最佳方法是什么,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/50050804/