scala - 如何在Spark 2.X数据集中创建自定义编码器?

标签 scala apache-spark apache-spark-dataset apache-spark-encoders

对于Pojo /基元,Spark数据集从Row移到EncoderCatalyst引擎使用ExpressionEncoder转换SQL表达式中的列。但是,似乎没有Encoder的其他子类可用作我们自己的实现的模板。

这是Spark 1.X / DataFrames中无法在新架构下编译的代码示例:

//mapping each row to RDD tuple
df.map(row => {
    var id: String = if (!has_id) "" else row.getAs[String]("id")
    var label: String = row.getAs[String]("label")
    val channels  : Int = if (!has_channels) 0 else row.getAs[Int]("channels")
    val height  : Int = if (!has_height) 0 else row.getAs[Int]("height")
    val width : Int = if (!has_width) 0 else row.getAs[Int]("width")
    val data : Array[Byte] = row.getAs[Any]("data") match {
      case str: String => str.getBytes
      case arr: Array[Byte@unchecked] => arr
      case _ => {
        log.error("Unsupport value type")
        null
      }
    }
    (id, label, channels, height, width, data)
  }).persist(StorageLevel.DISK_ONLY)

}

我们得到一个编译器错误
Error:(56, 11) Unable to find encoder for type stored in a Dataset.
Primitive types (Int, String, etc) and Product types (case classes) are supported 
by importing spark.implicits._  Support for serializing other types will be added in future releases.
    df.map(row => {
          ^

因此,在某种程度上/某处应该有一种方法
  • 定义/实现我们的自定义编码器
  • DataFrame(现在是Row类型的数据集)上执行映射时应用它
  • 注册编码器以供其他自定义代码使用

  • 我正在寻找可以成功执行这些步骤的代码。

    最佳答案

    据我所知,自1.6以来并没有真正改变,How to store custom objects in Dataset?中描述的解决方案是唯一可用的选项。但是,您当前的代码应该可以与产品类型的默认编码器一起正常工作。

    要了解为什么您的代码在1.x中起作用而在2.0.0中不起作用,您必须检查签名。在1.x中,DataFrame.map是一种采用Row => T函数并将RDD[Row]转换为RDD[T]的方法。

    在2.0.0中,DataFrame.map也具有Row => T类型的功能,但是将Dataset[Row](又称DataFrame)转换为Dataset[T],因此T需要Encoder。如果要获得“旧”行为,则应显式使用RDD:

    df.rdd.map(row => ???)
    

    有关Dataset[Row] map的信息,请参见Encoder error while trying to map dataframe row to updated row

    关于scala - 如何在Spark 2.X数据集中创建自定义编码器?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/37706420/

    相关文章:

    java - 如何在 java 或 scala 中获取本地主机网络接口(interface)

    scala - 如何使用 Scala Slick 表达 Postgres 数组

    hadoop - 在 Yarn 集群上运行时 Spark 批处理未完成

    apache-spark - 如何将多维数组添加到现有的 Spark DataFrame

    apache-spark - FileNotFoundException : Spark save fails. 无法从数据集 [T] avro 清除缓存

    java - 如何在Play框架中不设置路由的情况下打开静态的.html页面?

    java - 为什么我不能将 Scala 的 Function1 隐式转换为 java.util.function.Function?

    scala - 使用共享可变状态向 RDD 添加索引

    scala - 什么会导致阶段在 Spark 中重新尝试

    apache-spark - 将数据从ElasticSearch读取到Spark数据集中