scala - Spark : PageRank example when iteration too large throws stackoverflowError

标签 scala iteration stack-overflow apache-spark

我测试了 spark 默认的 PageRank 示例并将迭代设置为 1024,然后它会抛出 stackoverflowerror。我在其他程序中也遇到了这个问题。我该如何解决。

object SparkPageRank {
  def main(args: Array[String]) {
    if (args.length < 3) {
      System.err.println("Usage: PageRank <master> <file> <number_of_iterations>")
      System.exit(1)
    }
    var iters = args(2).toInt
    val ctx = new SparkContext(args(0), "PageRank",System.getenv("SPARK_HOME"), SparkContext.jarOfClass(this.getClass))
    val lines = ctx.textFile(args(1), 1)
    val links = lines.map{ s => val parts = s.split("\\s+")
    (parts(0), parts(1))
    }.distinct().groupByKey().cache()
    var ranks = links.mapValues(v => 1.0)

    for (i <- 1 to iters) {
        val contribs = links.join(ranks).values.flatMap{ case (urls, rank) =>
        val size = urls.size
        urls.map(url => (url, rank / size))
      }
    ranks = contribs.reduceByKey(_ + _).mapValues(0.15 + 0.85 * _)
    }

    val output = ranks.collect()
    output.foreach(tup => println(tup._1 + " has rank: " + tup._2 + "."))

    System.exit(0)
  }
}

我在这里发布错误。
    [spark-akka.actor.default-dispatcher-15] ERROR LocalActorRefProvider(akka://spark) - guardian failed, shutting down system
java.lang.StackOverflowError
    at scala.collection.mutable.FlatHashTable$class.containsEntry(FlatHashTable.scala:119)
    at scala.collection.mutable.HashSet.containsEntry(HashSet.scala:41)
    at scala.collection.mutable.HashSet.contains(HashSet.scala:58)
    at scala.collection.GenSetLike$class.apply(GenSetLike.scala:43)
    at scala.collection.mutable.AbstractSet.apply(Set.scala:45)
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$visit$1(DAGScheduler.scala:312)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$visit$1$1.apply(DAGScheduler.scala:321)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$visit$1$1.apply(DAGScheduler.scala:316)
    at scala.collection.immutable.List.foreach(List.scala:318)
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$visit$1(DAGScheduler.scala:316)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$visit$1$1.apply(DAGScheduler.scala:321)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$visit$1$1.apply(DAGScheduler.scala:316)
    at scala.collection.immutable.List.foreach(List.scala:318)
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$visit$1(DAGScheduler.scala:316)
    at org.apache.spark.scheduler.DAGScheduler.getParentStages(DAGScheduler.scala:326)

最佳答案

这是因为 for 循环中的这些转换会在您的 rdd 中产生非常长的依赖关系。当您尝试运行 spark 作业时,对 rdd 的递归访问会导致 stackoverflow 错误。

要解决这个问题,可以使用checkpoint()在你的rdd上。 cache()不会帮助您立即评估您的 rdd。

所以你应该调用cache()checkpoint()在某些迭代后在您的中间 rdd 上并手动评估它以清除其依赖关系。

关于scala - Spark : PageRank example when iteration too large throws stackoverflowError,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/22321202/

相关文章:

performance - 如何在不触发 Out of Local Stack 异常的情况下计算两个大字符串的每个字符的巧合?

.net - IIS应用程序池随机崩溃是由堆栈溢出引起的

java - Bubble Shooter 游戏中的 Flood-Fill 算法

scala - 如何遍历 Spark 中的模式?

scala - scala : Mockito 中的模拟案例类

iteration - Dafny 对带中断的循环了解多少?

javascript - 通过数组迭代设置对象键值对

scala - 如何更有效地从 spark 重命名 hdfs 中的文件?

Scala.Either getOrElse 方法

Java substring.equals 与 ==