scala - StackOverflowError during iterative computation with Apache Spark

Tags: scala apache-spark

If an RDD object has non-empty .dependencies, does that mean it has lineage? How can I remove it?
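As background, a minimal sketch (with a placeholder RDD and a hypothetical checkpoint directory, not taken from the question) of how lineage shows up in .dependencies and how checkpointing truncates it:

import org.apache.spark.{SparkConf, SparkContext}

val sc = new SparkContext(new SparkConf().setAppName("lineage-demo").setMaster("local[2]"))
sc.setCheckpointDir("/tmp/spark-checkpoints") // hypothetical directory

val rdd = sc.parallelize(1 to 10).map(_ * 2).filter(_ > 5)
println(rdd.dependencies)   // non-empty: the RDD still carries its lineage
println(rdd.toDebugString)  // prints the full lineage chain

rdd.checkpoint()            // must be called before the RDD is materialized
rdd.count()                 // action that materializes the RDD and writes the checkpoint
println(rdd.isCheckpointed) // true
println(rdd.dependencies)   // now a single dependency on the checkpointed data, not the original chain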

I am doing an iterative computation in which each iteration depends on the result of the previous one. After many iterations a StackOverflowError is thrown.

At first I tried caching. I read the code in pregel.scala, which is part of GraphX: it uses the count method to materialize the object after cache. But I attached a debugger, and this approach does not seem to empty .dependencies; it did not work in my code either.

Another alternative is checkpoint. I tried checkpointing the vertices and edges of my Graph object and then materializing them by calling count on the vertices and edges. I then used .isCheckpointed to check whether it was checkpointed correctly, but it always returns false.
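For reference, this is roughly the pattern described above, written out as a sketch (tiny placeholder graph, assuming an existing SparkContext sc); the false result at the end is what the question reports, not a general claim:

import org.apache.spark.graphx._

val v = sc.parallelize(Seq((0L, 0L), (1L, 1L)))
val e = sc.parallelize(Seq(Edge(0L, 1L, 0L)))
val graph = Graph(v, e)

graph.vertices.checkpoint()
graph.edges.checkpoint()
graph.vertices.count()  // materialize the vertices
graph.edges.count()     // materialize the edges

// reportedly still prints "false, false"
println(s"${graph.vertices.isCheckpointed}, ${graph.edges.isCheckpointed}")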

Update: I wrote a simplified version of the code that reproduces the problem.

  // assumes the usual imports: org.apache.spark.{SparkConf, SparkContext} and org.apache.spark.graphx._
  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("HDTM")
      .setMaster("local[4]")
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .set("spark.kryo.registrator", "edu.nd.dsg.hdtm.util.HDTMKryoRegistrator")
    val sc = new SparkContext(conf)

    val v = sc.parallelize(Seq[(VertexId, Long)]((0L, 0L), (1L, 1L), (2L, 2L)))
    val e = sc.parallelize(Seq[Edge[Long]](Edge(0L, 1L, 0L), Edge(1L, 2L, 1L), Edge(2L, 0L, 2L)))
    val newGraph = Graph(v, e)
    var currentGraph = newGraph
    val vertexIds = currentGraph.vertices.map(_._1).collect()

    for (i <- 1 to 1000) {
      var g = currentGraph
      vertexIds.toStream.foreach(id => {
        g = Graph(currentGraph.vertices, currentGraph.edges)
        g.cache()
        g.edges.cache()
        g.vertices.cache()
        g.vertices.count()
        g.edges.count()
      })

      currentGraph.unpersistVertices(blocking = false)
      currentGraph.edges.unpersist(blocking = false)
      currentGraph = g
      println(" iter "+i+" finished")
    }

  }

Update

Here is the code. I removed most of the unnecessary methods to keep it as short as possible, so it may not make much sense if you think about what it actually does.

// WikiDataVertex, WikiGraphLoader and GraphAlgorithm are the asker's own classes; their code is not shown here
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.graphx._

import scala.reflect.ClassTag

object StackOverFlow {
  final val PATH = "./"

  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("HDTM")
      .setMaster("local[4]")
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .set("spark.kryo.registrator", "edu.nd.dsg.hdtm.util.HDTMKryoRegistrator")
    val sc = new SparkContext(conf)
    val filePath = PATH + "src/test/resources/binary.txt"
    val wikiGraph: Graph[WikiDataVertex, Double] = WikiGraphLoader.loadGraphFromTestHDTMFile(sc, filePath)
    wikiGraph.cache()
    val root = 0L
    val bfsGraph = GraphAlgorithm.initializeGraph(wikiGraph, root, sc)
    bfsGraph.cache()
    val vertexIds = bfsGraph.vertices.map(_._1).collect()
    var currentGraph = bfsGraph

    for (i <- 1 to 1000) {
      var g = currentGraph
      vertexIds.toStream.foreach(id => {
          g = samplePath(g, id, root)
      })

      currentGraph.unpersistVertices(blocking = false)
      currentGraph.edges.unpersist(blocking = false)
      currentGraph = g
      println(" iter "+i+" finished")
    }

  }

  def samplePath[ED: ClassTag](graph: Graph[WikiDataVertex, ED],
                               instance: VertexId, root: VertexId): Graph[WikiDataVertex, ED] = {

    if(instance == 0L) return graph

    val (removedGraph, remainedGraph) = splitGraph(graph, instance)

    /**
     * Here I omit some other code, which will change the attributes of removedGraph and remainedGraph
     */

    val newVertices = graph.outerJoinVertices(removedGraph.vertices ++ remainedGraph.vertices)({
      (vid, vd, opt) => {
        opt.getOrElse(vd)
      }
    }).vertices

    val newEdges = graph.edges.map(edge => {
      if (edge.dstId == instance)
        edge.copy(srcId = edge.srcId)
        // In the real case edge.srcId would be replaced by a vertexId calculated by other functions
      else
        edge.copy()
    })

    val g = Graph(newVertices, newEdges)
    g.vertices.cache()
    g.edges.cache()
    g.cache()
    g.vertices.count()
    g.edges.count()

    remainedGraph.unpersistVertices(blocking = false)
    remainedGraph.edges.unpersist(blocking = false)
    removedGraph.unpersistVertices(blocking = false)
    removedGraph.edges.unpersist(blocking = false)

    g
  }

  /**
   * Split a graph into two sub-graphs at the vertex `instance`.
   * The edges that end at `instance` will be lost.
   * @param graph Graph that will be separated
   * @param instance Vertex that we are using to separate the graph
   * @tparam ED Edge type
   * @return (sub-graph with `instance`, sub-graph without `instance`)
   **/
  def splitGraph[ED: ClassTag]
  (graph: Graph[WikiDataVertex, ED], instance: VertexId): (Graph[WikiDataVertex, ED], Graph[WikiDataVertex,ED]) = {
    val nGraph = GraphAlgorithm.graphWithOutDegree(graph)
    // This will be needed twice, so cache it to prevent re-computation
    nGraph.cache()

    val wGraph = nGraph.subgraph(epred = e => e.dstAttr._1.path.contains(instance) ||
      e.srcAttr._1.path.contains(instance),
      vpred = (id, vd) => vd._1.path.contains(instance))

    val woGraph = nGraph.subgraph(epred = e => !e.dstAttr._1.path.contains(instance) &&
      !e.srcAttr._1.path.contains(instance),
      vpred = (id, vd) => !vd._1.path.contains(instance))

    val removedGraph = Graph(wGraph.vertices.mapValues(_._1), wGraph.edges, null)
    val remainedGraph = Graph(woGraph.vertices.mapValues(_._1), woGraph.edges, null)

    removedGraph.vertices.count()
    removedGraph.edges.count()
    removedGraph.cache()
    remainedGraph.vertices.count()
    remainedGraph.edges.count()
    remainedGraph.cache()

    nGraph.unpersistVertices(blocking = false)
    nGraph.edges.unpersist(blocking = false)

    (removedGraph, remainedGraph)
  }

}

For the first 10 or so iterations it runs quickly; after that, each iteration takes more and more time. Looking at the Spark Web UI, the actual execution time of each operation is almost the same, but the Scheduler Delay grows with the number of iterations. After around 20 iterations it throws a StackOverflowError.

Best Answer

val g = loadEdgeFile(sc, edge_pt, n_partition)

// the no-op foreachPartition calls simply force materialization of the edges and vertices
g.edges.foreachPartition(_ => Unit)
g.vertices.foreachPartition(_ => Unit)

g.checkpoint()

g.edges.foreachPartition(_ => Unit)
g.vertices.foreachPartition(_ => Unit)
println(s"is cp: ${g.isCheckpointed}")

To get a graph checkpointed, three conditions should be satisfied:

  • the graph has not been materialized before;
  • then you checkpoint it;
  • then you materialize both the vertices and the edges.

After that, if you check the graph's checkpoint state, you will get a true answer, as sketched below.
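Put together, a minimal sketch of that order (assuming an existing SparkContext sc, a hypothetical checkpoint directory, and placeholder graph contents):

import org.apache.spark.graphx._

sc.setCheckpointDir("/tmp/graph-checkpoints") // hypothetical path, must be set before checkpointing

// 1. build the graph but do NOT materialize it yet
val g = Graph(
  sc.parallelize(Seq((0L, 0L), (1L, 1L))),
  sc.parallelize(Seq(Edge(0L, 1L, 0L))))

// 2. checkpoint it
g.checkpoint()

// 3. materialize both vertices and edges
g.edges.foreachPartition(_ => ())
g.vertices.foreachPartition(_ => ())

println(s"is cp: ${g.isCheckpointed}") // should now print true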

Regarding scala - StackOverflowError during iterative computation with Apache Spark, we found a similar question on Stack Overflow: https://stackoverflow.com/questions/24273441/
