caching - 返回 RDD 的 Apache Spark 方法(带尾递归)

标签 caching recursion apache-spark rdd

RDD 有一个谱系,因此在对其执行操作之前不存在;所以,如果我有一个方法可以对 RDD 执行多次转换并返回一个转换后的 RDD,那么我实际返回的是什么?
在操作需要 RDD 之前,我是否不返回任何内容?如果我在方法中缓存了一个 RDD,它是否会在缓存中持久化?我想我知道这个问题的答案:只有在返回的 RDD 上调用操作时才会运行该方法?但我可能是错的。

这个问题的扩展是:
如果我有一个尾递归方法,它将 RDD 作为参数并返回一个 RDD,但我在该方法中缓存了 RDD:

def method(myRDD : RDD) : RDD = {
   ...
   anRDD.cache
   if(true) return someRDD
   method(someRDD) // tailrec
}

然后,当尾递归发生时,它是否会覆盖之前缓存的 RDD anRDD还是两者都坚持?我想两者都会坚持。当我使用的数据集只有 63mb 大时,我将数据溢出到磁盘。我认为这可能与尾递归方法有关。

最佳答案

RDD 谱系构建为链接在一起的 RDD 对象实例图,其中谱系中的每个节点都有对其依赖项的引用。在最简单的链表中,您可以将其视为链表:

hadoopRDD(location) <-depends- filteredRDD(f:A->Boolean) <-depends- mappedRDD(f:A->B)

您可以在基本 RDD 构造函数中欣赏这一点:
/** Construct an RDD with just a one-to-one dependency on one parent */
  def this(@transient oneParent: RDD[_]) =
    this(oneParent.context , List(new OneToOneDependency(oneParent))) 

说到点子上:以同样的方式,我们可以递归地构建一个链表,我们也可以构建一个 RDD 谱系。作用于 RDD 的递归函数的结果将是一个定义良好的 RDD。

将需要一个 Action 来安排执行该沿袭,并将具体化由它表示的计算,就像一旦创建一个链表就可以“遍历”它一样。

考虑一下这个(我必须承认,相当委屈)的例子:
def isPrime(n:Int):Boolean = {
    (n == 2) || (!( n % 2 ==0) && !((3 to math.sqrt(n).ceil.toInt) exists (x => n % x == 0)))
}

def recPrimeFilter(rdd:RDD[Int], i:Int):RDD[Int] = 
if (i<=1) rdd else if (isPrime(i)) recPrimeFilter(rdd.filter(x=> x!=i), i-1) else (recPrimeFilter(rdd.map(x=>x+i), i-1))

当应用于 int 的 RDD 时,我们可以观察具有交错过滤器和质数位置结果的映射的谱系:
val rdd = sc.parallelize(1 to 100)
val res = weirdPrimeFilter(rdd,15)
scala> res.toDebugString
res3: String = 
(8) FilteredRDD[54] at filter at <console>:18 []
 |  FilteredRDD[53] at filter at <console>:18 []
 |  MappedRDD[52] at map at <console>:18 []
 |  FilteredRDD[51] at filter at <console>:18 []
 |  MappedRDD[50] at map at <console>:18 []
 |  FilteredRDD[49] at filter at <console>:18 []
 |  MappedRDD[48] at map at <console>:18 []
 |  MappedRDD[47] at map at <console>:18 []
 |  MappedRDD[46] at map at <console>:18 []
 |  FilteredRDD[45] at filter at <console>:18 []
 |  MappedRDD[44] at map at <console>:18 []
 |  FilteredRDD[43] at filter at <console>:18 []
 |  MappedRDD[42] at map at <console>:18 []
 |  MappedRDD[41] at map at <console>:18 []
 |  ParallelCollectionRDD[33] at parallelize at <console>:13 []

“缓存”打破了沿袭,使 RDD 在缓存点第一次经过那里时“记住”它的内容,以便沿袭中更靠后的所有依赖 RDD 可以重用缓存的数据。
在线性RDD谱系的基本情况下,它根本不会产生任何影响,因为每个节点只会被访问一次。

在这种情况下,如果递归 RDD 构建过程创建一个图形或树状结构,其中在许多不同的“叶”节点上调用操作,则缓存可能是有意义的。

关于caching - 返回 RDD 的 Apache Spark 方法(带尾递归),我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/29565857/

相关文章:

javascript - JavaScript 中的递归归约

apache-spark - 如何使用 spark-shell 添加 hbase-site.xml 配置文件

mysql - Laravel Caching 用户相关的缓存

在 Laravel 5.5 中更新代码时,php artisan serve 被缓存或没有反应。重新启动后,它每次都会再次运行

python - 为什么递归次数9小于递归限制?

apache-spark - 使用 Spark LDA 可视化主题

python - PySpark - sortByKey() 方法以原始顺序从 k,v 对返回值

c++ - 哪个缓存最友好?

java - 可以在 Scala 中使用 Spring 缓存

java - 在 merge 方法中返回多个变量