java - 在 Apache Spark Dataset<Row> 上应用 flatMap 操作时出现意外的编码器行为

标签 java apache-spark apache-spark-dataset

我正在尝试将实际包含 double 值的 csv 字符串转换为 spark-ml 兼容数据集。因为我事先不知道预期的功能数量,所以我决定使用一个辅助类“Instance”,它已经包含了分类器使用的正确数据类型,并且在其他一些情况下已经按预期工作:

public class Instance implements Serializable {
    /**
     * 
     */
    private static final long serialVersionUID = 6091606543088855593L;
    private Vector indexedFeatures;
    private double indexedLabel;
    ...getters and setters for both fields...
}

我得到意外行为的部分是:

    Encoder<Instance> encoder = Encoders.bean(Instance.class);
    System.out.println("encoder.schema()");
    encoder.schema().printTreeString();
    Dataset<Instance> dfInstance = df.select("value").as(Encoders.STRING())
              .flatMap(s -> {
                String[] splitted = s.split(",");

                int length = splitted.length;
                double[] features = new double[length-1];
                for (int i=0; i<length-1; i++) {
                    features[i] = Double.parseDouble(splitted[i]);
                }

                if (length < 2) {
                    return Collections.emptyIterator();
                } else {
                    return Collections.singleton(new Instance( 
                        Vectors.dense(features), 
                        Double.parseDouble(splitted[length-1])
                        )).iterator();
                }
              }, encoder);

    System.out.println("dfInstance");
    dfInstance.printSchema();
    dfInstance.show(5);

然后我在控制台上得到以下输出:

encoder.schema()
root
 |-- indexedFeatures: vector (nullable = true)
 |-- indexedLabel: double (nullable = false)

dfInstance
root
 |-- indexedFeatures: struct (nullable = true)
 |-- indexedLabel: double (nullable = true)

+---------------+------------+
|indexedFeatures|indexedLabel|
+---------------+------------+
|             []|         0.0|
|             []|         0.0|
|             []|         1.0|
|             []|         0.0|
|             []|         1.0|
+---------------+------------+
only showing top 5 rows

编码器模式正确地将 indexedFeatures 行数据类型显示为 vector 。但是当我应用编码器并进行转换时,它会给我一行结构类型,不包含任何真实对象。

我想了解,为什么 Spark 为我提供结构类型而不是正确的 vector 类型。

最佳答案

实际上,我的回答并不能解释为什么你会得到一个结构类型。但基于previous question ,我或许可以提供解决方法。

原始输入用DataFrameReader's csv function解析,然后又是一个VectorAssembler使用:

Dataset<Row> csv = spark.read().option("inferSchema", "true")
  .csv(inputDf.select("value").as(Encoders.STRING()));
String[] fieldNames = csv.schema().fieldNames();    
VectorAssembler assembler = new VectorAssembler().setInputCols(
  Arrays.copyOfRange(fieldNames, 0, fieldNames.length-1))
  .setOutputCol("indexedFeatures");
Dataset<Row> result = assembler.transform(csv)
  .withColumn("indexedLabel", functions.col(fieldNames[fieldNames.length-1]))
  .select("indexedFeatures", "indexedLabel");

关于java - 在 Apache Spark Dataset<Row> 上应用 flatMap 操作时出现意外的编码器行为,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/49676184/

相关文章:

java - testcontainers, hikari 和 Failed to validate connection org.postgresql.jdbc.PgConnection

java - java获取windows网卡 "Media State"?

java.lang.NoClassDefFoundError & 转换为 dalvik 格式失败,错误 1 ​​未知问题

xml - Apache Spark 数据框列爆炸为多个列

scala - Spark DataFrame 在 OneHotEncoder 中处理空字符串

scala - 将列表转换为数据帧 Spark scala

scala - 为什么 dataset 的 foreach 方法不需要编码器,而 map 需要?

apache-spark - 数据框到数据集,类型为 Any

java - 如何在 Java 中将 DataFrame 转换为 Apache Spark 中的数据集?

java - 为什么我的枚举类型无法初始化?