当我运行如下代码时:
val newRDD = prevRDD.map(a => (a._1, 1L)).distinct.persist(StorageLevel.MEMORY_AND_DISK_SER)
newRDD.checkpoint
print(newRDD.count())
观察 Yarn 中的各个阶段,我注意到 Spark 执行了两次 DAG 计算——一次用于具体化 RDD 并缓存它的重复+计数,然后是第二次创建检查点副本。
既然 RDD 已经具体化并缓存了,为什么检查点不简单地利用这一点,并将缓存的分区保存到磁盘呢?
是否有现有的方法(某种配置设置或代码更改)来强制 Spark 利用这一点并且仅运行一次操作,并且检查点只会复制内容?
我需要“具体化”两次吗?
val newRDD = prevRDD.map(a => (a._1, 1L)).distinct.persist(StorageLevel.MEMORY_AND_DISK_SER)
print(newRDD.count())
newRDD.checkpoint
print(newRDD.count())
我创建了一个 Apache Spark Jira 票证以使其成为功能请求: https://issues.apache.org/jira/browse/SPARK-8666
最佳答案
看起来这可能是一个已知问题。请参阅旧的 JIRA 票证,https://issues.apache.org/jira/browse/SPARK-8582
关于caching - 持久化/缓存 RDD 上的 Spark RDD 检查点执行 DAG 两次,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/31078350/