hadoop - How to use the vectorized documents output by Mahout seq2sparse in Apache Spark

Tags: hadoop apache-spark mahout

Mahout seq2sparse produces a set of sequence files, as fully described here. I want to use the vectorized documents, which have the format <Text, VectorWritable> (docID, TF-IDF vector), and create a JavaRDD<Vector> from the TF-IDF vectors. Can someone guide me?

Best Answer

This information is readily available in the documentation.

Preprocessing: For a set of Sequence File Formatted documents in PATH_TO_SEQUENCE_FILES the mahout seq2sparse command performs the TF-IDF transformations (-wt tfidf option) and L2 length normalization (-n 2 option) as follows:

$ mahout seq2sparse \
  -i ${PATH_TO_SEQUENCE_FILES} \
  -o ${PATH_TO_TFIDF_VECTORS} \
  -nv \
  -n 2 \
  -wt tfidf

Training: The model is then trained using mahout spark-trainnb. The default is to train a Bayes model. The -c option is given to train a CBayes model:

$ mahout spark-trainnb \
  -i ${PATH_TO_TFIDF_VECTORS} \
  -o ${PATH_TO_MODEL} \
  -ow \
  -c

Label Assignment/Testing: Classification and testing on a holdout set can then be performed via mahout spark-testnb. Again, the -c option indicates that the model is CBayes:

$ mahout spark-testnb \
  -i ${PATH_TO_TFIDF_TEST_VECTORS} \
  -m ${PATH_TO_MODEL} \
  -ow \
  -c


Looking at the mahout command script, we see that it is actually using the org.apache.mahout.drivers.TrainNBDriver class. We are interested in the parts that work with the TF-IDF <Text, VectorWritable> vectors:
  /** Read the training set from inputPath/part-x-00000 sequence file of form <Text, VectorWritable> */
  private def readTrainingSet: DrmLike[_] = {
    val inputPath = parser.opts("input").asInstanceOf[String]
    val trainingSet = drm.drmDfsRead(inputPath)
    trainingSet
  }

  override def process(): Unit = {
    start()

    val complementary = parser.opts("trainComplementary").asInstanceOf[Boolean]
    val outputPath = parser.opts("output").asInstanceOf[String]

    val trainingSet = readTrainingSet
    val (labelIndex, aggregatedObservations) = SparkNaiveBayes.extractLabelsAndAggregateObservations(trainingSet)
    val model = NaiveBayes.train(aggregatedObservations, labelIndex)

    model.dfsWrite(outputPath)

    stop()
  }

If we look more closely, we see that the drm.drmDfsRead(inputPath) call is what converts the input. The conversion is then carried out like this (for example, in the SparkEngine bindings):
  /**
   * Load DRM from hdfs (as in Mahout DRM format)
   *
   * @param path
   * @param sc spark context (wanted to make that implicit, doesn't work in current version of
   *           scala with the type bounds, sorry)
   *
   * @return DRM[Any] where Any is automatically translated to value type
   */
  def drmDfsRead (path: String, parMin:Int = 0)(implicit sc: DistributedContext): CheckpointedDrm[_] = {

    val drmMetadata = hdfsUtils.readDrmHeader(path)
    val k2vFunc = drmMetadata.keyW2ValFunc

    // Load RDD and convert all Writables to value types right away (due to reuse of writables in
    // Hadoop we must do it right after read operation).
    val rdd = sc.sequenceFile(path, classOf[Writable], classOf[VectorWritable], minPartitions = parMin)

        // Immediately convert keys and value writables into value types.
        .map { case (wKey, wVec) => k2vFunc(wKey) -> wVec.get()}

    // Wrap into a DRM type with correct matrix row key class tag evident.
    drmWrap(rdd = rdd, cacheHint = CacheHint.NONE)(drmMetadata.keyClassTag.asInstanceOf[ClassTag[Any]])
  }
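In short, the TF-IDF output of seq2sparse is just an ordinary <Text, VectorWritable> sequence file, so in Spark you can read it directly with sc.sequenceFile and convert each Mahout vector into a Spark vector yourself, without going through Mahout's DRM layer. Below is a minimal, hedged sketch of that idea in Java, since the question asks for a JavaRDD<Vector>; the class name Seq2SparseToRdd and the input path are illustrative, and it assumes the mahout-math jar (for VectorWritable) is on the Spark classpath:

  import org.apache.hadoop.io.Text;
  import org.apache.mahout.math.VectorWritable;
  import org.apache.spark.SparkConf;
  import org.apache.spark.api.java.JavaPairRDD;
  import org.apache.spark.api.java.JavaRDD;
  import org.apache.spark.api.java.JavaSparkContext;
  import org.apache.spark.mllib.linalg.Vector;
  import org.apache.spark.mllib.linalg.Vectors;

  import java.util.Map;
  import java.util.TreeMap;

  public class Seq2SparseToRdd {
    public static void main(String[] args) {
      // Hypothetical location of the seq2sparse TF-IDF output; adjust to your setup.
      String tfidfPath = "hdfs:///path/to/tfidf-vectors";

      JavaSparkContext sc = new JavaSparkContext(new SparkConf().setAppName("seq2sparse-to-rdd"));

      // Each record is <Text, VectorWritable>, i.e. (docID, TF-IDF vector).
      JavaPairRDD<Text, VectorWritable> seq =
          sc.sequenceFile(tfidfPath, Text.class, VectorWritable.class);

      // Convert each Mahout vector to a Spark sparse vector right away, because
      // Hadoop reuses Writable instances while reading the sequence file.
      JavaRDD<Vector> tfidfVectors = seq.map(pair -> {
        org.apache.mahout.math.Vector mahoutVec = pair._2().get();
        // Collect the non-zero entries in ascending index order, since Spark's
        // sparse vectors expect sorted indices and Mahout's iteration order is
        // not guaranteed for random-access vectors.
        TreeMap<Integer, Double> entries = new TreeMap<>();
        for (org.apache.mahout.math.Vector.Element e : mahoutVec.nonZeroes()) {
          entries.put(e.index(), e.get());
        }
        int[] indices = new int[entries.size()];
        double[] values = new double[entries.size()];
        int i = 0;
        for (Map.Entry<Integer, Double> entry : entries.entrySet()) {
          indices[i] = entry.getKey();
          values[i] = entry.getValue();
          i++;
        }
        return Vectors.sparse(mahoutVec.size(), indices, values);
      });

      System.out.println("Loaded " + tfidfVectors.count() + " TF-IDF vectors");
      sc.stop();
    }
  }

The conversion to a Spark vector happens inside the first map, before anything is cached or collected, for the same reason the Mahout code above converts the writables immediately: Hadoop reuses Writable instances during a sequence-file read. If you also need the document IDs, map to a pair of (pair._1().toString(), sparseVector) instead of dropping the key.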

Regarding hadoop - how to use the vectorized documents output by Mahout seq2sparse in Apache Spark, a similar question can be found on Stack Overflow: https://stackoverflow.com/questions/29153066/
