我有一个非常大的 RDD 正在缓存(它仍然适合内存),但由于它太大,我想尽快取消它。但是,当我调用 unpersist 时,它会导致 RPC 超时错误:
17/11/21 23:25:55 INFO BlockManager: Removing RDD 171
Exception in thread "main" org.apache.spark.rpc.RpcTimeoutException: Futures timed out after [120 seconds]. This timeout is controlled by spark.rpc.askTimeout
at org.apache.spark.rpc.RpcTimeout.org$apache$spark$rpc$RpcTimeout$$createRpcTimeoutException(RpcTimeout.scala:47)
at org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:62)
at org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:58)
at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:76)
at org.apache.spark.storage.BlockManagerMaster.removeRdd(BlockManagerMaster.scala:135)
at org.apache.spark.SparkContext.unpersistRDD(SparkContext.scala:1793)
at org.apache.spark.rdd.RDD.unpersist(RDD.scala:216)
17/11/21 23:27:55 WARN BlockManagerMaster: Failed to remove RDD 171 - Cannot receive any reply from null in 120 seconds. This timeout is controlled by spark.rpc.askTimeout
org.apache.spark.rpc.RpcTimeoutException: Cannot receive any reply from null in 120 seconds. This timeout is controlled by spark.rpc.askTimeout
触发此错误的代码如下所示:
val tranformation1 = firstTransformation(inputData).cache
log("Tranformation1 Count: " + transformation1.count)
val transformation2 = secondTransformation(transformation1).cache
transformation1.unpersist()
取消持久化 RDD 应该是一个相对便宜的操作。非持久化 RDD 如何导致 RPC 超时?
最佳答案
稍微更全面的答案,因为它很可能是您遇到的版本相关问题 - 事情发生了变化:
来自 JIRA:
RDD 和 DataFrame .unpersist() 方法,以及 Broadcast
.destroy() 方法,采用可选的“阻塞”参数。默认是
在所有情况下都是“假”除了(Scala)RDD 以及它们的 GraphX 子类。
现在默认为 '假' (非阻塞)在所有这些方法中。
Pyspark 的 RDD 和 Broadcast 类现在也有一个可选的“阻塞”参数,使用
相同的行为。
关于scala - 非持久化 RDD 如何导致 RPC 超时?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/47424715/