apache-spark - 编码后无法对自定义类型进行操作? Spark 数据集

标签 apache-spark apache-spark-dataset kryo apache-spark-encoders

假设你有这个(编码自定义类型的解决方案来自 this thread ):

// assume we handle custom type
class MyObj(val i: Int, val j: String)
implicit val myObjEncoder = org.apache.spark.sql.Encoders.kryo[MyObj]
val ds = spark.createDataset(Seq(new MyObj(1, "a"),new MyObj(2, "b"),new MyObj(3, "c")))

当执行 ds.show 时,我得到:

+--------------------+
|               value|
+--------------------+
|[01 00 24 6C 69 6...|
|[01 00 24 6C 69 6...|
|[01 00 24 6C 69 6...|
+--------------------+

我理解这是因为内容被编码为内部 Spark SQL 二进制表示。但是我怎样才能像这样显示解码后的内容呢?

+---+---+
| _1| _2|
+---+---+
|  1|  a|
|  2|  b|
|  3|  c|
+---+---+

更新1

显示内容不是最大的问题,更重要的是它可能导致处理数据集时出现问题,考虑这个例子:

// continue with the above code
val ds2 = spark.createDataset(Seq(new MyObj(2, "a"),new MyObj(6, "b"),new MyObj(5, "c"))) 

ds.joinWith(ds2, ds("i") === ds2("i"), "inner") 
// this gives a Runtime error: org.apache.spark.sql.AnalysisException: Cannot resolve column name "i" among (value); 

这是否意味着,kryo-encoded 类型不能方便地进行类似joinWith 的操作?

How do we process custom type on Dataset then?
If we are not able to process it after it's encoded, what's the point of this kryo encoding solution on custom type?!

(下面@jacek 提供的解决方案对于case class 类型很好理解,但它仍然无法解码自定义类型)

最佳答案

以下对我有用,但似乎使用高级 API 来执行低级(反序列化)工作。

这并不是说应该这样做,而是表明这是可能的。

我不知道为什么 KryoDeserializer 不将字节反序列化为字节来自的对象。就是这样。

你的类定义和我的一个主要区别是这个案例让我可以使用以下技巧。同样,不知道为什么它使它成为可能。

scala> println(spark.version)
3.0.1

// Note that case keyword
case class MyObj(val i: Int, val j: String)
import org.apache.spark.sql.Encoders
implicit val myObjEncoder = Encoders.kryo[MyObj]
// myObjEncoder: org.apache.spark.sql.Encoder[MyObj] = class[value[0]: binary]

val ds = (Seq(new MyObj(1, "a"),new MyObj(2, "b"),new MyObj(3, "c"))).toDS
// the Kryo deserializer gives bytes
scala> ds.printSchema
root
 |-- value: binary (nullable = true)

scala> :type sc
org.apache.spark.SparkContext

// Let's deserialize the bytes into an object
import org.apache.spark.serializer.KryoSerializer
val ks = new KryoSerializer(sc.getConf)
// that begs for a generic UDF
val deserMyObj = udf { value: Array[Byte] => 
  import java.nio.ByteBuffer
  ks.newInstance.deserialize(ByteBuffer.wrap(value)).asInstanceOf[MyObj] }

val solution = ds.select(deserMyObj('value) as "result").select($"result.*")
scala> solution.show
+---+---+
|  i|  j|
+---+---+
|  1|  a|
|  2|  b|
|  3|  c|
+---+---+

关于apache-spark - 编码后无法对自定义类型进行操作? Spark 数据集,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/64184387/

相关文章:

java - 如何使用 JavaSparkContext 处理来自 Kafka 的记录中带有文件名的文件?

java - 如何解决 Spark java.lang.OutOfMemoryError : Java heap space while writing out in delta format?

java - 如何使用 Spark 数据集 API (Java) 创建数组列

java - 如何在 Spark Java 中遍历/迭代数据集?

python - PySpark 如何读取具有多种编码字符串的文件

apache-spark - 根据spark中的值删除重复的键

scala - Apache Spark 2.0 : java. lang.UnsupportedOperationException : No Encoder found for java. time.LocalDate

scala - Spark Kryo 序列化失败

hadoop - Spark 作业中的 Kryo 序列化错误

Spark 2.x 数据集的 Kryo 序列化