java - 未找到与带有 Base 的可序列化的 Product 对应的 Java 类

标签 java scala apache-spark rdd apache-spark-dataset

我编写了两个案例类,它们扩展了Base abstract class。我有每个类的两个列表(listAlistB)。当我想合并这两个列表时,我无法将最终列表转换为 Apache Spark 1.6.1 数据集。

abstract class Base

case class A(name: String) extends Base
case class B(age: Int) extends Base

val listA: List[A] = A("foo")::A("bar")::Nil
val listB: List[B] = B(10)::B(20)::Nil
val list: List[Base with Product with Serializable] = listA ++ listB

val result: RDD[Base with Product with Serializable] = sc.parallelize(list).toDS()

Apache Spark 将引发此异常:

A needed class was not found. This could be due to an error in your runpath. Missing class: no Java class corresponding to Base with Product with Serializable found
java.lang.NoClassDefFoundError: no Java class corresponding to Base with Product with Serializable found
    at scala.reflect.runtime.JavaMirrors$JavaMirror.typeToJavaClass(JavaMirrors.scala:1299)
    at scala.reflect.runtime.JavaMirrors$JavaMirror.runtimeClass(JavaMirrors.scala:192)
    at scala.reflect.runtime.JavaMirrors$JavaMirror.runtimeClass(JavaMirrors.scala:54)
    at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder$.apply(ExpressionEncoder.scala:50)
    at org.apache.spark.sql.SQLImplicits.newProductEncoder(SQLImplicits.scala:41)

当我想从 list 创建 RDD 时,Spark 不会抛出任何异常,但是当我使用 toDS() 方法将 RDD 转换为 Dataset 时,这个先前的异常将抛出.

最佳答案

首先,您可以通过将 list 显式设为 List[Base] 或通过添加 Base extends Product with Serializable 来获得更合理的类型> 如果其目的是仅通过案例类/对象对其进行扩展。但这还不够,因为

Spark 1.6 comes with support for automatically generating encoders for a wide variety of types, including primitive types (e.g. String, Integer, Long), Scala case classes, and Java Beans.

请注意,不支持像 Base 这样的抽象类。也不支持自定义编码器。尽管您可以尝试使用 kryo(或 javaSerialization,作为最后的手段)编码器,请参阅 How to store custom objects in Dataset? .

这是完整的工作示例:

abstract class Base extends Serializable with Product

case class A(name: String) extends Base

case class B(age: Int) extends Base

object BaseEncoder {
  implicit def baseEncoder: org.apache.spark.Encoder[Base] = org.apache.spark.Encoders.kryo[Base]
}


val listA: Seq[A] = Seq(A("a"), A("b"))
val listB: Seq[B] = Seq(B(1), B(2))
val list: Seq[Base] = listA ++ listB

val ds = sc.parallelize(list).toDS

关于java - 未找到与带有 Base 的可序列化的 Product 对应的 Java 类,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/37510063/

相关文章:

带有纯文本的 JavaFX/CSS 透明文本区域

scala - 在聚合 monad 上实现 flatMap

scala - 类型构造函数推断的高阶统一

java - OneToMany关系错误: Field 'XX' doesn't have a default value

java - 什么是 NullPointerException,我该如何解决?

scala - 如何将 Spark RDD[Array[MyObject]] 转换为 RDD[MyObject]

scala - 如何在Spark SQL(DataFrame)的UDF中使用常量值

python - 我应该将广播变量或broadcast.value()传递到我的RDD[自定义对象]中吗?

mongodb - pyspark-mongodb 集合读取命令不会执行

java - 无法在 JavaFX UI 中重用文本字段