apache-spark - 需要 Spark (Scala) 中的 kryo 序列化

标签 apache-spark kryo

我用这个打开了 kryo 序列化:

conf.set( "spark.serializer", "org.apache.spark.serializer.KryoSerializer" )

我想确保自定义类在节点之间移动时使用 kryo 进行序列化。我可以通过这种方式向 kryo 注册类(class):

conf.registerKryoClasses(Array(classOf[Foo]))

据我了解,这实际上并不能保证使用 kyro 序列化;如果序列化器不可用,kryo 将回退到 Java 序列化。

为了保证 kryo 序列化发生,我遵循了 Spark 文档中的建议:

conf.set("spark.kryo.registrationRequired", "true")

但这会导致针对一堆不同的类抛出 IllegalArugmentException(“类未注册”),我假设 Spark 在内部使用这些类,例如以下内容:

org.apache.spark.util.collection.CompactBuffer
scala.Tuple3

我当然不必向 kryo 手动注册每个单独的类吗?这些序列化器都是在kryo中定义的,那么有没有办法自动注册它们呢?

最佳答案

As I understand it, this does not actually guarantee that kyro serialization is used; if a serializer is not available, kryo will fall back to Java serialization.

没有。如果您设置spark.serializerorg.apache.spark.serializer. KryoSerializer那么 Spark 将使用 Kryo。如果 Kryo 不可用,您将收到错误消息。没有后备方案。

那么这个 Kryo 注册是什么?

当 Kryo 序列化未注册类的实例时,它必须输出完全限定的类名。这是很多角色。相反,如果一个类已预先注册,Kryo 只能输出对该类的数字引用,该引用只有 1-2 个字节。

当 RDD 的每一行都使用 Kryo 序列化时,这一点尤其重要。您不想为十亿行中的每一行包含相同的类名。所以你预先注册了这些类(class)。但是很容易忘记注册一个新类,然后你又浪费了字节。解决方案是要求每个类都被注册:

conf.set("spark.kryo.registrationRequired", "true")

现在 Kryo 将永远不会输出完整的类名。如果遇到未注册的类,则会出现运行时错误。

不幸的是,很难枚举您要提前序列化的所有类。这个想法是 Spark 注册 Spark 特定的类,然后您注册其他所有内容。您有一个 RDD[(X, Y, Z)] ?您必须注册classOf[scala.Tuple3[_, _, _]] .

list of classes that Spark registers实际上包括CompactBuffer ,所以如果您看到错误,那么您就做错了。您正在绕过 Spark 注册过程。您必须使用 spark.kryo.classesToRegisterspark.kryo.registrator注册您的类(class)。 (请参阅 config options 。如果您使用 GraphX,您的注册者应调用 GraphXUtils. registerKryoClasses 。)

关于apache-spark - 需要 Spark (Scala) 中的 kryo 序列化,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/31394140/

相关文章:

r - R中的并行预测

scala - 如何使用 Spark 的 kryo 序列化注册 byte[][]

apache-spark - Kryo序列化器如何在Spark中分配缓冲区

java - kryo.readObject 导致 ArrayList 出现 NullPointerException

java - 序列化并不总是以序列化原始类型结束吗?

scala - scala Spark 中的 RDD 过滤器

scala - 如何在 scala 中将函数注册到 sqlContext UDF?

amazon-web-services - 如何在 EC2 t2micro 实例上组装 Spark(并避免 "java.lang.OutOfMemoryError: Java heap space"错误)?

python - 将 JSON 键值分解为新行

scala - 了解 Kryo 序列化缓冲区溢出错误