scala - Matching categorical variables to Vectors in a Spark Scala DataFrame

Tags: scala vector apache-spark dataframe machine-learning

I have been trying to follow a Stack Overflow example on creating a data frame for the machine learning (ml) library in Spark Scala:

How to create correct data frame for classification in Spark ML

However, I cannot get the matching udf to work. I get this error:

"kinds of the type arguments (Vector,Int,Int,String,String) do not conform to the expected kinds of the type parameters (type RT,type A1,type A2,type A3,type A4). Vector's type parameters do not match type RT's expected parameters: type Vector has one type parameter, but type RT has none"

I need to create a data frame to feed into a logistic regression library. The example source data looks like:

Source, Amount, Account, Fraud
CACC1, 9120.50, 999, 0
CACC2, 3897.25, 999, 0
AMXCC1, -523, 999, 0
MASCC2, -8723.15, 999, 0

I think the output I want is:

+-------------------+-----+
|           features|label|
+-------------------+-----+
|[1.0,9120.50,999]  |  0.0|
|[1.0,3897.25,999]  |  0.0|
|[2.0,-523.00,999]  |  0.0|
|[0.0,-8723.15,999] |  0.0|
+-------------------+-----+

So far I have:

val df = sqlContext.sql("select * from prediction_test")
val df_2 = df.select("source","amount","account")

val toVec3 = udf[Vector,String,Int,Int] { (a,b,c) => 
  val e3 = c match {
    case "MASCC2" => 0
    case "CACC1" => 1
    case "AMXCC1" => 2
  }
  Vectors.dense(e1, b, c) 
}

val encodeLabel = udf[Double, Int](_ match { case "0" => 0.0 case "1" => 1.0 })

val df_3 = df_2.withColumn("features", toVec3(df_2("source"),df_2("amount"),df_2("account"))).withColumn("label", encodeLabel(df("fraud"))).select("features","label")
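(For reference, the kind error quoted above is what you get when Scala's collection `Vector` is in scope instead of Spark's `Vector`. A sketch of a compiling version of the udf, assuming Spark 2.x's `org.apache.spark.ml.linalg.Vector` — on Spark 1.x it would be `org.apache.spark.mllib.linalg.Vector` — and assuming the amount column is a Double:)

```scala
import org.apache.spark.ml.linalg.{Vector, Vectors}
import org.apache.spark.sql.functions.udf

// Use Spark's ml Vector, which takes no type parameter, so that
// udf[Vector, ...] kind-checks. Match on the source string `a`
// (not the account `c`), and keep the amount as Double since the
// sample data has decimal values.
val toVec3 = udf[Vector, String, Double, Int] { (a, b, c) =>
  val e1 = a match {
    case "MASCC2" => 0.0
    case "CACC1"  => 1.0
    case "AMXCC1" => 2.0
    case _        => -1.0 // hypothetical fallback for unseen sources
  }
  Vectors.dense(e1, b, c)
}

// The fraud column is an Int in the sample data, so match on Int literals.
val encodeLabel = udf[Double, Int] {
  case 0 => 0.0
  case 1 => 1.0
}
```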


Best Answer

Using Spark 2.3.1, I suggest the code below for a classification-ready Spark ML pipeline. If you want to include a classification object in the pipeline, just add it at the place I indicated. ClassificationPipeline returns a PipelineModel. After transforming with this model, you get classification-ready columns named features and label.

import org.apache.spark.ml.{Pipeline, PipelineModel}
import org.apache.spark.ml.feature.{OneHotEncoderEstimator, StandardScaler, StringIndexer, VectorAssembler}
import org.apache.spark.sql.DataFrame

// Handles categorical features
def stringIndexerPipeline(inputCol: String): (Pipeline, String) = {
  val indexer = new StringIndexer()
    .setHandleInvalid("skip")
    .setInputCol(inputCol)
    .setOutputCol(inputCol + "_indexed")
  val pipeline = new Pipeline().setStages(Array(indexer))
  (pipeline, inputCol + "_indexed")
}

// Classification Pipeline Function
def ClassificationPipeline(df:DataFrame): PipelineModel = {

  // Preprocessing categorical features
  val (SourcePipeline, Source_indexed) = stringIndexerPipeline("Source")

  // Use StringIndexer output as input for OneHotEncoderEstimator
  val oneHotEncoder = new OneHotEncoderEstimator()
    //.setDropLast(true)
    //.setHandleInvalid("skip")
    .setInputCols(Array("Source_indexed"))
    .setOutputCols(Array("Source_indexedVec"))


  // Gather features that will be passed through the pipeline
  val inputCols = oneHotEncoder.getOutputCols ++ Array("Amount","Account")

  // Put all inputs in a column as a vector
  val vectorAssembler = new VectorAssembler()
    .setInputCols(inputCols)
    .setOutputCol("featureVector")

  // Scale vector column
  val standartScaler = new StandardScaler()
    .setInputCol("featureVector")
    .setOutputCol("features")
    .setWithStd(true)
    .setWithMean(false)

  // Create stringindexer for label col
  val labelIndexer = new StringIndexer().
    setHandleInvalid("skip").
    setInputCol("Fraud").
    setOutputCol("label")

  // create classification object in here 
  // val classificationObject = new ....


  // Create a pipeline
  val pipeline = new Pipeline().setStages(
    Array(SourcePipeline, oneHotEncoder, vectorAssembler, standartScaler, labelIndexer/*, classificationObject*/))
  pipeline.fit(df)
}

val pipelineModel = ClassificationPipeline(df)

val transformedDF = pipelineModel.transform(df)
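To fill in the commented `classificationObject` slot above with an actual classifier, a minimal sketch (assuming the standard spark.ml LogisticRegression; the variable name is just illustrative) could look like:

```scala
import org.apache.spark.ml.classification.LogisticRegression

// Hypothetical classifier for the commented slot in ClassificationPipeline;
// it reads the "features" and "label" columns produced by the earlier stages.
val classificationObject = new LogisticRegression()
  .setFeaturesCol("features")
  .setLabelCol("label")

// The pipeline's last stage then becomes the classifier:
// Array(SourcePipeline, oneHotEncoder, vectorAssembler, standartScaler,
//       labelIndexer, classificationObject)

// With the classifier included, transform() also adds prediction columns:
// pipelineModel.transform(df).select("features", "label", "prediction").show(false)
```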

For "scala - Matching categorical variables to Vectors in a Spark Scala DataFrame", we found a similar question on Stack Overflow: https://stackoverflow.com/questions/37815800/
