scala - 缓存和持久化何时执行(因为它们看起来不像操作)?

标签 scala apache-spark lazy-evaluation

我正在实现一个 Spark 应用程序,下面是一个示例片段(不完全相同的代码):

val rdd1 = sc.textfile(HDFS_PATH)
val rdd2 = rdd1.map(func)
rdd2.persist(StorageLevel.MEMORY_AND_DISK)
println(rdd2.count)

在从 Spark Application Master UI 检查此代码的性能时,我看到了 count 操作的条目,但没有看到 persist 的条目。此计数操作的 DAG 还有一个用于“映射”转换的节点(上述代码的第 2 行)。

可以安全地得出这样的结论:映射转换是在遇到 count (在最后一行)时执行的,而不是在遇到 persist 时执行的吗?

另外,rdd2 在什么时候真正被持久化了? 据我所知,在 RDD 上只能调用两种类型的操作 - 转换和操作。如果在调用 count 操作时延迟持久化 RDD,那么持久化会被视为转换或操作还是两者都不是?

最佳答案

数据集的 cachepersist 运算符是惰性的,在您调用操作之前没有任何效果(并等到缓存完成,这是额外的费用稍后会有更好的表现)。

摘自Spark官方文档RDD Persistence (我的粗体句子):

One of the most important capabilities in Spark is persisting (or caching) a dataset in memory across operations. When you persist an RDD, each node stores any partitions of it that it computes in memory and reuses them in other actions on that dataset (or datasets derived from it). This allows future actions to be much faster (often by more than 10x). Caching is a key tool for iterative algorithms and fast interactive use.

You can mark an RDD to be persisted using the persist() or cache() methods on it. The first time it is computed in an action, it will be kept in memory on the nodes. Spark’s cache is fault-tolerant – if any partition of an RDD is lost, it will automatically be recomputed using the transformations that originally created it.

这正是某些人(以及 Spark SQL 本身!)采用以下技巧的原因:

rdd2.persist(StorageLevel.MEMORY_AND_DISK).count

触发缓存。

count 运算符相当便宜,因此最终效果是缓存几乎在该行之后立即执行(因为它是异步执行的,所以在缓存完成之前可能有一个小的延迟)。

persist之后进行此count的好处如下:

  1. 没有任何操作(除了计数本身)会“遭受”额外的缓存时间

  2. 该行与使用缓存的 rdd2 的位置之间的时间足以完全完成缓存,因此可以更好地利用时间(无需额外的“减速”)缓存)

所以当你问时:

would persist be considered a transformation or an action or neither?

我想说这两者都不是,并认为它是一个优化提示(可能会或可能不会执行或考虑永远)。

<小时/>

使用 Web UI 的“存储”选项卡查看哪些数据集(作为其底层 RDD)已被保留。

enter image description here

您还可以使用explain(或简单地QueryExecution.optimizedPlan)查看cachepersist运算符的输出.

val q1 = spark.range(10).groupBy('id % 5).agg(count("*") as "count").cache
scala> q1.explain
== Physical Plan ==
*(1) ColumnarToRow
+- InMemoryTableScan [(id % 5)#120L, count#119L]
      +- InMemoryRelation [(id % 5)#120L, count#119L], StorageLevel(disk, memory, deserialized, 1 replicas)
            +- *(2) HashAggregate(keys=[(id#0L % 5)#8L], functions=[count(1)])
               +- Exchange hashpartitioning((id#0L % 5)#8L, 200), true, [id=#13]
                  +- *(1) HashAggregate(keys=[(id#0L % 5) AS (id#0L % 5)#8L], functions=[partial_count(1)])
                     +- *(1) Range (0, 10, step=1, splits=16)

scala> println(q1.queryExecution.optimizedPlan.numberedTreeString)
00 InMemoryRelation [(id % 5)#5L, count#4L], StorageLevel(disk, memory, deserialized, 1 replicas)
01    +- *(2) HashAggregate(keys=[(id#0L % 5)#8L], functions=[count(1)], output=[(id % 5)#5L, count#4L])
02       +- Exchange hashpartitioning((id#0L % 5)#8L, 200), true, [id=#13]
03          +- *(1) HashAggregate(keys=[(id#0L % 5) AS (id#0L % 5)#8L], functions=[partial_count(1)], output=[(id#0L % 5)#8L, count#10L])
04             +- *(1) Range (0, 10, step=1, splits=16)

请注意,上面的count是一个标准函数,而不是一个操作,并且不会发生缓存。 count 是标准函数和数据集操作的名称,这只是巧合。

您可以使用纯 SQL 缓存表(这太急切了!)

// That registers range5 to contain the output of range(5) function
spark.sql("CACHE TABLE range5 AS SELECT * FROM range(5)")
val q2 = spark.sql("SELECT * FROM range5")
scala> q2.explain
== Physical Plan ==
*(1) ColumnarToRow
+- Scan In-memory table `range5` [id#51L]
      +- InMemoryRelation [id#51L], StorageLevel(disk, memory, deserialized, 1 replicas)
            +- *(1) Range (0, 5, step=1, splits=16)

InMemoryTableScan 物理运算符(带有 InMemoryRelation 逻辑计划)是确保查询缓存在内存中并因此被重用的方法。

<小时/>

此外,Spark SQL 本身使用相同的模式来触发 SQL CACHE TABLE query 的 DataFrame 缓存。 (与 RDD 缓存不同,它默认是 eager):

if (!isLazy) {
  // Performs eager caching
  sparkSession.table(tableIdent).count()
}

这意味着就缓存而言,根据运算符的不同,您可能会得到不同的结果。默认情况下,cachepersist 运算符是惰性的,而 SQL 的 CACHE TABLE 是急切的。

关于scala - 缓存和持久化何时执行(因为它们看起来不像操作)?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/44002128/

相关文章:

javascript - Trampoline、递归和惰性求值

scala - 如何在 spark scala 中使用带有 2 列的 array_contains?

scala - 取消选择已选择的选项

scala - 将时间戳解析为 LocalDateTime Scala

scala - 如何在 Akka HTTP 指令中使用 Future?

apache-spark - Spark 将列组合为嵌套数组

apache-spark - 如何将向量列分成两列?

c++ - 线程安全的延迟获取和释放

java - 在 Clojure 的循环中使用副作用方法改变 Java 对象

斯卡拉^Z3 : Delete previous assertion