在 Spark 核心“示例”目录(我使用的是 Spark 1.2.0)中,有一个名为“SparkPageRank.scala”的示例,
val sparkConf = new SparkConf().setAppName("PageRank")
val iters = if (args.length > 0) args(1).toInt else 10
val ctx = new SparkContext(sparkConf)
val lines = ctx.textFile(args(0), 1)
val links = lines.map{ s =>
val parts = s.split("\\s+")
(parts(0), parts(1))
}.distinct().groupByKey().cache()
var ranks = links.mapValues(v => 1.0)
for (i <- 1 to iters) {
val contribs = links.join(ranks).values.flatMap{ case (urls, rank) =>
val size = urls.size
urls.map(url => (url, rank / size))
}
ranks = contribs.reduceByKey(_ + _).mapValues(0.15 + 0.85 * _)
}
val output = ranks.collect()
ctx.stop()
我意识到在这个例子中,沿袭将在每次迭代后继续扩展。结果,当我监控存放shuffle数据的目录时,shuffle数据存储在每次迭代后不断增加。
我应该如何构建应用程序代码,以便 ContextCleaner 的 doCleanupShuffle 将在一定时间间隔(例如,几次迭代)后被激活,这样我就可以防止不断增加的随机数据存储用于需要多次迭代的计算?
六月
最佳答案
显然,当用于随机播放的对象被 GC 时,随机播放文件的清理就会发生。由于您的代码片段是 Page rank 的一个简单示例,我假设您在非常小的数据集上运行它,因此您的内存永远不会超过堆的大小,并且对象永远不会被 GC。尝试使用更大的文件或手动触发 GC(尽管通常不推荐这样做)。
更多信息:https://github.com/apache/spark/pull/5074/files
顺便说一句,在您的示例中,对数据进行分区而不是每次都进行混洗会更有效。
关于scala - 如何设计 Spark 应用,让 Shuffle 数据在一些迭代后自动清理,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/32410927/