scala - 无法使用 case 类从 Row 的 RDD 创建数据框

标签 scala apache-spark apache-spark-sql

使用 Spark 2.x,似乎我无法使用由 case 类组成的 Row 的 RDD 创建数据帧。

它在 Spark 1.6.x 上运行良好,但在 2.x 上失败,出现以下运行时异常:

java.lang.RuntimeException: Timestamp is not a valid external type for schema of struct<seconds:bigint,nanos:int>

前面是一堆来自 Catalyst 的生成代码。

这是片段(我正在做的简化版本):
package main

import org.apache.spark.sql.{SparkSession, Row}
import org.apache.spark.sql.types.{IntegerType, LongType, StructField, StructType}

object Test {

  case class Timestamp(seconds: Long, nanos: Int)

  val TIMESTAMP_TYPE = StructType(List(
    StructField("seconds", LongType, false),
    StructField("nanos", IntegerType, false)
  ))

  val SCHEMA = StructType(List(
    StructField("created_at", TIMESTAMP_TYPE, true)
  ))

  def main(args: Array[String]) {

    val spark = SparkSession.builder().getOrCreate()

    val rowRDD = spark.sparkContext.parallelize(Seq((0L, 0))).map {
      case (seconds: Long, nanos: Int) => {
        Row(Timestamp(seconds, nanos))
      }
    }

    spark.createDataFrame(rowRDD, SCHEMA).show(1)
  }
}

我不确定这是 Spark 错误还是我在文档中遗漏的东西(我知道 Spark 2.x 引入了运行时行编码验证,这可能是相关的)

非常感谢帮助

最佳答案

我不确定这是否是一个错误,但混合动态类型 Row ,案例类和显式架构没有多大意义。要么使用 Rows和架构:

import collection.mutable._
import collection.JavaConverters._

spark.createDataFrame(ArrayBuffer(Row(Row(0L, 0))).asJava, SCHEMA)

或案例类:
import spark.implicits._

Seq(Tuple1(Timestamp(0L, 0))).toDF("created_at")

否则你只是在做同样的工作两次。

备注 :

如果你想表达字段可以为空,你可以使用 Options .例如
case class Record(created_at: Option[Timestamp])
case class Timestamp(seconds: Long, nanos: Option[Int])

Seq(Record(Some(Timestamp(0L, Some(0))))).toDF

将生成架构,其中 created_atcreated_at.milliseconds可以 NULL ,但是 created_at.seconds如果 created_at,则必须设置不是 NULL .

关于scala - 无法使用 case 类从 Row 的 RDD 创建数据框,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/39280007/

相关文章:

Scala 应用方法调用,因为括号与隐式参数冲突

scala - 在 Spark 中连接 Maptype 值时如何处理空值

scala - 为什么选择带有散列而不是点的 Scala 类型成员?

postgresql - 在 RedShift 表中存储数组的正确方法是什么?

amazon-web-services - 为什么 spark-ec2 失败并显示 ERROR : Could not find any existing cluster?

java - 如何在spark中将List<Objects>的byte[]解码为Dataset<Row>?

Scala Currying 和函数字面量

python - 文件不存在 - Spark 提交

json - 如何在Spark SQL中查询StringType的1个字段具有json值的数据框

scala - 通过 Scala IDE 使用 spark sql