java - ERROR Error cleaning broadcast exception

Tags: java apache-spark spark-streaming stateful

I get the following error when running my Spark Streaming application. It is a large application running several stateful (using mapWithState) and stateless operations. Isolating the error is becoming increasingly difficult because Spark itself hangs, and the only error we see is in the Spark logs rather than in the application logs.

The error appears only after about 4-5 minutes, with a micro-batch interval of 10 seconds. I am using Spark 1.6.1 on an Ubuntu server, with Kafka-based input and output streams.

Please note that I cannot provide a minimal reproducible example, since the error does not show up in unit test cases and the application itself is very large.

Any pointers you can offer toward resolving this issue would be helpful. Please let me know if I can provide more information.

The error is shown inline below:

[2017-07-11 16:15:15,338] ERROR Error cleaning broadcast 2211 (org.apache.spark.ContextCleaner)
org.apache.spark.rpc.RpcTimeoutException: Futures timed out after [120 seconds]. This timeout is controlled by spark.rpc.askTimeout
        at org.apache.spark.rpc.RpcTimeout.org$apache$spark$rpc$RpcTimeout$$createRpcTimeoutException(RpcTimeout.scala:48)
        at org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:63)
        at org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:59)
        at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:33)
        at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:76)
        at org.apache.spark.storage.BlockManagerMaster.removeBroadcast(BlockManagerMaster.scala:136)
        at org.apache.spark.broadcast.TorrentBroadcast$.unpersist(TorrentBroadcast.scala:228)
        at org.apache.spark.broadcast.TorrentBroadcastFactory.unbroadcast(TorrentBroadcastFactory.scala:45)
        at org.apache.spark.broadcast.BroadcastManager.unbroadcast(BroadcastManager.scala:77)
        at org.apache.spark.ContextCleaner.doCleanupBroadcast(ContextCleaner.scala:233)
        at org.apache.spark.ContextCleaner$$anonfun$org$apache$spark$ContextCleaner$$keepCleaning$1$$anonfun$apply$mcV$sp$2.apply(ContextCleaner.scala:189)
        at org.apache.spark.ContextCleaner$$anonfun$org$apache$spark$ContextCleaner$$keepCleaning$1$$anonfun$apply$mcV$sp$2.apply(ContextCleaner.scala:180)
        at scala.Option.foreach(Option.scala:236)
        at org.apache.spark.ContextCleaner$$anonfun$org$apache$spark$ContextCleaner$$keepCleaning$1.apply$mcV$sp(ContextCleaner.scala:180)
        at org.apache.spark.util.Utils$.tryOrStopSparkContext(Utils.scala:1180)
        at org.apache.spark.ContextCleaner.org$apache$spark$ContextCleaner$$keepCleaning(ContextCleaner.scala:173)
        at org.apache.spark.ContextCleaner$$anon$3.run(ContextCleaner.scala:68)
Caused by: java.util.concurrent.TimeoutException: Futures timed out after [120 seconds]
        at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
        at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
        at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107)
        at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
        at scala.concurrent.Await$.result(package.scala:107)
        at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:75)

Best Answer

Your exception message clearly shows an RpcTimeoutException: the RPC timeout is at its default of 120 seconds, and you should tune it to a value that suits your workload. See the Spark 1.6 configuration documentation (spark.rpc.askTimeout).

The error message org.apache.spark.rpc.RpcTimeoutException: Futures timed out after [120 seconds] and the frame org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:76) confirm this.
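
As a sketch of one way to raise this timeout (the app name and the 300s values below are placeholders I chose, not values from the question; tune them to your workload):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

// Sketch only: "my-streaming-app" and "300s" are illustrative placeholders.
val conf = new SparkConf()
  .setAppName("my-streaming-app")
  // Raise the timeout named in the exception (default: 120s).
  .set("spark.rpc.askTimeout", "300s")
  // When spark.rpc.askTimeout is unset it falls back to spark.network.timeout,
  // so the two are commonly raised together.
  .set("spark.network.timeout", "300s")

// 10-second micro-batches, matching the interval described in the question.
val ssc = new StreamingContext(conf, Seconds(10))

The same settings can also be supplied at launch time, e.g. spark-submit --conf spark.rpc.askTimeout=300s --conf spark.network.timeout=300s.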


For a better understanding, see the code below.

See RpcTimeout.scala:

/**
 * Wait for the completed result and return it. If the result is not available within this
 * timeout, throw a [[RpcTimeoutException]] to indicate which configuration controls the timeout.
 * @param  awaitable  the `Awaitable` to be awaited
 * @throws RpcTimeoutException if after waiting for the specified time `awaitable`
 *         is still not ready
 */
def awaitResult[T](awaitable: Awaitable[T]): T = {
  try {
    Await.result(awaitable, duration)
  } catch addMessageIfTimeout
}
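
For context, the duration used above is resolved from configuration. In the Spark 1.6 code base the ask timeout is constructed roughly like this (from RpcUtils.scala; quoted from memory, so treat line-level details as approximate):

// spark.rpc.askTimeout is tried first, then spark.network.timeout,
// and finally the hard default of "120s" -- the value reported in the
// exception above.
def askRpcTimeout(conf: SparkConf): RpcTimeout = {
  RpcTimeout(conf, Seq("spark.rpc.askTimeout", "spark.network.timeout"), "120s")
}

This fallback chain is why raising either spark.rpc.askTimeout or spark.network.timeout changes the 120-second limit seen in the stack trace.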

Regarding java - ERROR Error cleaning broadcast exception, a similar question can be found on Stack Overflow: https://stackoverflow.com/questions/45171175/
