java - Spark Dataframe 到 Java 类的 Dataset

标签 java scala apache-spark

我想将以 Json 形式读入的 Dataframe 转换为给定类的 Dataset。到目前为止,当我能够编写自己的案例类时,效果很好。

case class MyCaseClass(...)
val df = spark.read.json("path/to/json")
val ds = df.as[MyCaseClass]

def myFunction(input: MyCaseClass): MyCaseClass = {
    // Do some validation and things
    input
}

ds.map(myFunction)

但是,现在我绑定(bind)到外部 Java 类(特别是由 thrift 创建的类)。所以这里有一个更具体的自定义类示例:

JSON:

{"a":1,"b":"1","wrapper":{"inside":"1.1", "map": {"k": "v"}}}
{"a":2,"b":"2","wrapper":{"inside":"2.1", "map": {"k": "v"}}}
{"a":3,"b":"3","wrapper":{"inside":"3.1", "map": {"k": "v"}}}

类:

class MyInnerClass(var inside: String, var map: Map[String, String]) extends java.io.Serializable {
  def getInside(): String = {inside}
  def setInside(newInside: String) {inside = newInside}
  def getMap(): Map[String, String] = {map}
  def setMap(newMap: Map[String, String]) {map = newMap}
}

class MyClass(var a: Int, var b: String, var wrapper: MyInnerClass)  extends java.io.Serializable {
  def getA(): Int = {a}
  def setA(newA: Int) {a = newA}
  def getB(): String = {b}
  def setB(newB: String) {b = newB}
  def getWrapper(): MyInnerClass = {wrapper}
  def setWrapper(newWrapper: MyInnerClass) {wrapper = newWrapper}
}

所以我想做的是:

val json = spark.read.json("path/to/json")
json.as[MyClass]

但是,抛出:

Unable to find encoder for type stored in a Dataset.  Primitive type (Int, String, etc) and Product types (case classes) are supported by importing spark.implicits._  Support for serializing other types will be added in future releases.

所以,我发现了自定义编码器:(herehere)

import org.apache.spark.sql.Encoders
val kryoMyClassEncoder  = Encoders.kryo[MyClass]
json.as[MyClass](kryoMyClassEncoder)

抛出:

Try to map struct<a:bigint,b:string,wrapper:struct<inside:string,map:struct<k:string>>> to Tuple1, but failed as the number of fields does not line up

那么如何将 Dataframe 转换为自定义对象 Dataset。

最佳答案

不要使用 kryo 编码器,而是尝试使用产品编码器,即:

val productMyClassEncoder  = Encoders.product[MyClass]

关于java - Spark Dataframe 到 Java 类的 Dataset,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/41876965/

相关文章:

scala - Scala 中的惯用 Haskell 式迭代?

Scala.js 中的 Javascript 生成器

python - 将 pyspark 数据框中的周末日期移至上一个工作日

apache-spark - 如何在 HiveThriftServer2 中注册自定义 UDF jar?

java - 如何使 JEE6 javadoc 为 NetBeans 7.0.1 中的常规 JavaSE 项目工作?

java - jsp - 使用 unicode(希伯来字母)发布请求未正确显示

java - 使用 Spring Social、Spring security 登录后重定向到原始 URL?

java - 解决点和线问题的算法?

android - 在 Android 上使用 SubCut (Scala DI)

r - Sparklyr/Hive : how to use regex (regexp_replace) correctly?