scala - 如何在多列上使用 spark quantilediscretizer

标签 scala dictionary apache-spark pipeline quantile

全部,

我有一个如下的 ml 管道设置

import org.apache.spark.ml.feature.QuantileDiscretizer
import org.apache.spark.sql.types.{StructType,StructField,DoubleType}    
import org.apache.spark.ml.Pipeline
import org.apache.spark.rdd.RDD
import org.apache.spark.sql._
import scala.util.Random

val nRows = 10000
val nCols = 1000
val data = sc.parallelize(0 to nRows-1).map { _ => Row.fromSeq(Seq.fill(nCols)(Random.nextDouble)) }
val schema = StructType((0 to nCols-1).map { i => StructField("C" + i, DoubleType, true) } )
val df = spark.createDataFrame(data, schema)
df.cache()

//Get continuous feature name and discretize them
val continuous = df.dtypes.filter(_._2 == "DoubleType").map (_._1)
val discretizers = continuous.map(c => new QuantileDiscretizer().setInputCol(c).setOutputCol(s"${c}_disc").setNumBuckets(3).fit(df))
val pipeline = new Pipeline().setStages(discretizers)
val model = pipeline.fit(df)

当我运行它时,spark 似乎将每个离散化器设置为一个单独的工作。有没有办法在有或没有管道的情况下将所有离散化器作为单个作业运行?
感谢帮助,感激不尽。

最佳答案

Spark 2.3.0 中添加了对此功能的支持。 See release docs

  • 多个特征转换器的多列支持:
  • [SPARK-13​​030]:OneHotEncoderEstimator (Scala/Java/Python)
  • [SPARK-22397]:QuantileDiscretizer (Scala/Java)
  • [SPARK-20542]: Bucketizer (Scala/Java/Python)

  • 您现在可以使用 setInputColssetOutputCols指定多个列,尽管它似乎尚未反射(reflect)在官方文档中。与一次处理每列一项工作相比,这个新补丁的性能大大提高。

    您的示例可能会进行如下调整:
    import org.apache.spark.ml.feature.QuantileDiscretizer
    import org.apache.spark.sql.types.{StructType,StructField,DoubleType}    
    import org.apache.spark.ml.Pipeline
    import org.apache.spark.rdd.RDD
    import org.apache.spark.sql._
    import scala.util.Random
    
    val nRows = 10000
    val nCols = 1000
    val data = sc.parallelize(0 to nRows-1).map { _ => Row.fromSeq(Seq.fill(nCols)(Random.nextDouble)) }
    val schema = StructType((0 to nCols-1).map { i => StructField("C" + i, DoubleType, true) } )
    val df = spark.createDataFrame(data, schema)
    df.cache()
    
    //Get continuous feature name and discretize them
    val continuous = df.dtypes.filter(_._2 == "DoubleType").map (_._1)
    
    val discretizer = new QuantileDiscretizer()
      .setInputCols(continuous)
      .setOutputCols(continuous.map(c => s"${c}_disc"))
      .setNumBuckets(3)
    
    val pipeline = new Pipeline().setStages(Array(discretizer))
    val model = pipeline.fit(df)
    model.transform(df)
    

    关于scala - 如何在多列上使用 spark quantilediscretizer,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/43639252/

    相关文章:

    Python字典切换通过其他函数而不仅仅是调用的函数

    scala - 使用Apache Spark流向ElasticSearch实时发送Kafka消息

    apache-spark - 使用 registerTempTable 找不到表或 View

    python - 使用索引器和编码器时出现 PySpark 管道错误

    scala - 拆分 Monix Observable

    scala - 我们真的在 Scala 中拥有 'extend' 特征吗?

    arrays - 如何基于参数对对象列表进行分类

    dictionary - Go:从 map 弹出

    Scala,Casbah - 如何将列表转换为 MongoDBList?

    scala - 将元组添加到集合中不起作用