scala - 在spark中为LDA准备数据

标签 scala apache-spark apache-spark-mllib lda

我正在致力于实现 Spark LDA 模型(通过 Scala API),但在对数据进行必要的格式化步骤时遇到了问题。我的原始数据(存储在文本文件中)采用以下格式,本质上是 token 及其对应的文档的列表。一个简化的例子:

doc XXXXX   term    XXXXX
1   x       'a'     x
1   x       'a'     x
1   x       'b'     x
2   x       'b'     x
2   x       'd'     x
...

其中 XXXXX 列是我不关心的垃圾数据。我意识到这是一种非典型的存储语料库数据的方式,但这就是我所拥有的。我希望从示例中可以清楚地看出,原始数据中每个标记有一行(因此,如果给定术语在文档中出现 5 次,则对应于 5 行文本)。

无论如何,我需要将此数据格式化为稀疏词频向量以运行 Spark LDA 模型,但我不熟悉 Scala,因此遇到了一些麻烦。

我从以下开始:

import org.apache.spark.mllib.clustering.{LDA, DistributedLDAModel}
import org.apache.spark.mllib.linalg.{Vector, Vectors}
import org.apache.spark.rdd.RDD

val corpus:RDD[Array[String]] = sc.textFile("path/to/data")
    .map(_.split('\t')).map(x => Array(x(0),x(2)))

然后我获得生成稀疏向量所需的词汇数据:

val vocab: RDD[String] = corpus.map(_(1)).distinct()
val vocabMap: Map[String, Int] = vocab.collect().zipWithIndex.toMap

我不知道在这里使用正确的映射函数,这样我最终会得到每个文档的稀疏术语频率向量,然后我可以将其输入 LDA 模型。我想我需要一些类似的东西......

val documents: RDD[(Long, Vector)] = corpus.groupBy(_(0)).zipWithIndex
    .map(x =>(x._2,Vectors.sparse(vocabMap.size, ???)))

此时我可以运行实际的 LDA:

val lda = new LDA().setK(n_topics)
val ldaModel = lda.run(documents)

基本上,我不知道对每个组应用什么函数,以便我可以将术语频率数据(大概作为映射?)输入到稀疏向量中。也就是说,我该如何填写上面代码片段中的???才能达到想要的效果呢?

最佳答案

处理这个问题的一种方法:

  • 确保 spark-csv 软件包可用
  • 将数据加载到 DataFrame 中并选择感兴趣的列

    val df = sqlContext.read
        .format("com.databricks.spark.csv")
        .option("header", "true")
        .option("inferSchema", "true") // Optional, providing schema is prefered
        .option("delimiter", "\t")
        .load("foo.csv")
        .select($"doc".cast("long").alias("doc"), $"term")
    
  • 索引术语列:

    import org.apache.spark.ml.feature.StringIndexer
    
    val indexer = new StringIndexer()
      .setInputCol("term")
      .setOutputCol("termIndexed")
    
    val indexed = indexer.fit(df)
      .transform(df)
      .drop("term")
      .withColumn("termIndexed", $"termIndexed".cast("integer"))
      .groupBy($"doc", $"termIndexed")
      .agg(count(lit(1)).alias("cnt").cast("double"))
    
  • 转换为PairwiseRDD

    import org.apache.spark.sql.Row
    
    val pairs = indexed.map{case Row(doc: Long, term: Int, cnt: Double) => 
      (doc, (term, cnt))}
    
  • 按文档分组:

    val docs = pairs.groupByKey
    
  • 创建特征向量

    import org.apache.spark.mllib.linalg.Vectors
    import org.apache.spark.sql.functions.max
    
    val n = indexed.select(max($"termIndexed")).first.getInt(0) + 1
    
    val docsWithFeatures = docs.mapValues(vs => Vectors.sparse(n, vs.toSeq))
    
  • 现在您已拥有创建LabeledPoints或应用其他处理所需的一切

关于scala - 在spark中为LDA准备数据,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/33551407/

相关文章:

eclipse - 为什么 Scala `var` 在 Eclipse 中以红色突出显示

scala - groupByKey 与aggregateByKey - 差异究竟来自哪里?

python - Pyspark - 如何将 '4 hours' 多个窗口分组聚合

python - PySpark DataFrame 无法删除重复项

apache-spark - VectorUDT 用法

云计算学习的Python资源?

java - Scala - 非法的 base64 字符 5c

java - 从磁盘读取 Spark 流错误 - java.io.NotSerializedException : org. apache.spark.streaming.api.java.JavaStreamingContext

python - 高斯混合模型 : Difference between Spark MLlib and scikit-learn

apache-spark - PySpark 特征选择和可解释性