对于Pojo /基元,Spark数据集从Row移到Encoder
。 Catalyst
引擎使用ExpressionEncoder
转换SQL表达式中的列。但是,似乎没有Encoder
的其他子类可用作我们自己的实现的模板。
这是Spark 1.X / DataFrames中无法在新架构下编译的代码示例:
//mapping each row to RDD tuple
df.map(row => {
var id: String = if (!has_id) "" else row.getAs[String]("id")
var label: String = row.getAs[String]("label")
val channels : Int = if (!has_channels) 0 else row.getAs[Int]("channels")
val height : Int = if (!has_height) 0 else row.getAs[Int]("height")
val width : Int = if (!has_width) 0 else row.getAs[Int]("width")
val data : Array[Byte] = row.getAs[Any]("data") match {
case str: String => str.getBytes
case arr: Array[Byte@unchecked] => arr
case _ => {
log.error("Unsupport value type")
null
}
}
(id, label, channels, height, width, data)
}).persist(StorageLevel.DISK_ONLY)
}
我们得到一个编译器错误
Error:(56, 11) Unable to find encoder for type stored in a Dataset.
Primitive types (Int, String, etc) and Product types (case classes) are supported
by importing spark.implicits._ Support for serializing other types will be added in future releases.
df.map(row => {
^
因此,在某种程度上/某处应该有一种方法
DataFrame
(现在是Row
类型的数据集)上执行映射时应用它我正在寻找可以成功执行这些步骤的代码。
最佳答案
据我所知,自1.6以来并没有真正改变,How to store custom objects in Dataset?中描述的解决方案是唯一可用的选项。但是,您当前的代码应该可以与产品类型的默认编码器一起正常工作。
要了解为什么您的代码在1.x中起作用而在2.0.0中不起作用,您必须检查签名。在1.x中,DataFrame.map
是一种采用Row => T
函数并将RDD[Row]
转换为RDD[T]
的方法。
在2.0.0中,DataFrame.map
也具有Row => T
类型的功能,但是将Dataset[Row]
(又称DataFrame
)转换为Dataset[T]
,因此T
需要Encoder
。如果要获得“旧”行为,则应显式使用RDD
:
df.rdd.map(row => ???)
有关
Dataset[Row]
map
的信息,请参见Encoder error while trying to map dataframe row to updated row
关于scala - 如何在Spark 2.X数据集中创建自定义编码器?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/37706420/