java - Spark & Drools - 如何使用 Kryo 序列化 KieBase

标签 java scala apache-spark drools kryo

我在尝试序列化 Spark 中的 KieBase 对象时遇到异常。

当我执行以下代码时:

val kieBase = kieContainer.getKieBase
val broadcastKieBase = spark.sparkContext.broadcast(kieBase)

引发ConcurrentModificationException

Exception in thread "main" com.esotericsoftware.kryo.KryoException: java.util.ConcurrentModificationException

环顾四周,我还发现了 this is a known problem ,但还是没有解决。

有人知道如何使用 Kryo 序列化 KieBase 实例吗?

最佳答案

如果您通过设置 .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer") 来使用 KryoSerializer 执行 Spark 作业,或者将其设置为 KryoSerializer 作为默认,然后请按照以下步骤操作:

通过扩展org.apache.spark.serializer.KryoRegistrator创建DroolsSerializerRegistration类。为com.esotericsoftware.kryo.serializers.JavaSerializer注册类org.drools.core.impl.KnowledgeBaseImpl,如下所示:

package com.spark.kryo.serializers

import com.esotericsoftware.kryo.Kryo
import com.esotericsoftware.kryo.serializers.JavaSerializer
import org.apache.spark.serializer.KryoRegistrator

class DroolsSerializerRegistration extends KryoRegistrator {
    override def registerClasses(kryo: Kryo) {
        kryo.register(classOf[org.drools.core.impl.KnowledgeBaseImpl], new JavaSerializer)
   }
}

然后在SparkConf中设置以下配置 .config("spark.kryo.registrator", "com.spark.kryo.serializers.DroolsSerializerRegistration")

如果您不确定这是否是由于 org.drools.core.impl.KnowledgeBaseImpl 造成的,请在 SparkConf 中设置以下配置以获取导致问题的确切类。 .config("spark.kryo.registrationRequired", "true")

在某些情况下,它可能会给出多个类,然后使用 com.esotericsoftware.kryo.serializers.JavaSerializer 注册所有类。 找出与 Drools 关联的所有类后,删除 spark.kryo.registrationRequired 并仅注册那些与 Drools 库关联的类,而不是所有失败的类。

关于java - Spark & Drools - 如何使用 Kryo 序列化 KieBase,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/44635932/

相关文章:

java - 在 File.createTempFile() 上使用 Files.copy() 时出现 FilesAlreadyExistsException

scala - 如何在遍历 Scala future 序列时减少上下文切换

sql - Spark DataFrame中IFNULL和IFF的等效SQL函数

python - 像 pyspark 的 jar 一样打包

java - 如何从 Eclipse Kepler 中删除 EGIT?

java - 通用断言失败

java - Android 日期格式转换为自定义日期

scala - 哪些 Scala 注释修改了编译器的消息?

scala - 在集群上分发 Scala?

apache-spark - 如何将 Cassandra ResultSet 转换为 Spark DataFrame?