apache-spark - KryoException : Unable to find class with spark structured streaming

Tags: apache-spark sbt-assembly kryo spark-structured-streaming

1 - The problem

I have a Spark program that makes use of Kryo, but not as part of the Spark machinery. More specifically, I am using Spark Structured Streaming connected to Kafka.

I read binary values coming from Kafka and decode them on my own.



I run into an exception when trying to deserialize the data with Kryo. The issue, however, only happens when I package my program and run it on a Spark standalone cluster. That is, it does not happen when I run it within IntelliJ, i.e. in Spark local mode (dev mode).

The exception I get is the following:

Caused by: com.esotericsoftware.kryo.KryoException: Unable to find class: com.elsevier.entellect.commons.package$RawData



Please note that RawData is a case class of my own, located in one of the subprojects of my multi-project build.

To understand the context, please find more details below:

2 - build.sbt:
lazy val commonSettings = Seq(
  organization  := "com.elsevier.entellect",
  version       := "0.1.0-SNAPSHOT",
  scalaVersion  := "2.11.12",
  resolvers     += Resolver.mavenLocal,
  updateOptions := updateOptions.value.withLatestSnapshots(false)
)

lazy val entellectextractors = (project in file("."))
  .settings(commonSettings).aggregate(entellectextractorscommon, entellectextractorsfetchers, entellectextractorsmappers, entellectextractorsconsumers)

lazy val entellectextractorscommon = project
  .settings(
    commonSettings,
    libraryDependencies ++= Seq(
      "com.esotericsoftware" % "kryo" % "5.0.0-RC1",
      "com.github.romix.akka" %% "akka-kryo-serialization" % "0.5.0" excludeAll(excludeJpountz),
      "org.apache.kafka" % "kafka-clients" % "1.0.1",
      "com.typesafe.akka" %% "akka-stream" % "2.5.16",
      "com.typesafe.akka" %% "akka-http-spray-json" % "10.1.4",
      "com.typesafe.akka" % "akka-slf4j_2.11" % "2.5.16",
      "ch.qos.logback" % "logback-classic" % "1.2.3"
    )
  )

lazy val entellectextractorsfetchers = project
  .settings(
    commonSettings,
    libraryDependencies ++= Seq(
      "com.typesafe.akka" %% "akka-stream-kafka" % "0.22",
      "com.typesafe.slick" %% "slick" % "3.2.3",
      "com.typesafe.slick" %% "slick-hikaricp" % "3.2.3",
      "com.lightbend.akka" %% "akka-stream-alpakka-slick" % "0.20") 
  )
  .dependsOn(entellectextractorscommon)

lazy val entellectextractorsconsumers = project
  .settings(
    commonSettings,
    libraryDependencies ++= Seq(
      "com.typesafe.akka" %% "akka-stream-kafka" % "0.22")
  )
  .dependsOn(entellectextractorscommon)

lazy val entellectextractorsmappers = project
  .settings(
      commonSettings,
      mainClass in assembly := Some("entellect.extractors.mappers.NormalizedDataMapper"),
      assemblyMergeStrategy in assembly := {
        case PathList("META-INF", "services", "org.apache.spark.sql.sources.DataSourceRegister") => MergeStrategy.concat
        case PathList("META-INF", xs @ _*) => MergeStrategy.discard
        case x => MergeStrategy.first},
      dependencyOverrides += "com.fasterxml.jackson.core" % "jackson-core" % "2.9.5",
      dependencyOverrides += "com.fasterxml.jackson.core" % "jackson-databind" % "2.9.5",
      dependencyOverrides += "com.fasterxml.jackson.module" % "jackson-module-scala_2.11" % "2.9.5",
      dependencyOverrides += "org.apache.jena" % "apache-jena" % "3.8.0",
      libraryDependencies ++= Seq(
      "org.apache.jena" % "apache-jena" % "3.8.0",
      "edu.isi" % "karma-offline" % "0.0.1-SNAPSHOT",
      "org.apache.spark" % "spark-core_2.11" % "2.3.1" % "provided",
      "org.apache.spark" % "spark-sql_2.11" % "2.3.1" % "provided",
      "org.apache.spark" %% "spark-sql-kafka-0-10" % "2.3.1"
      //"com.datastax.cassandra" % "cassandra-driver-core" % "3.5.1"
    ))
  .dependsOn(entellectextractorscommon)



lazy val excludeJpountz = ExclusionRule(organization = "net.jpountz.lz4", name = "lz4")

The subproject that contains the Spark code is entellectextractorsmappers. The subproject that contains the case class RawData which cannot be found is entellectextractorscommon. entellectextractorsmappers explicitly depends on entellectextractorscommon.

3 - Difference between submitting to the local standalone cluster and running in local dev mode:

When I submit to the cluster, my Spark dependencies are as follows:
  "org.apache.spark" % "spark-core_2.11" % "2.3.1" % "provided",
  "org.apache.spark" % "spark-sql_2.11" % "2.3.1" % "provided",

When I run in local dev mode (no submit script), they become:
  "org.apache.spark" % "spark-core_2.11" % "2.3.1",
  "org.apache.spark" % "spark-sql_2.11" % "2.3.1",

That is to say, in local dev I need the dependencies on the classpath, while when submitting to the cluster in standalone mode they are already present on the cluster, so I mark them as provided.
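
As a side note, an alternative to toggling the scope by hand is to keep Spark as provided and add the provided dependencies back to the classpath of sbt's run task only. This is a sketch based on the recipe documented for sbt-assembly, not something that is already in my build:

// In entellectextractorsmappers' settings: "provided" jars stay out of the assembly,
// but are put back on the classpath when running from sbt / the IDE, because the
// Compile classpath (unlike the Runtime one used by run) still contains them.
run in Compile := Defaults
  .runTask(fullClasspath in Compile, mainClass in (Compile, run), runner in (Compile, run))
  .evaluated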

4 - How I submit:
spark-submit --class entellect.extractors.mappers.DeNormalizedDataMapper --name DeNormalizedDataMapper --master spark://MaatPro.local:7077  --deploy-mode cluster --executor-memory 14G --num-executors 1 --conf spark.sql.shuffle.partitions=7 "/Users/maatari/IdeaProjects/EntellectExtractors/entellectextractorsmappers/target/scala-2.11/entellectextractorsmappers-assembly-0.1.0-SNAPSHOT.jar"

5 - How I use Kryo:

5.1 - Declaration and registration

In the entellectextractorscommon project, I have a package object with the following content:
// Imports needed by this snippet. The Kryo and Akka imports are standard; the import
// path of the serializer classes (provided by akka-kryo-serialization) is assumed.
import com.esotericsoftware.kryo.Kryo
import com.esotericsoftware.kryo.io.{Input, Output}
import com.esotericsoftware.kryo.util.Pool
import com.romix.scala.serialization.kryo.{ScalaImmutableAbstractMapSerializer, ScalaProductSerializer}
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer

package object commons {

  case class RawData(modelName: String,
                     modelFile: String,
                     sourceType: String,
                     deNormalizedVal: String,
                     normalVal: Map[String, String])

  object KryoContext {
    lazy val kryoPool = new Pool[Kryo](true, false, 16) {
      protected def create(): Kryo = {
        val kryo = new Kryo()
        kryo.setRegistrationRequired(false)
        kryo.addDefaultSerializer(classOf[scala.collection.Map[_,_]], classOf[ScalaImmutableAbstractMapSerializer])
        kryo.addDefaultSerializer(classOf[scala.collection.generic.MapFactory[scala.collection.Map]], classOf[ScalaImmutableAbstractMapSerializer])
        kryo.addDefaultSerializer(classOf[RawData], classOf[ScalaProductSerializer])
        kryo
      }
    }

    lazy val outputPool = new Pool[Output](true, false, 16) {
      protected def create: Output = new Output(4096)
    }

    lazy val inputPool = new Pool[Input](true, false, 16) {
      protected def create: Input = new Input(4096)
    }
  }

  object ExecutionContext {

    implicit lazy val system  = ActorSystem()
    implicit lazy val mat     = ActorMaterializer()
    implicit lazy val ec      = system.dispatcher

  }

}
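
For reference, serializing a RawData the same way (so that the readClassAndObject call in decodeData below can decode it) would look roughly like the hypothetical encodeData helper below. This is a sketch of mine using the same pools, not the actual producer code; the Kafka producer wiring is omitted:

import java.io.ByteArrayOutputStream

def encodeData(value: RawData, kryoPool: Pool[Kryo], outputPool: Pool[Output]): Array[Byte] = {
  val kryo   = kryoPool.obtain()
  val output = outputPool.obtain()
  val bytes  = new ByteArrayOutputStream()
  output.setOutputStream(bytes)            // reuse the pooled Output against a fresh byte buffer
  kryo.writeClassAndObject(output, value)  // symmetric with readClassAndObject in decodeData
  output.flush()
  kryoPool.free(kryo)
  outputPool.free(output)
  bytes.toByteArray
}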

5.2 - Usage

In entellectextractorsmappers (where the Spark program is), I work with mapPartitions. Inside it, I have a method that decodes the data coming from Kafka and that uses Kryo:
def decodeData(rowOfBinaryList: List[Row], kryoPool: Pool[Kryo], inputPool: Pool[Input]): List[RawData] = {
  // Borrow a Kryo instance and an Input buffer from the per-JVM pools for this partition.
  val kryo = kryoPool.obtain()
  val input = inputPool.obtain()
  val data = rowOfBinaryList.map(r => r.getAs[Array[Byte]]("message")).map { binaryMsg =>
    input.setInputStream(new ByteArrayInputStream(binaryMsg))
    val value = kryo.readClassAndObject(input).asInstanceOf[RawData]
    input.close()
    value
  }
  // Return the borrowed instances so they can be reused by other tasks in the same JVM.
  kryoPool.free(kryo)
  inputPool.free(input)
  data
}
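
For context, decodeData is driven from mapPartitions over the Kafka source roughly as below. This is a simplified sketch: the broker address, the topic name, the rename of value to message and the omitted sink are assumptions, not my actual job code:

// Assumes the commons package object (RawData, KryoContext) and decodeData are in scope.
import org.apache.spark.sql.{Row, SparkSession}

val spark = SparkSession.builder().appName("DeNormalizedDataMapper").getOrCreate()
import spark.implicits._  // provides the Encoder for the RawData case class

val kafkaStream = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")  // placeholder broker
  .option("subscribe", "raw-data")                       // placeholder topic
  .load()
  .selectExpr("value AS message")                        // binary payload, matching getAs[Array[Byte]]("message")

val decoded = kafkaStream.mapPartitions { (rows: Iterator[Row]) =>
  decodeData(rows.toList, KryoContext.kryoPool, KryoContext.inputPool).iterator
}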

Note: the KryoContext object together with the lazy vals ensures that the kryoPool is instantiated once per JVM. However, I don't think the problem comes from that.

I read somewhere else a hint about issues with the classloaders used by Spark vs. Kryo, but I am not sure I really understand what is going on.



If someone could give me some pointers, that would help, because I have no idea where to start. Why does it work in local mode but not in cluster mode? Does the provided scope mess up the dependencies and cause some problem for Kryo? Is the sbt-assembly merge strategy messing things up?

There are many possible leads; if someone could help me narrow them down, that would be great!

Best answer

So far, I have solved the issue by picking up the "enclosing" classloader, which I believe is the one coming from Spark. This is after reading a few comments about classloader issues between Kryo and Spark:

lazy val kryoPool = new Pool[Kryo](true, false, 16) {
  protected def create(): Kryo = {
    // Use the thread context classloader (presumably the one Spark sets up with the
    // application/assembly jar) rather than Kryo's default, which is the loader
    // that defined the Kryo class itself.
    val cl = Thread.currentThread().getContextClassLoader()
    val kryo = new Kryo()
    kryo.setClassLoader(cl)
    kryo.setRegistrationRequired(false)
    kryo.addDefaultSerializer(classOf[scala.collection.Map[_,_]], classOf[ScalaImmutableAbstractMapSerializer])
    kryo.addDefaultSerializer(classOf[scala.collection.generic.MapFactory[scala.collection.Map]], classOf[ScalaImmutableAbstractMapSerializer])
    kryo.addDefaultSerializer(classOf[RawData], classOf[ScalaProductSerializer])
    kryo
  }
}
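
For anyone wanting to verify this, a small debugging sketch (my reading of what is happening, not part of the fix) is to print the relevant classloaders from code running on an executor. A freshly constructed Kryo resolves classes against the loader that defined the Kryo class itself, and if that differs from the context classloader that knows about the assembly jar, looking up RawData fails with the exception above.

// Debugging sketch: run this inside a task (e.g. at the top of decodeData) and
// compare the three loaders in the executor logs.
println(s"Kryo class loaded by:    ${classOf[com.esotericsoftware.kryo.Kryo].getClassLoader}")
println(s"Context classloader:     ${Thread.currentThread().getContextClassLoader}")
println(s"RawData class loaded by: ${classOf[RawData].getClassLoader}")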

Regarding apache-spark - KryoException: Unable to find class with spark structured streaming, there is a similar question on Stack Overflow: https://stackoverflow.com/questions/52572757/
