scala - 将二分图转换为邻接矩阵 Spark scala

标签 scala apache-spark

我正在尝试转换以下格式的边缘列表

data = [('a', 'developer'),
     ('b', 'tester'),
    ('b', 'developer'),
     ('c','developer'),
     ('c', 'architect')]

其中邻接矩阵的形式为

      developer     tester    architect
 a        1            0          0
 b        1            1          0
 c        1            0          1

我想以以下格式存储矩阵

 1    0    0
 1    1    0
 1    0    1

我已经使用 GraphX 尝试过

def pageHash(title:String )  = title.toLowerCase.replace(" ","").hashCode.toLong


val edges: RDD[Edge[String]] = sc.textFile("/user/query.csv").map { line => 
  val row = line.split(",") 
  Edge(pageHash(row(0)), pageHash(row(1)), "1") 
} 
val graph: Graph[Int, String] = Graph.fromEdges(edges, defaultValue = 1)

我能够创建图形,但无法转换为相邻矩阵表示。

最佳答案

一种可能的方法是这样的:

  1. RDD转换为DataFrame

    val rdd = sc.parallelize(Seq(
      ("a", "developer"), ("b", "tester"), ("b", "developer"),
      ("c","developer"), ("c", "architect")))
    
    val df = rdd.toDF("row", "col")
    
  2. 索引列:

    import org.apache.spark.ml.feature.StringIndexer
    
    val indexers = Seq("row", "col").map(x =>
      new StringIndexer().setInputCol(x).setOutputCol(s"${x}_idx").fit(df)
    )
    
  3. 转换数据并创建RDD[MatrixEntry]:

    import org.apache.spark.functions.lit
    import org.apache.spark.mllib.linalg.distributed.{MatrixEntry, CoordinateMatrix}
    
    
    val entries = indexers.foldLeft(df)((df, idx) => idx.transform(df))
      .select($"row_idx", $"col_idx", lit(1.0))
      .as[MatrixEntry]  // Spark 1.6. For < 1.5 map manually
      .rdd
    
  4. 创建矩阵

    new CoordinateMatrix(entries)
    

该矩阵可以进一步转换为任何其他类型的分布式矩阵,包括 RowMatrixIndexedRowMatrix

关于scala - 将二分图转换为邻接矩阵 Spark scala,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/35986383/

相关文章:

java - 合并配置库

scala - #::方法是什么

scala - Spark 异常退出

java - Java 中的 Spark Hive Context hql 问题 - 在 yarn 中运行 spark 作业时

scala - 如何使用连接的 RDD

scala - 如何创建注释并在 scala 中获取它们

python - PySpark 2.2 爆炸丢弃空行(如何实现explode_outer)?

scala - 如何定义DataFrame的分区?

scala - 线程 "main"中的异常 java.lang.NoSuchMethodError : scala. Predef$.$scope()Lscala/xml/TopScope$;

apache-spark - Flink 还是 Spark?当流媒体不重要时