java - 如何使用 java 对 spark 数据集中的 optional 字段进行编码?

标签 java apache-spark optional encoder

我不想对数据集中使用的类的字段使用空值。我尝试使用 scala Option和 java Optional但它失败了:

    @AllArgsConstructor // lombok
    @NoArgsConstructor  // mutable type is required in java :(
    @Data               // see https://stackoverflow.com/q/59609933/1206998
    public static class TestClass {
        String id;
        Option<Integer> optionalInt;
    }

    @Test
    public void testDatasetWithOptionField(){
        Dataset<TestClass> ds = spark.createDataset(Arrays.asList(
                new TestClass("item 1", Option.apply(1)),
                new TestClass("item .", Option.empty())
        ), Encoders.bean(TestClass.class));

        ds.collectAsList().forEach(x -> System.out.println("Found " + x));
    }

在运行时失败,并显示消息 File 'generated.java', Line 77, Column 47: Cannot instantiate abstract "scala.Option"
问题:有没有办法使用 java 对数据集中没有 null 的 optional 字段进行编码?

附属问题:顺便说一句,我在 scala 中也没有使用太多数据集,您能否验证在 scala 中实际上可以对包含选项字段的案例类进行编码?

注意:这用于中间数据集,即不读写的东西(但用于 spark 内部序列化)

最佳答案

这在 Scala 中相当简单。
Scala 实现

import org.apache.spark.sql.{Encoders, SparkSession}

object Test {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder
      .appName("Stack-scala")
      .master("local[2]")
      .getOrCreate()

    val ds = spark.createDataset(Seq(
      TestClass("Item 1", Some(1)),
      TestClass("Item 2", None)
    ))( Encoders.product[TestClass])

    ds.collectAsList().forEach(println)

    spark.stop()
  }

  case class TestClass(
    id: String,
    optionalInt: Option[Int] )
}
java
Java 中有各种可用的 Option 类。但是,它们都不是开箱即用的。
  • java.util.Optional : 不可序列化
  • scala.Option -> 可序列化但抽象,所以当CodeGenerator生成以下代码,失败!

  • /* 081 */         // initializejavabean(newInstance(class scala.Option))
    /* 082 */         final scala.Option value_9 = false ?
    /* 083 */         null : new scala.Option();  // ---> Such initialization is not possible for abstract classes
    /* 084 */         scala.Option javaBean_1 = value_9;
    
  • org.apache.spark.api.java.Optional -> Spark 的 Optional 实现,它是可序列化的,但具有私有(private)构造函数。因此,它失败并出现错误:没有为零个实际参数找到适用的构造函数/方法。因为这是 final类,不可能扩展它。

  • /* 081 */         // initializejavabean(newInstance(class org.apache.spark.api.java.Optional))
    /* 082 */         final org.apache.spark.api.java.Optional value_9 = false ?
    /* 083 */         null : new org.apache.spark.api.java.Optional();
    /* 084 */         org.apache.spark.api.java.Optional javaBean_1 = value_9;
    /* 085 */         if (!false) {
    

    关于java - 如何使用 java 对 spark 数据集中的 optional 字段进行编码?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/61344521/

    相关文章:

    java - 编写android :button programmatically

    scala - Scala Spark 中笛卡尔变换的显式排序

    scala - 过滤 RDD 时为 "Item does not take parameters"- scala、Apache Spark

    json - 将大量 JSON 文件读入 Spark Dataframe

    f# - Dapper 列到 F# 选项属性

    java-8 - 原始 "nulls"和 Java 8

    scala - 在 Scala 中巧妙地处理 Option[T]

    java - 这是 GAE 内存泄漏的证据吗?

    java - 在数组中排序

    java - 酒店预订 可能的预订