scala - Spark、Kryo ProtoBuf 字段的序列化问题

标签 scala apache-spark kryo

在运行与转换 RDD 时 protobuf 字段的序列化相关的 Spark 作业时,我看到错误。

com.esotericsoftware.kryo.KryoException:java.lang.UnsupportedOperationException 序列化跟踪: otherAuthors_ (com.thomsonreuters.kraken.medusa.dbor.proto.Book$DBBooks)

错误似乎是在此时创建的:

val booksPerTier: Iterable[(TimeTier, RDD[DBBooks])] = allTiers.map {
      tier => (tier, books.filter(b => isInTier(endOfInterval, tier, b) &&     !isBookPublished(o)).mapPartitions( it =>
      it.map{ord =>
        (ord.getAuthor, ord.getPublisherName, getGenre(ord.getSourceCountry))}))
}

val averagesPerAuthor = booksPerTier.flatMap { case (tier, opt) =>
  opt.map(o => (tier, o._1, PublisherCompanyComparison, o._3)).countByValue()
}

val averagesPerPublisher = booksPerTier.flatMap { case (tier, opt) =>
  opt.map(o => (tier, o._1, PublisherComparison(o._2), o._3)).countByValue()
}

该字段是 protobuf 中指定的列表,如下所示:

otherAuthors_ = java.util.Collections.emptyList()

正如您所看到的,代码实际上并未利用 Book Protobuf 中的该字段,尽管它仍在通过网络传输。

有人对此有任何建议吗?

最佳答案

好吧,老问题,但这是给后代的答案。默认的 kryo 序列化器不能很好地处理某些集合。有一个第三方库可以帮助解决这个问题:kryo-serializers

在您的情况下,您可能需要在创建 Spark 配置时提供自定义 kryo 注册器:

val conf = new SparkConf()
conf.set("spark.kryo.registrator", "MyKryoRegistrator")

在您的注册器中进行所需的自定义注册:

class MyKryoRegistrator extends KryoRegistrator {
    override def registerClasses(kryo: Kryo) {
        kryo.register( Collections.EMPTY_LIST.getClass(), new CollectionsEmptyListSerializer() );
        // Probably should use proto serializer for your proto classes
        kryo.register( Book.class, new ProtobufSerializer() );
    } 
}

关于scala - Spark、Kryo ProtoBuf 字段的序列化问题,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/38345362/

相关文章:

Scalafmt:在列表中保留空行

scala - 在 Scala 中从 Array[String] 转换为 Seq[String]

apache-spark - 为什么kryo注册在SparkSession中不起作用?

serialization - 我如何在 flink 中使用 joda.time(或者我如何使用 typeutils.runtime.kryo)

java - 使用 Kryo 反序列化包含一些不可反序列化对象的数组(抢救可反序列化部分)

Scala - 动态对象/类加载

hadoop - 使用spark读取avro数据并无法将org.apache.avro.util.Utf8强制转换为java.lang.String异常

apache-spark - PySpark withColumn & withField 类型错误 : 'Column' object is not callable

apache-spark - Elasticsearch 支持 spark 2.4.2 和 scala 2.12

scala - sbt : findbugs, scalastyle 中的 SonarQube 运行者