scala - 如何为 ML 算法矢量化 DataFrame 列?

标签 scala apache-spark apache-spark-mllib apache-spark-ml

有一个带有一些分类字符串值的 DataFrame(例如 uuid|url|browser)。

我会将它转换为 double 以执行接受双矩阵的 ML 算法。

作为转换方法,我使用 StringIndexer (spark 1.4) 将我的字符串值映射到 double 值,所以我定义了一个这样的函数:

def str(arg: String, df:DataFrame) : DataFrame =
   (
    val indexer = new StringIndexer().setInputCol(arg).setOutputCol(arg+"_index")
    val newDF = indexer.fit(df).transform(df)
    return newDF
   )

现在的问题是,我将迭代 df 的每个列,调用此函数并在已解析的双列中添加(或转换)原始字符串列,因此结果将是:

初始 df:
[String: uuid|String: url| String: browser]

最终 df:
[String: uuid|Double: uuid_index|String: url|Double: url_index|String: browser|Double: Browser_index]

提前致谢

最佳答案

您可以简单地foldLeftArray列数:

val transformed: DataFrame = df.columns.foldLeft(df)((df, arg) => str(arg, df))

不过,我会争辩说这不是一个好方法。自 src丢弃 StringIndexerModel获取新数据时无法使用。因此,我建议使用 Pipeline :
import org.apache.spark.ml.Pipeline

val transformers: Array[org.apache.spark.ml.PipelineStage] = df.columns.map(
   cname => new StringIndexer()
     .setInputCol(cname)
     .setOutputCol(s"${cname}_index")
)

// Add the rest of your pipeline like VectorAssembler and algorithm
val stages: Array[org.apache.spark.ml.PipelineStage] = transformers ++ ???

val pipeline = new Pipeline().setStages(stages)
val model = pipeline.fit(df)
model.transform(df)
VectorAssembler可以像这样包含:
val assembler  = new VectorAssembler()
    .setInputCols(df.columns.map(cname => s"${cname}_index"))
    .setOutputCol("features")

val stages = transformers :+ assembler

您也可以使用 RFormula ,这是不太可定制的,但更简洁:
import org.apache.spark.ml.feature.RFormula

val rf = new RFormula().setFormula(" ~ uuid + url + browser - 1")
val rfModel = rf.fit(dataset)
rfModel.transform(dataset)

关于scala - 如何为 ML 算法矢量化 DataFrame 列?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/32357226/

相关文章:

hadoop - Spark on yarn 概念理解

azure - Apache Spark 的 .Net UDF 必须可从 Azure Databricks Notebook 调用

scala - 创建一个 RDD 来收集迭代计算的结果

scala - 如何检查 Scala HighKinded TypeTag 是否为数组?

scala - 将本地向量转换为 RDD[向量]

scala - 使用 Scala 系统进程

java - 使用 DataFrameReader 从 S3 读取文件

linux - sc 未在 SparkContext 中定义

java - 如何使用 Java 在 Spark 中组合或合并两个稀疏 vector ?

scala - 将字节数组转换为字符串并再次转换回来