apache-spark - 如何将行映射到 protobuf 生成的类?

标签 apache-spark apache-spark-sql protocol-buffers apache-spark-encoders

我需要编写一个读取 DataSet[Row] 并将其转换为 DataSet[CustomClass] 的作业
其中 CustomClass 是一个 protobuf 类。

val protoEncoder = Encoders.bean(classOf[CustomClass])
val transformedRows = rows.map {
  case Row(f1: String, f2: Long ) => {
  val pbufClass = CustomClass.newBuilder()
                             .setF1(f1)
                             .setF2(f2)
  pbufClass.build()}}(protoEncoder)

但是,看起来 Protobuf 类并不是真正的 Java Bean,我确实在以下方面获得了 NPE
val x =  Encoders.bean(classOf[CustomClass])

如何确保工作可以发出类型的数据集
DataSet[CustomClass] 其中 CustomClass 是 protobuf 类。
关于为类编写自定义编码器的任何指针/示例?

NPE:
val encoder2 = Encoders.bean(classOf[CustomClass])
java.lang.NullPointerException
  at org.spark_project.guava.reflect.TypeToken.method(TypeToken.java:465)
  at org.apache.spark.sql.catalyst.JavaTypeInference$$anonfun$2.apply(JavaTypeInference.scala:126)
  at org.apache.spark.sql.catalyst.JavaTypeInference$$anonfun$2.apply(JavaTypeInference.scala:125)
  at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
  at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
  at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
  at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
  at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
  at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:186)
  at org.apache.spark.sql.catalyst.JavaTypeInference$.org$apache$spark$sql$catalyst$JavaTypeInference$$inferDataType(JavaTypeInference.scala:125)
  at org.apache.spark.sql.catalyst.JavaTypeInference$.inferDataType(JavaTypeInference.scala:55)
  at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder$.javaBean(ExpressionEncoder.scala:89)
  at org.apache.spark.sql.Encoders$.bean(Encoders.scala:142)
  ... 48 elided

Bean 编码器内部使用
JavaTypeInference.serializerFor(protoClass)

如果我尝试在自定义编码器中执行相同操作,则会收到更具描述性的错误消息:
Caused by: java.lang.UnsupportedOperationException: Cannot infer type for class xxx.yyy.CustomClass because it is not bean-compliant
        at org.apache.spark.sql.catalyst.JavaTypeInference$.org$apache$spark$sql$catalyst$JavaTypeInference$$serializerFor(JavaTypeInference.scala:430)
        at org.apache.spark.sql.catalyst.JavaTypeInference$.serializerFor(JavaTypeInference.scala:337)
        at xxx.yyy..EncoderHolder$.protoEncoder(xxx.scala:69)
        at xxx.yyy..EncoderHolder$.encoder$lzycompute$1(xxx.scala:82)
        at xxx.yyy..EncoderHolder$.encoder$1(xxx.scala:82)
        at xxx.yyy..EncoderHolder$.liftedTree1$1(xxx.scala:84)
        at xxx.yyy..EncoderHolder$.<init>(xxx.scala:81)
        at xxx.yyy..EncoderHolder$.<clinit>(xxx.scala)

最佳答案

要将 Row 转换为 Protobuf 类,您可以使用 sparksql-protobuf

This library provides utilities to work with Protobuf objects in SparkSQL. It provides a way to read parquet file written by SparkSQL back as an RDD of the compatible protobuf object. It can also convert RDD of protobuf objects into DataFrame.


为您的 build.sbt 添加依赖项文件
resolvers += Resolver.jcenterRepo

libraryDependencies ++= Seq(
    "com.github.saurfang" %% "sparksql-protobuf" % "0.1.2",
    "org.apache.parquet" % "parquet-protobuf" % "1.8.1"
)
您可以按照库中的一些示例开始使用
Example 1
Example 2
我希望这有帮助!

关于apache-spark - 如何将行映射到 protobuf 生成的类?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/44768809/

相关文章:

python - 使用 Pyspark 查询数据框中的 json 对象

scala - 在spark MLlib中,如何在spark scala中将字符串转换为整数?

scala - Spark中使用UDF时任务序列化错误

apache-spark - Spark Sql Dedup 行

apache-spark - 如何计算通过百分比并插入到 Spark 数据框中的列中?

go - 如何将 protoc-gen-go gzipped FileDescriptorProto 显示为纯文本?

c# - 如何从 json 序列化 protobuf,并自动忽略 C# 中的未知字段?

python - Spark RDD 删除具有多个键的记录

scala - Spark 数据帧 : Pivot and Group based on columns

javascript - 以 json 形式返回 protobuf 对象