scala - 在 Apache Spark 中为具有大量列的数据集创建机器学习管道的最佳方法

标签 scala apache-spark apache-spark-mllib

我正在使用 Spark 2.1.1 处理一个具有约 2000 个特征的数据集,并尝试创建一个基本的 ML Pipeline,其中包含一些 Transformer 和一个 Classifier。

为了简单起见,我们假设我正在使用的管道由 VectorAssembler、StringIndexer 和 Classifier 组成,这将是一个相当常见的用例。

// Pipeline elements
val assmbleFeatures: VectorAssembler = new VectorAssembler()
  .setInputCols(featureColumns)
  .setOutputCol("featuresRaw")

val labelIndexer: StringIndexer = new StringIndexer()
  .setInputCol("TARGET")
  .setOutputCol("indexedLabel")

// Train a RandomForest model.
val rf: RandomForestClassifier = new RandomForestClassifier()
  .setLabelCol("indexedLabel")
  .setFeaturesCol("featuresRaw")
  .setMaxBins(30)

// add the params, unique to this classifier
val paramGrid = new ParamGridBuilder()
  .addGrid(rf.numTrees, Array(5))
  .addGrid(rf.maxDepth, Array(5))
  .build()

// Treat the Pipeline as an Estimator, to jointly choose parameters for all Pipeline stages.
val evaluator = new BinaryClassificationEvaluator()
  .setMetricName("areaUnderROC")
  .setLabelCol("indexedLabel")

如果管道步骤分为变压器管道 (VectorAssembler + StringIndexer) 和第二个分类器管道,并且如果在两个管道之间删除不必要的列,则训练会成功。 这意味着为了重用模型,必须在训练后保存两个 PipelineModel,并且必须引入中间预处理步骤。

// Split indexers and forest in two Pipelines.
val prePipeline = new Pipeline().setStages(Array(labelIndexer, assmbleFeatures)).fit(dfTrain)
// Transform data and drop all columns, except those needed for training 
val dfTrainT = prePipeline.transform(dfTrain)
val columnsToDrop = dfTrainT.columns.filter(col => !Array("featuresRaw", "indexedLabel").contains(col))
val dfTrainRdy = dfTrainT.drop(columnsToDrop:_*)

val mainPipeline = new Pipeline().setStages(Array(rf))

val cv = new CrossValidator()
  .setEstimator(mainPipeline)
  .setEvaluator(evaluator)
  .setEstimatorParamMaps(paramGrid)
  .setNumFolds(2)

val bestModel = cv.fit(dfTrainRdy).bestModel.asInstanceOf[PipelineModel]

(恕我直言)更简洁的解决方案是将所有管道阶段合并到一个管道中。

val pipeline = new Pipeline()
  .setStages(Array(labelIndexer, assmbleFeatures, rf))

val cv = new CrossValidator()
  .setEstimator(pipeline)
  .setEvaluator(evaluator)
  .setEstimatorParamMaps(paramGrid)
  .setNumFolds(2)

// This will fail! 
val bestModel = cv.fit(dfTrain).bestModel.asInstanceOf[PipelineModel]

但是,将所有 PipelineStages 放入一个 Pipeline 中会导致以下异常,可能是由于问题 this PR最终会解决:

ERROR CodeGenerator: failed to compile: org.codehaus.janino.JaninoRuntimeException: Constant pool for class org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection has grown past JVM limit of 0xFFFF

原因是 VectorAssembler 有效地将 DataFrame 中的数据量加倍(在本例中),因为没有转换器可以删除不必要的列。 (参见spark pipeline vector assembler drop other columns)

该示例适用于 golub dataset并且需要进行以下预处理步骤:

import org.apache.spark.sql.types.DoubleType
import org.apache.spark.ml.classification.RandomForestClassifier
import org.apache.spark.ml.{Pipeline, PipelineModel, PipelineStage}
import org.apache.spark.ml.evaluation.BinaryClassificationEvaluator
import org.apache.spark.ml.feature._
import org.apache.spark.sql._
import org.apache.spark.ml.tuning.{CrossValidator, ParamGridBuilder}

val df = spark.read.option("header", true).option("inferSchema", true).csv("/path/to/dataset/golub_merged.csv").drop("_c0").repartition(100)

// Those steps are necessary, otherwise training would fail either way
val colsToDrop = df.columns.take(5000)
val dfValid = df.withColumn("TARGET", df("TARGET_REAL").cast(DoubleType)).drop("TARGET_REAL").drop(colsToDrop:_*)

// Split df in train and test sets
val Array(dfTrain, dfTest) = dfValid.randomSplit(Array(0.7, 0.3))

// Feature columns are columns except "TARGET"
val featureColumns = dfTrain.columns.filter(col => col != "TARGET")

由于我是 Spark 新手,我不确定解决此问题的最佳方法是什么。你会建议...

  1. 创建一个新的变压器,它会删除列并且可以合并到管道中?
  2. 拆分两个管道并引入中间步骤
  3. 还有什么吗? :)

或者我是否遗漏了任何可以解决此问题的重要内容(管道步骤、PR 等)?

<小时/>

编辑:

我实现了一个新的 Transformer DroppingVectorAssembler,它会删除不必要的列,但是会引发相同的异常。

除此之外,将 spark.sql.codegen.wholeStage 设置为 false 并不能解决问题。

最佳答案

janino 错误是由于优化器过程中创建的常量变量的数量所致。 JVM 中允许的常量变量的最大限制是 ((2^16) -1)。如果超过此限制,那么您将得到类的常量池...已超过 0xFFFF 的 JVM 限制

将解决此问题的 JIRA 是 SPARK-18016 ,但目前仍在进行中。

您的代码很可能在 VectorAssembler 阶段失败,因为它必须在单个优化任务期间对数千列执行操作。

我针对此问题开发的解决方法是通过处理列的子集来创建“向量的向量”,然后最后将结果汇总在一起以创建奇异特征向量。这可以防止任何单个优化任务超出 JVM 常量限制。它并不优雅,但我已经在达到 10k 列范围的数据集上使用了它。

此方法还允许您仍然保留单个管道,尽管它需要一些额外的步骤才能使其工作(创建子向量)。从子向量创建特征向量后,您可以根据需要删除原始源列。

示例代码:

// IMPORT DEPENDENCIES
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
import org.apache.spark.sql.{SQLContext, Row, DataFrame, Column}
import org.apache.spark.ml.feature.VectorAssembler
import org.apache.spark.ml.{Pipeline, PipelineModel}

// Create first example dataframe
val exampleDF = spark.createDataFrame(Seq(
  (1, 1, 2, 3, 8, 4, 5, 1, 3, 2, 0, 4, 2, 8, 1, 1, 2, 3, 8, 4, 5),
  (2, 4, 3, 8, 7, 9, 8, 2, 3, 3, 2, 6, 5, 4, 2, 4, 3, 8, 7, 9, 8),
  (3, 6, 1, 9, 2, 3, 6, 3, 8, 5, 1, 2, 3, 5, 3, 6, 1, 9, 2, 3, 6),
  (4, 7, 8, 6, 9, 4, 5, 4, 9, 8, 2, 4, 9, 2, 4, 7, 8, 6, 9, 4, 5),
  (5, 9, 2, 7, 8, 7, 3, 5, 3, 4, 8, 0, 6, 2, 5, 9, 2, 7, 8, 7, 3),
  (6, 1, 1, 4, 2, 8, 4, 6, 3, 9, 8, 8, 9, 3, 6, 1, 1, 4, 2, 8, 4)
)).toDF("uid", "col1", "col2", "col3", "col4", "col5", 
        "col6", "col7", "col8", "col9", "colA", "colB", 
        "colC", "colD", "colE", "colF", "colG", "colH", 
        "colI", "colJ", "colK")

// Create multiple column lists using the sliding method
val Array(colList1, colList2, colList3, colList4) = exampleDF.columns.filter(_ != "uid").sliding(5,5).toArray

// Create a vector assembler for each column list
val colList1_assembler = new VectorAssembler().setInputCols(colList1).setOutputCol("colList1_vec")
val colList2_assembler = new VectorAssembler().setInputCols(colList2).setOutputCol("colList2_vec")
val colList3_assembler = new VectorAssembler().setInputCols(colList3).setOutputCol("colList3_vec")
val colList4_assembler = new VectorAssembler().setInputCols(colList4).setOutputCol("colList4_vec")

// Create a vector assembler using column list vectors as input
val features_assembler = new VectorAssembler().setInputCols(Array("colList1_vec","colList2_vec","colList3_vec","colList4_vec")).setOutputCol("features")

// Create the pipeline with column list vector assemblers first, then the final vector of vectors assembler last
val pipeline = new Pipeline().setStages(Array(colList1_assembler,colList2_assembler,colList3_assembler,colList4_assembler,features_assembler))

// Fit and transform the data
val featuresDF = pipeline.fit(exampleDF).transform(exampleDF)

// Get the number of features in "features" vector
val featureLength = (featuresDF.schema(featuresDF.schema.fieldIndex("features")).metadata.getMetadata("ml_attr").getLong("num_attrs"))

// Print number of features in "features vector"
print(featureLength)

(注意:创建列列表的方法实际上应该以编程方式完成,但为了理解这个概念,我使这个示例保持简单。)

关于scala - 在 Apache Spark 中为具有大量列的数据集创建机器学习管道的最佳方法,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/43911694/

相关文章:

scala - 用于机器学习的 Apache Spark mllib.linalg 向量和 Spark.util 向量之间的差异

Scala:计算标准差的通用方法是什么

dataframe - Spark Scala Cassandra 连接器删除所有所有行失败,IllegalArgumentException 要求失败异常

scala - 从数组到列表的隐式转换

scala工作表用spark抛出错误

java - 如何在spark(java)中迭代数据集的所有列

python - 我们可以在pyspark的ParamGridBuilder中使用for循环吗?

apache-spark - 在 Apache Spark Python 中自定义 K-means 的距离公式

scala - 用 `type` 参数声明一个函数 `implicit`

parsing - Scala StandardTokenParsers 与 JavaTokenParsers