我需要编写一个读取 DataSet[Row] 并将其转换为 DataSet[CustomClass] 的作业
其中 CustomClass 是一个 protobuf 类。
val protoEncoder = Encoders.bean(classOf[CustomClass])
val transformedRows = rows.map {
case Row(f1: String, f2: Long ) => {
val pbufClass = CustomClass.newBuilder()
.setF1(f1)
.setF2(f2)
pbufClass.build()}}(protoEncoder)
但是,看起来 Protobuf 类并不是真正的 Java Bean,我确实在以下方面获得了 NPE
val x = Encoders.bean(classOf[CustomClass])
如何确保工作可以发出类型的数据集
DataSet[CustomClass] 其中 CustomClass 是 protobuf 类。
关于为类编写自定义编码器的任何指针/示例?
NPE:
val encoder2 = Encoders.bean(classOf[CustomClass])
java.lang.NullPointerException
at org.spark_project.guava.reflect.TypeToken.method(TypeToken.java:465)
at org.apache.spark.sql.catalyst.JavaTypeInference$$anonfun$2.apply(JavaTypeInference.scala:126)
at org.apache.spark.sql.catalyst.JavaTypeInference$$anonfun$2.apply(JavaTypeInference.scala:125)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:186)
at org.apache.spark.sql.catalyst.JavaTypeInference$.org$apache$spark$sql$catalyst$JavaTypeInference$$inferDataType(JavaTypeInference.scala:125)
at org.apache.spark.sql.catalyst.JavaTypeInference$.inferDataType(JavaTypeInference.scala:55)
at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder$.javaBean(ExpressionEncoder.scala:89)
at org.apache.spark.sql.Encoders$.bean(Encoders.scala:142)
... 48 elided
Bean 编码器内部使用
JavaTypeInference.serializerFor(protoClass)
如果我尝试在自定义编码器中执行相同操作,则会收到更具描述性的错误消息:
Caused by: java.lang.UnsupportedOperationException: Cannot infer type for class xxx.yyy.CustomClass because it is not bean-compliant
at org.apache.spark.sql.catalyst.JavaTypeInference$.org$apache$spark$sql$catalyst$JavaTypeInference$$serializerFor(JavaTypeInference.scala:430)
at org.apache.spark.sql.catalyst.JavaTypeInference$.serializerFor(JavaTypeInference.scala:337)
at xxx.yyy..EncoderHolder$.protoEncoder(xxx.scala:69)
at xxx.yyy..EncoderHolder$.encoder$lzycompute$1(xxx.scala:82)
at xxx.yyy..EncoderHolder$.encoder$1(xxx.scala:82)
at xxx.yyy..EncoderHolder$.liftedTree1$1(xxx.scala:84)
at xxx.yyy..EncoderHolder$.<init>(xxx.scala:81)
at xxx.yyy..EncoderHolder$.<clinit>(xxx.scala)
最佳答案
要将 Row 转换为 Protobuf 类,您可以使用 sparksql-protobuf
This library provides utilities to work with Protobuf objects in SparkSQL. It provides a way to read parquet file written by SparkSQL back as an RDD of the compatible protobuf object. It can also convert RDD of protobuf objects into DataFrame.
为您的
build.sbt
添加依赖项文件resolvers += Resolver.jcenterRepo
libraryDependencies ++= Seq(
"com.github.saurfang" %% "sparksql-protobuf" % "0.1.2",
"org.apache.parquet" % "parquet-protobuf" % "1.8.1"
)您可以按照库中的一些示例开始使用
Example 1
Example 2
我希望这有帮助!
关于apache-spark - 如何将行映射到 protobuf 生成的类?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/44768809/