scala - Spark DataFrame 不尊重模式并将所有内容视为字符串

标签 scala apache-spark apache-spark-sql apache-spark-mllib scala-collections

我正面临一个我多年来一直无法克服的问题。

  • 我使用的是 Spark 1.4 和 Scala 2.10。我现在无法升级(大型分布式基础架构)
  • 我有一个包含几百列的文件,其中只有 2 个是字符串,其余都是 Long。我想将此数据转换为标签/特征数据框。
  • 我已经能够将其转换为 LibSVM 格式。
  • 我只是无法将其转换为标签/功能格式。

  • 原因是
  • 我无法使用这里显示的 toDF()
    https://spark.apache.org/docs/1.5.1/ml-ensembles.html
    val data = MLUtils.loadLibSVMFile(sc, "data/mllib/sample_libsvm_data.txt").toDF()
    

    它在 1.4 中不支持
  • 所以我首先将 txtFile 转换成一个 DataFrame ,在那里我使用了这样的东西
    def getColumnDType(columnName:String):StructField = {
    
            if((columnName== "strcol1") || (columnName== "strcol2")) 
                return StructField(columnName, StringType, false)
            else
                return StructField(columnName, LongType, false)
        }
    def getDataFrameFromTxtFile(sc: SparkContext,staticfeatures_filepath: String,schemaConf: String) : DataFrame = {
            val sfRDD = sc.textFile(staticfeatures_filepath)//
            val sqlContext = new org.apache.spark.sql.SQLContext(sc)
             // reads a space delimited string from application.properties file
            val schemaString = readConf(Array(schemaConf)).get(schemaConf).getOrElse("")
    
            // Generate the schema based on the string of schema
            val schema =
              StructType(
                schemaString.split(" ").map(fieldName => getSFColumnDType(fieldName)))
    
            val data = sfRDD
            .map(line => line.split(","))
            .map(p => Row.fromSeq(p.toSeq))
    
            var df = sqlContext.createDataFrame(data, schema)
    
            //schemaString.split(" ").drop(4)
            //.map(s => df = convertColumn(df, s, "int"))
    
            return df
        }   
    

  • 当我做 df.na.drop() df.printSchema()使用这个返回的数据框,我得到了完美的架构,就像这样

    root
     |-- rand_entry: long (nullable = false)
     |-- strcol1: string (nullable = false)
     |-- label: long (nullable = false)
     |-- strcol2: string (nullable = false)
     |-- f1: long (nullable = false)
     |-- f2: long (nullable = false)
     |-- f3: long (nullable = false)
    and so on till around f300
    

    但是 - 可悲的部分是我尝试用 df 做的任何事情(见下文),我总是收到 ClassCastException(java.lang.String 不能转换为 java.lang.Long)
    val featureColumns = Array("f1","f2",....."f300")
    assertEquals(-99,df.select("f1").head().getLong(0))
    assertEquals(-99,df.first().get(4))
    val transformeddf = new VectorAssembler()
            .setInputCols(featureColumns)
            .setOutputCol("features")
            .transform(df)
    

    所以 - 不好的是 - 即使模式说 Long - df 仍然在内部将所有内容视为字符串。

    编辑

    添加一个简单的例子

    说我有一个这样的文件

    1,A,20,P,-99,1,0,0,8,1,1,1,1,131153
    1,B,23,P,-99,0,1,0,7,1,1,0,1,65543
    1,C,24,P,-99,0,1,0,9,1,1,1,1,262149
    1,D,7,P,-99,0,0,0,8,1,1,1,1,458759
    



    sf-schema=f0 strCol1 f1 strCol2 f2 f3 f4 f5 f6 f7 f8 f9 f10 f11
    

    (列名真的无关紧要,因此您可以忽略此详细信息)

    我想要做的就是创建一个标签/特征类型的数据框,其中我的第 3 列成为标签,第 5 到第 11 列成为特征 [矢量] 列。这样我就可以按照 https://spark.apache.org/docs/latest/ml-classification-regression.html#tree-ensembles 中的其余步骤进行操作.

    我也像 zero323 建议的那样转换了列
    val types = Map("strCol1" -> "string", "strCol2" -> "string")
            .withDefault(_ => "bigint")
    df = df.select(df.columns.map(c => df.col(c).cast(types(c)).alias(c)): _*)
    df = df.drop("f0")
    df = df.drop("strCol1")
    df = df.drop("strCol2")
    

    但是断言和 VectorAssembler 仍然失败。
    featureColumns = Array("f2","f3",....."f11")
    这是我拥有 df 后想做的整个序列
        var transformeddf = new VectorAssembler()
        .setInputCols(featureColumns)
        .setOutputCol("features")
        .transform(df)
    
        transformeddf.show(2)
    
        transformeddf = new StringIndexer()
        .setInputCol("f1")
        .setOutputCol("indexedF1")
        .fit(transformeddf)
        .transform(transformeddf)
    
        transformeddf.show(2)
    
        transformeddf = new VectorIndexer()
        .setInputCol("features")
        .setOutputCol("indexedFeatures")
        .setMaxCategories(5)
        .fit(transformeddf)
        .transform(transformeddf)
    

    来自 ScalaIDE 的异常跟踪 - 就在它遇到 VectorAssembler 时如下所示

    java.lang.ClassCastException: java.lang.String cannot be cast to java.lang.Long
        at scala.runtime.BoxesRunTime.unboxToLong(BoxesRunTime.java:110)
        at scala.math.Numeric$LongIsIntegral$.toDouble(Numeric.scala:117)
        at org.apache.spark.sql.catalyst.expressions.Cast$$anonfun$castToDouble$5.apply(Cast.scala:364)
        at org.apache.spark.sql.catalyst.expressions.Cast$$anonfun$castToDouble$5.apply(Cast.scala:364)
        at org.apache.spark.sql.catalyst.expressions.Cast.eval(Cast.scala:436)
        at org.apache.spark.sql.catalyst.expressions.Alias.eval(namedExpressions.scala:118)
        at org.apache.spark.sql.catalyst.expressions.CreateStruct$$anonfun$eval$2.apply(complexTypes.scala:75)
        at org.apache.spark.sql.catalyst.expressions.CreateStruct$$anonfun$eval$2.apply(complexTypes.scala:75)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
        at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
        at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
        at scala.collection.AbstractTraversable.map(Traversable.scala:105)
        at org.apache.spark.sql.catalyst.expressions.CreateStruct.eval(complexTypes.scala:75)
        at org.apache.spark.sql.catalyst.expressions.CreateStruct.eval(complexTypes.scala:56)
        at org.apache.spark.sql.catalyst.expressions.ScalaUdf$$anonfun$2.apply(ScalaUdf.scala:72)
        at org.apache.spark.sql.catalyst.expressions.ScalaUdf$$anonfun$2.apply(ScalaUdf.scala:70)
        at org.apache.spark.sql.catalyst.expressions.ScalaUdf.eval(ScalaUdf.scala:960)
        at org.apache.spark.sql.catalyst.expressions.Alias.eval(namedExpressions.scala:118)
        at org.apache.spark.sql.catalyst.expressions.InterpretedMutableProjection.apply(Projection.scala:68)
        at org.apache.spark.sql.catalyst.expressions.InterpretedMutableProjection.apply(Projection.scala:52)
        at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
        at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
        at scala.collection.Iterator$$anon$10.next(Iterator.scala:312)
        at scala.collection.Iterator$class.foreach(Iterator.scala:727)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
        at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
        at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
        at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
        at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
        at scala.collection.AbstractIterator.to(Iterator.scala:1157)
        at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
        at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
        at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
        at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
        at org.apache.spark.sql.execution.SparkPlan$$anonfun$3.apply(SparkPlan.scala:143)
        at org.apache.spark.sql.execution.SparkPlan$$anonfun$3.apply(SparkPlan.scala:143)
        at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1767)
        at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1767)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:63)
        at org.apache.spark.scheduler.Task.run(Task.scala:70)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:745)
    

    最佳答案

    你得到 ClassCastException因为这正是应该发生的事情。模式参数不用于自动转换(一些 DataSources 可能会以这种方式使用模式,但不是像 createDataFrame 这样的方法)。它只声明存储在行中的值的类型。您有责任传递与模式匹配的数据,而不是相反。

    虽然 DataFrame显示您声明的模式仅在运行时验证,因此运行时异常。如果您想将数据转换为特定的数据,您有 cast数据明确。

  • 首先将所有列读作 StringType :
    val rows = sc.textFile(staticfeatures_filepath)
      .map(line => Row.fromSeq(line.split(",")))
    
    val schema = StructType(
      schemaString.split(" ").map(
        columnName => StructField(columnName, StringType, false)
      )
    )
    
    val df = sqlContext.createDataFrame(rows, schema)
    
  • 接下来将选定的列转换为所需的类型:
    import org.apache.spark.sql.types.{LongType, StringType}
    
    val types = Map("strcol1" -> StringType, "strcol2" -> StringType)
      .withDefault(_ => LongType)
    
    val casted = df.select(df.columns.map(c => col(c).cast(types(c)).alias(c)): _*)
    
  • 使用汇编器:
    val transformeddf = new VectorAssembler()
      .setInputCols(featureColumns)
      .setOutputCol("features")
      .transform(casted)
    

  • 您可以使用 spark-csv 简单地执行第 1 步和第 2 步:
    // As originally 
    val schema = StructType(
      schemaString.split(" ").map(fieldName => getSFColumnDType(fieldName)))
    
    
    val df = sqlContext
      .read.schema(schema)
      .format("com.databricks.spark.csv")
      .option("header", "false")
      .load(staticfeatures_filepath)
    

    另见 Correctly reading the types from file in PySpark

    关于scala - Spark DataFrame 不尊重模式并将所有内容视为字符串,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/35990117/

    相关文章:

    json - Play Framework, JSON 重载方法值 [申请]

    json - Scala 保留字作为 JSON 字段名称与 Json.writes[A]( Play 等效于 @SerializedName)

    scala - 在窗口 apache Spark 中将滞后与行计算相结合

    scala - 如何将列从十六进制字符串转换为长字符串?

    hadoop - 如果分配了更多核心,则单个Spark任务会在计算上花费更多时间

    scala - 如何检索 Mongodb 集合中的所有对象,包括 id?

    python - 替换数据框中的重复列

    scala - Spark Scala - 如何迭代数据框中的行,并将计算值添加为数据框的新列

    python - 在 Spark Dataframe (Pyspark) 中提取与特定条件匹配的第一个 "set of rows"

    Scala 宏 : constructing an anonymous class