scala - How to get the actual SSSP path with Apache Spark GraphX?

Tags: scala apache-spark spark-graphx

I ran the single-source shortest path (SSSP) example from the Spark site, as shown below:

graphx-SSSP pregel example

Code (Scala):

import org.apache.spark.SparkContext
import org.apache.spark.graphx._
import org.apache.spark.graphx.util.GraphGenerators

object Pregel_SSSP {
  def main(args: Array[String]) {
    val sc = new SparkContext("local", "Allen Pregel Test", System.getenv("SPARK_HOME"), SparkContext.jarOfClass(this.getClass))
    // A graph with edge attributes containing distances
    val graph: Graph[Int, Double] =
      GraphGenerators.logNormalGraph(sc, numVertices = 5).mapEdges(e => e.attr.toDouble)
    graph.edges.foreach(println)
    val sourceId: VertexId = 0 // The ultimate source

    // Initialize the graph such that all vertices except the root have distance infinity.
    val initialGraph = graph.mapVertices((id, _) => if (id == sourceId) 0.0 else Double.PositiveInfinity)

    val sssp = initialGraph.pregel(Double.PositiveInfinity, Int.MaxValue, EdgeDirection.Out)(

      // Vertex Program: keep the smaller of the current and the incoming distance
      (id, dist, newDist) => math.min(dist, newDist),

      // Send Message: propagate a shorter distance along the edge if one is found
      triplet => {
        if (triplet.srcAttr + triplet.attr < triplet.dstAttr) {
          Iterator((triplet.dstId, triplet.srcAttr + triplet.attr))
        } else {
          Iterator.empty
        }
      },

      // Merge Message: keep the minimum of competing distances
      (a, b) => math.min(a, b))

    println(sssp.vertices.collect.mkString("\n"))
  }
}

Source ID: 0
Result:
(0,0.0)
(4,2.0)
(2,1.0)
(3,1.0)
(1,2.0)

But I need the actual paths, like this:
=>
0 -> 0,0
0 -> 2,1
0 -> 3,1
0 -> 2 -> 4,2
0 -> 3 -> 1,2

How do I get the actual SSSP paths with Spark GraphX?
Can anybody give me a hint?
Thanks for your help!

Best answer

You have to modify the algorithm so that it stores not only the length of the shortest path but also the actual path. Instead of storing a Double as the vertex attribute, you should store a tuple: (Double, List[VertexId]). Maybe this code will be useful for you:

import org.apache.spark.SparkContext
import org.apache.spark.graphx._
import org.apache.spark.graphx.util.GraphGenerators

object Pregel_SSSP {
  def main(args: Array[String]) {
    val sc = new SparkContext("local", "Allen Pregel Test", System.getenv("SPARK_HOME"), SparkContext.jarOfClass(this.getClass))
    // A graph with edge attributes containing distances
    val graph: Graph[Int, Double] =
      GraphGenerators.logNormalGraph(sc, numVertices = 5).mapEdges(e => e.attr.toDouble)
    graph.edges.foreach(println)
    val sourceId: VertexId = 0 // The ultimate source

    // Initialize the graph such that all vertices except the root have distance infinity.
    val initialGraph : Graph[(Double, List[VertexId]), Double] = graph.mapVertices((id, _) => if (id == sourceId) (0.0, List[VertexId](sourceId)) else (Double.PositiveInfinity, List[VertexId]()))

    val sssp = initialGraph.pregel((Double.PositiveInfinity, List[VertexId]()), Int.MaxValue, EdgeDirection.Out)(

      // Vertex Program: keep whichever (distance, path) pair has the smaller distance
      (id, dist, newDist) => if (dist._1 < newDist._1) dist else newDist,

      // Send Message: if a shorter route through this edge exists, send the new
      // distance together with the source's path extended by the destination vertex
      triplet => {
        if (triplet.srcAttr._1 < triplet.dstAttr._1 - triplet.attr) {
          Iterator((triplet.dstId, (triplet.srcAttr._1 + triplet.attr, triplet.srcAttr._2 :+ triplet.dstId)))
        } else {
          Iterator.empty
        }
      },

      // Merge Message: keep the pair with the smaller distance
      (a, b) => if (a._1 < b._1) a else b)
    println(sssp.vertices.collect.mkString("\n"))
  }
}
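
To print the result in the format asked for in the question, the collected vertices can be mapped to path strings after the Pregel run. This is just a minimal sketch, assuming the sssp graph computed by the code above:

    // Turn each vertex's (distance, path) attribute into a line like "0 -> 2 -> 4,2.0".
    // The path list already starts at the source and ends at the vertex itself.
    sssp.vertices.collect.foreach { case (_, (dist, path)) =>
      println(path.mkString(" -> ") + "," + dist)
    }

Note that path :+ dstId appends to an immutable List, which is linear in the path length; for large graphs it may be cheaper to prepend vertices and reverse the list once at the end.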

Regarding "scala - How to get the actual SSSP path with Apache Spark GraphX?", a similar question was found on Stack Overflow: https://stackoverflow.com/questions/23700124/
