scala - 非持久化 RDD 如何导致 RPC 超时?

标签 scala apache-spark

我有一个非常大的 RDD 正在缓存(它仍然适合内存),但由于它太大,我想尽快取消它。但是,当我调用 unpersist 时,它会导致 RPC 超时错误:

17/11/21 23:25:55 INFO BlockManager: Removing RDD 171
Exception in thread "main" org.apache.spark.rpc.RpcTimeoutException: Futures timed out after [120 seconds]. This timeout is controlled by spark.rpc.askTimeout
        at org.apache.spark.rpc.RpcTimeout.org$apache$spark$rpc$RpcTimeout$$createRpcTimeoutException(RpcTimeout.scala:47)
        at org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:62)
        at org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:58)
        at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
        at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:76)
        at org.apache.spark.storage.BlockManagerMaster.removeRdd(BlockManagerMaster.scala:135)
        at org.apache.spark.SparkContext.unpersistRDD(SparkContext.scala:1793)
        at org.apache.spark.rdd.RDD.unpersist(RDD.scala:216)

17/11/21 23:27:55 WARN BlockManagerMaster: Failed to remove RDD 171 - Cannot receive any reply from null in 120 seconds. This timeout is controlled by spark.rpc.askTimeout
org.apache.spark.rpc.RpcTimeoutException: Cannot receive any reply from null in 120 seconds. This timeout is controlled by spark.rpc.askTimeout

触发此错误的代码如下所示:
val tranformation1 = firstTransformation(inputData).cache
log("Tranformation1 Count: " + transformation1.count)
val transformation2 = secondTransformation(transformation1).cache
transformation1.unpersist()

取消持久化 RDD 应该是一个相对便宜的操作。非持久化 RDD 如何导致 RPC 超时?

最佳答案

稍微更全面的答案,因为它很可能是您遇到的版本相关问题 - 事情发生了变化:

  • https://issues.apache.org/jira/browse/SPARK-26771 ?
  • 默认情况下使 .unpersist(), .destroy() 始终保持非阻塞


  • 来自 JIRA:
    RDD 和 DataFrame .unpersist() 方法,以及 Broadcast
    .destroy() 方法,采用可选的“阻塞”参数。默认
    在所有情况下都是“假”除了(Scala)RDD 以及它们的 GraphX 子类。
    现在默认为 '假' (非阻塞)在所有这些方法中。
    Pyspark 的 RDD 和 Broadcast 类现在也有一个可选的“阻塞”参数,使用
    相同的行为。

    关于scala - 非持久化 RDD 如何导致 RPC 超时?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/47424715/

    相关文章:

    scala - Spark:如何将rdd.RDD [String]转换为rdd.RDD [(Array [Byte],Array [Byte])]

    apache-spark - 基于 Direct Stream 的 Spark Streaming 与 Kafka 仅显示一个消费者 ID

    java - SparkSession 初始化抛出 ExceptionInInitializerError

    scala - Play JSON InvariantFunctor

    Scala - 意外的类型从 Map 切换到 Iterable 以便理解?

    scala - 为每一行查找一组列中的第一个非空值和列名

    apache-spark - Spark Structured Streaming 是否可以进行适当的事件时间 session ?

    java - 如何将文档 uri 和数据库名称传递给 marklogic spark 连接器?

    hadoop - Spark RDD 分区与 Hadoop 拆分

    scala - 这是对 Scala 模式匹配的滥用吗?