scala - 监控结构化流媒体

标签 scala apache-spark spark-structured-streaming

我设置了一个运行良好的结构化流,但我希望在它运行时对其进行监视。

我已经构建了一个 EventCollector

class EventCollector extends StreamingQueryListener{
  override def onQueryStarted(event: QueryStartedEvent): Unit = {
    println("Start")
  }

  override def onQueryProgress(event: QueryProgressEvent): Unit = {
    println(event.queryStatus.prettyJson)
  }

  override def onQueryTerminated(event: QueryTerminatedEvent): Unit = {
    println("Term")
  }

我已经构建了一个 EventCollector 并将监听器添加到我的 Spark session 中

val listener = new EventCollector()
spark.streams.addListener(listener)

然后我触发查询

val query = inputDF.writeStream
  //.format("console")
  .queryName("Stream")
  .foreach(writer)
  .start()

query.awaitTermination()

但是,onQueryProgress 永远不会被命中。 onQueryStarted 确实如此,但我希望以一定的时间间隔获取查询的进度,以监视查询的执行情况。有人可以帮忙吗?

最佳答案

经过对这个主题的大量研究,这就是我发现的......

OnQueryProgress 在查询之间被命中。我不确定这个功能是否有意为之,但是当我们从文件中传输数据时,OnQueryProgress 不会触发。

我发现的一个解决方案是依赖 foreach writer 接收器并在 process 函数中执行我自己的性能分析。不幸的是,我们无法访问有关正在运行的查询的特定信息。或者,我还没有弄清楚如何做。这是我在沙箱中实现的用于分析性能的方法:

val writer = new ForeachWriter[rawDataRow] {
    def open(partitionId: Long, version: Long):Boolean = {
        //We end up here in between files
        true
    }
    def process(value: rawDataRow) = {
        counter += 1

        if(counter % 1000 == 0) {
            val currentTime = System.nanoTime()
            val elapsedTime = (currentTime - startTime)/1000000000.0

            println(s"Records Written:  $counter")
            println(s"Time Elapsed: $elapsedTime seconds")
        }
     }
}

获取指标的另一种方法:

获取有关正在运行的查询的信息的另一种方法是访问 Spark 为我们提供的 GET 端点。

http://localhost:4040/metrics

http://localhost:4040/api/v1/

此处的文档:http://spark.apache.org/docs/latest/monitoring.html

2017 年 9 月 2 日更新号: 在常规 Spark 流上进行测试,而非结构化流

免责声明,这可能不适用于结构化流,我需要设置一个测试床来确认。但是,它确实可以与常规 Spark 流一起使用(在本例中使用 Kafka)。

我相信,自从 Spark Streaming 2.2 发布以来,已经存在新的端点,可以检索有关流性能的更多指标。这可能已经存在于以前的版本中,我只是错过了它,但我想确保它已被记录下来供其他搜索此信息的人使用。

http://localhost:4040/api/v1/applications/ {applicationIdHere}/流媒体/统计

这个端点看起来像是在 2.2 中添加的(或者它已经存在并且只是添加了文档,我不确定,我还没有检查过)。

无论如何,它都会为指定的流应用程序添加这种格式的指标:

{
  "startTime" : "2017-09-13T14:02:28.883GMT",
  "batchDuration" : 1000,
  "numReceivers" : 0,
  "numActiveReceivers" : 0,
  "numInactiveReceivers" : 0,
  "numTotalCompletedBatches" : 90379,
  "numRetainedCompletedBatches" : 1000,
  "numActiveBatches" : 0,
  "numProcessedRecords" : 39652167,
  "numReceivedRecords" : 39652167,
  "avgInputRate" : 771.722,
  "avgSchedulingDelay" : 2,
  "avgProcessingTime" : 85,
  "avgTotalDelay" : 87
}

这使我们能够使用 Spark 公开的 REST 端点构建自己的自定义指标/监控应用程序。

关于scala - 监控结构化流媒体,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/40937627/

相关文章:

postgresql - 仅为单个模式生成流畅的代码

scala - 使用类型成员作为自身类型

hadoop - 当 parquet 使用 Snappy 算法而不是 gzip 时,将 parquet 数据写入 hive 的 spark 作业卡在了最后一个任务中

hadoop - 如何在Spark流中运行并发事件作业以及执行者之间的公平任务调度

scala - 结构化流异常 : Append output mode not supported for streaming aggregations

scala - fs2 流不会在延迟时中断

scala - 将 Shapeless hlist 类型 F[T1]::...::F[Tn]::HNil 映射到类型 T1::...::Tn::HNil(类型级别排序)

python - 使用 UDF 处理多个列时堆栈溢出

apache-spark - 使用 checkpointLocation 偏移量从 Kafka 主题读取流的正确方法

scala - Spark Structured Streaming MemoryStream + Row + Encoders 问题