scala - How to register byte[][] with Spark's kryo serialization

Tags: scala apache-spark kryo

I am trying to take full advantage of Spark's kryo serialization. Setting

.set("spark.kryo.registrationRequired", "true")

lets me know which classes still need to be registered. I have registered about 40 classes so far, some of my own and some of Spark's. I followed the post Require kryo serialization in Spark (Scala) to register everything and set it all up.
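For reference, a minimal sketch of that kind of setup; MyRecord here is a hypothetical stand-in for the real application classes:

import org.apache.spark.SparkConf

// Hypothetical application class, standing in for the ~40 real ones.
case class MyRecord(id: Long, payload: Array[Byte])

val conf = new SparkConf()
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  // Fail fast on any class that is serialized without being registered.
  .set("spark.kryo.registrationRequired", "true")
  .registerKryoClasses(Array(classOf[MyRecord]))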

I am now hitting the error below and cannot figure out how to register this class in Scala. Has anyone solved this?

I have tried many different combinations, including:

kryo.register(classOf[Array[Array[Byte]]])
conf.set("classesToRegister", "classOf[Array[Array[Byte]]]")
conf.registerKryoClasses(Array(classOf[Array[Array[Byte]]]))

I found an unanswered post asking the same question: https://mail-archives.apache.org/mod_mbox/spark-user/201603.mbox/%3CCAHCfvsSyUpx78ZFS_A9ycxvtO1=Jp7DfCCAeJKHyHZ1sugqHEQ@mail.gmail.com%3E

java.lang.RuntimeException: com.esotericsoftware.kryo.KryoException: java.lang.IllegalArgumentException: Class is not registered: byte[][]
Note: To register this class use: kryo.register(byte[][].class);
Serialization trace:
buffers (org.apache.spark.sql.columnar.CachedBatch)
at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:585)
at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568)
at org.apache.spark.serializer.KryoSerializationStream.writeObject(KryoSerializer.scala:158)
at org.apache.spark.serializer.SerializationStream.writeAll(Serializer.scala:153)
at org.apache.spark.storage.BlockManager.dataSerializeStream(BlockManager.scala:1190)
at org.apache.spark.storage.BlockManager.dataSerialize(BlockManager.scala:1199)
at org.apache.spark.storage.MemoryStore.getBytes(MemoryStore.scala:191)
at org.apache.spark.storage.BlockManager.doGetLocal(BlockManager.scala:480)
at org.apache.spark.storage.BlockManager.getBlockData(BlockManager.scala:302)
at org.apache.spark.network.netty.NettyBlockRpcServer$$anonfun$2.apply(NettyBlockRpcServer.scala:57)
at org.apache.spark.network.netty.NettyBlockRpcServer$$anonfun$2.apply(NettyBlockRpcServer.scala:57)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108)
at org.apache.spark.network.netty.NettyBlockRpcServer.receive(NettyBlockRpcServer.scala:57)
at org.apache.spark.network.server.TransportRequestHandler.processRpcRequest(TransportRequestHandler.java:114)
at org.apache.spark.network.server.TransportRequestHandler.handle(TransportRequestHandler.java:87)
at org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:101)
at org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:51)
at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294)
at io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:266)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294)
at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294)
at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:244)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294)
at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:846)
at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:131)
at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511)
at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111)
at java.lang.Thread.run(Thread.java:745)

Best Answer

conf.registerKryoClasses(Array(Class.forName("[[B")))

should work.
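For context, a minimal sketch of how this fits into a full configuration (the app name is a placeholder, not from the question):

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setAppName("kryo-registration-example") // placeholder name
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .set("spark.kryo.registrationRequired", "true")
  // "[[B" is the JVM's internal binary name for byte[][], the exact
  // class named in the error message above.
  .registerKryoClasses(Array(Class.forName("[[B")))
val sc = new SparkContext(conf)

This works because Class.forName("[[B") resolves the runtime class of byte[][] directly by its JVM name, sidestepping the question of how to spell that type in Scala source.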

This question and answer, "scala - How to register byte[][] with Spark's kryo serialization", originate from Stack Overflow: https://stackoverflow.com/questions/37790946/
