scala - (数组/ML Vector/MLlib Vector)RDD 到 ML Vector Dataframe 可以

标签 scala apache-spark apache-spark-sql apache-spark-mllib apache-spark-ml

我需要将 RDD 转换为单列 o.a.s.ml.linalg.Vector DataFrame,以便使用 ML 算法,特别是本例中的 K-Means。这是我的 RDD:

val parsedData = sc.textFile("/digits480x.csv").map(s => Row(org.apache.spark.mllib.linalg.Vectors.dense(s.split(',').slice(0,64).map(_.toDouble))))

我尝试做什么 this答案表明运气不好,我想因为你最终得到了一个 MLlib Vector,所以在运行算法时会抛出不匹配错误。现在如果我改变这个:

import org.apache.spark.mllib.linalg.{Vectors, VectorUDT}

val schema = new StructType()
  .add("features", new VectorUDT())

对此:

import org.apache.spark.ml.linalg.{Vectors, VectorUDT}

val parsedData = sc.textFile("/digits480x.csv").map(s => Row(org.apache.spark.ml.linalg.Vectors.dense(s.split(',').slice(0,64).map(_.toDouble))))

val schema = new StructType()
  .add("features", new VectorUDT())

我会收到错误,因为 ML VectorUDT 是私有(private)的。

我还尝试将 RDD 作为 double 组转换为 Dataframe,并获得如下所示的 ML 密集向量:

var parsedData = sc.textFile("/home/pililo/Documents/Mi_Memoria/Codigo/Datasets/Digits/digits480x.csv").map(s => Row(s.split(',').slice(0,64).map(_.toDouble)))

parsedData: org.apache.spark.rdd.RDD[org.apache.spark.sql.Row]

val schema2 = new StructType().add("features", ArrayType(DoubleType))

schema2: org.apache.spark.sql.types.StructType = StructType(StructField(features,ArrayType(DoubleType,true),true))

val df = spark.createDataFrame(parsedData, schema2)

df: org.apache.spark.sql.DataFrame = [features: array<double>]

val df2 = df.map{ case Row(features: Array[Double]) => Row(org.apache.spark.ml.linalg.Vectors.dense(features)) }

即使导入了 spark.implicits._,也会引发以下错误:

error: Unable to find encoder for type stored in a Dataset.  Primitive types (Int, String, etc) and Product types (case classes) are supported by importing spark.implicits._  Support for serializing other types will be added in future releases.

非常感谢任何帮助,谢谢!

最佳答案

从我的脑海中浮现出来:

  1. 使用 csv 源和 VectorAssembler:

    import scala.util.Try
    import org.apache.spark.ml.linalg._
    import org.apache.spark.ml.feature.VectorAssembler
    
    val path: String = ???
    
    val n: Int = ???
    val m:Int = ???
    
    val raw = spark.read.csv(path)
    val featureCols = raw.columns.slice(n, m)
    
    val exprs = featureCols.map(c => col(c).cast("double"))
    val assembler = new VectorAssembler()
      .setInputCols(featureCols)
      .setOutputCol("features")
    
    assembler.transform(raw.select(exprs: _*)).select($"features")
    
  2. 使用text源和UDF:

    def parse_(n: Int, m: Int)(s: String) = Try(
      Vectors.dense(s.split(',').slice(n, m).map(_.toDouble))
    ).toOption
    
    def parse(n: Int, m: Int) = udf(parse_(n, m) _)
    
    val raw = spark.read.text(path)
    
    raw.select(parse(n, m)(col(raw.columns.head)).alias("features"))
    
  3. 使用text源并放置换行Row

    spark.read.text(path).as[String].map(parse_(n, m)).toDF
    

关于scala - (数组/ML Vector/MLlib Vector)RDD 到 ML Vector Dataframe 可以,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/39298713/

相关文章:

斯卡拉 + 光滑 3 : Inserting the result of one query into another table

scala - 如何从 Spark DataFrame 中选择一个稳定的行子集?

python - 如何使用 Spark 函数 PySpark 将字符串转换为列表

apache-spark - 检查 GraphX 图形对象

mysql - SQL 查询/Spark 数据帧外连接并减去两个表的值

scala - flatten函数中使用的模式匹配中List[_]的解释

java - 如何在spark(java)中合并两个具有不同架构的 Parquet 文件

python - 将包含多种字符串日期格式的列转换为 Spark 中的 DateTime

apache-spark - Spark SQL 中的 Tungsten 编码?

android - dex 在 Android 上为 Scala 构建 Ant 时失败