RDD 有一个谱系,因此在对其执行操作之前不存在;所以,如果我有一个方法可以对 RDD 执行多次转换并返回一个转换后的 RDD,那么我实际返回的是什么?
在操作需要 RDD 之前,我是否不返回任何内容?如果我在方法中缓存了一个 RDD,它是否会在缓存中持久化?我想我知道这个问题的答案:只有在返回的 RDD 上调用操作时才会运行该方法?但我可能是错的。
这个问题的扩展是:
如果我有一个尾递归方法,它将 RDD 作为参数并返回一个 RDD,但我在该方法中缓存了 RDD:
def method(myRDD : RDD) : RDD = {
...
anRDD.cache
if(true) return someRDD
method(someRDD) // tailrec
}
然后,当尾递归发生时,它是否会覆盖之前缓存的 RDD
anRDD
还是两者都坚持?我想两者都会坚持。当我使用的数据集只有 63mb 大时,我将数据溢出到磁盘。我认为这可能与尾递归方法有关。
最佳答案
RDD 谱系构建为链接在一起的 RDD 对象实例图,其中谱系中的每个节点都有对其依赖项的引用。在最简单的链表中,您可以将其视为链表:
hadoopRDD(location) <-depends- filteredRDD(f:A->Boolean) <-depends- mappedRDD(f:A->B)
您可以在基本 RDD 构造函数中欣赏这一点:
/** Construct an RDD with just a one-to-one dependency on one parent */
def this(@transient oneParent: RDD[_]) =
this(oneParent.context , List(new OneToOneDependency(oneParent)))
说到点子上:以同样的方式,我们可以递归地构建一个链表,我们也可以构建一个 RDD 谱系。作用于 RDD 的递归函数的结果将是一个定义良好的 RDD。
将需要一个 Action 来安排执行该沿袭,并将具体化由它表示的计算,就像一旦创建一个链表就可以“遍历”它一样。
考虑一下这个(我必须承认,相当委屈)的例子:
def isPrime(n:Int):Boolean = {
(n == 2) || (!( n % 2 ==0) && !((3 to math.sqrt(n).ceil.toInt) exists (x => n % x == 0)))
}
def recPrimeFilter(rdd:RDD[Int], i:Int):RDD[Int] =
if (i<=1) rdd else if (isPrime(i)) recPrimeFilter(rdd.filter(x=> x!=i), i-1) else (recPrimeFilter(rdd.map(x=>x+i), i-1))
当应用于 int 的 RDD 时,我们可以观察具有交错过滤器和质数位置结果的映射的谱系:
val rdd = sc.parallelize(1 to 100)
val res = weirdPrimeFilter(rdd,15)
scala> res.toDebugString
res3: String =
(8) FilteredRDD[54] at filter at <console>:18 []
| FilteredRDD[53] at filter at <console>:18 []
| MappedRDD[52] at map at <console>:18 []
| FilteredRDD[51] at filter at <console>:18 []
| MappedRDD[50] at map at <console>:18 []
| FilteredRDD[49] at filter at <console>:18 []
| MappedRDD[48] at map at <console>:18 []
| MappedRDD[47] at map at <console>:18 []
| MappedRDD[46] at map at <console>:18 []
| FilteredRDD[45] at filter at <console>:18 []
| MappedRDD[44] at map at <console>:18 []
| FilteredRDD[43] at filter at <console>:18 []
| MappedRDD[42] at map at <console>:18 []
| MappedRDD[41] at map at <console>:18 []
| ParallelCollectionRDD[33] at parallelize at <console>:13 []
“缓存”打破了沿袭,使 RDD 在缓存点第一次经过那里时“记住”它的内容,以便沿袭中更靠后的所有依赖 RDD 可以重用缓存的数据。
在线性RDD谱系的基本情况下,它根本不会产生任何影响,因为每个节点只会被访问一次。
在这种情况下,如果递归 RDD 构建过程创建一个图形或树状结构,其中在许多不同的“叶”节点上调用操作,则缓存可能是有意义的。
关于caching - 返回 RDD 的 Apache Spark 方法(带尾递归),我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/29565857/