apache-spark - 检查 Spark 流作业是否挂起的最佳方法

标签 apache-spark apache-spark-sql bigdata spark-streaming

我有 Spark 流应用程序,它基本上从 Kafka 获取触发消息,该消息启动可能需要长达 2 小时的批处理。

发生过一些作业无限期挂起并且没有在通常的时间内完成的事件,目前我们无法在不手动检查 Spark UI 的情况下确定作业的状态。我想有一种方法可以让当前正在运行的 spark 作业挂起与否。所以基本上如果它挂了超过 30 分钟,我想通知用户,以便他们可以采取行动。我有哪些选择?

我发现我可以使用来自驱动程序和执行程序的指标。如果我要选择最重要的一个,那就是最后收到的批次记录。当StreamingMetrics.streaming.lastReceivedBatch_records == 0这可能意味着 Spark 流作业已停止或失败。

但在我的场景中,我只会收到 1 个流触发事件,然后它会启动可能需要长达 2 小时的处理,因此我将无法依赖收到的记录。

有没有更好的办法? TIA

最佳答案

也许是一个简单的解决方案。

在处理开始时 - 启动一个等待线程。

val TWO_HOURS = 2 * 60 * 60 * 1000

val t = new Thread(new Runnable {
  override def run(): Unit = {
    try {
      Thread.sleep(TWO_HOURS)
      // send an email that job didn't end
    } catch {
      case _: Exception => _
    }
  }
})

在可以说批处理结束的地方
t.interrupt()

如果处理在 2 小时内完成 - 服务员线程被中断并且不会发送电子邮件。如果处理未完成 - 将发送电子邮件。

关于apache-spark - 检查 Spark 流作业是否挂起的最佳方法,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/52862846/

相关文章:

scala - 来自 DataFrame 的 RowMatrix 包含空值

scala - 如何在 Spark 2.0+ 中编写单元测试?

python - 如何从列表列创建组合的 Pyspark Dataframe

hadoop - 现实世界大数据开源应用示例

Scala 聚合函数与 Spark RDD 聚合函数

scala - toArray 的值不是 org.apache.spark.rdd.RDD[(String, Int)] 的成员

apache-spark - 如何在spark sqlContext中为数据类型为double的列计算中位数

scala - Spark DataFrame orderBy 和 DataFrameWriter sortBy,有区别吗?

java - 使用MapReduce执行组操作

java - 如何在 PostgreSQL 中添加带标点符号的文本 block