java - SparkSQL + Java : Pojo to Tabular Format while working with Datasets

标签 java apache-spark apache-spark-sql apache-spark-dataset

我对 Spark SQL 还很陌生。在实现一项训练任务时,我遇到了以下问题并且找不到答案(以下所有示例都有点愚蠢,但出于演示目的应该还是可以的)。

我的应用程序读取 Parquet 文件并根据其内容创建数据集:

DataFrame input = sqlContext.read().parquet("src/test/resources/integration/input/source.gz.parquet");
Dataset<Row> dataset = input.as(RowEncoder$.MODULE$.apply(input.schema()));

dataset.show() 调用结果:

+------------+----------------+--------+
+    Names   +       Gender   +   Age  +
+------------+----------------+--------+
| Jack, Jill |  Male, Female  | 30, 25 |

然后我将数据集转换为一个新的数据集,其中包含 Person 类型:

public static Dataset<Person> transformToPerson(Dataset<Row> rawData) {
    return rawData
            .flatMap((Row sourceRow) -> {
                // code to parse an input row and split person data goes here
                Person person1 = new Person(name1, gender1, age1);
                Person person2 = new Person(name2, gender2, age2);
                return Arrays.asList(person1, person2);
            }, Encoders.bean(Person.class));
}

哪里

public abstract class Human implements Serializable {
   protected String name;
   protected String gender;
   // getters/setters go here
   // default constructor + constructor with the name and gender params
 }
 public class Person extends Human {
   private String age;
   // getters/setters for the age param go here
   // default constructor + constructor with the age, name and gender params
   // overriden toString() method which returns the string: (<name>, <gender>, <age>)
 }

最后,当我显示数据集的内容时,我希望看到

 +------------+----------------+--------+
 +    name    +       gender   +   age  +
 +------------+----------------+--------+
 |     Jack   |     Male       |   30   |
 |     Jill   |     Femail     |   25   |

但是,我明白了

+-------------------+----------------+--------+
+      name         +       gender   +   age  +
+-------------------+----------------+--------+
|(Jack, Male, 30)   |                |        |
|(Jill, Femail, 25) |                |        |

这是 toString() 方法的结果,而 header 是正确的。 我相信编码器有问题,只要我使用它显示的 Encoders.javaSerizlization(T) 或 Encoders.kryo(T)

+------------------+
+        value     +
+------------------+
|(Jack, Male, 30)  |
|(Jill, Femail, 25)|

最让我担心的是,编码器的错误使用可能会导致错误的 SerDe 和/或性能损失。 在我能找到的所有 Spark Java 示例中,我看不出有什么特别之处...

你能建议我做错了什么吗?

更新1

这是我的项目的依赖项:

    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.10</artifactId>
        <version>1.6.2</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql_2.10</artifactId>
        <version>1.6.2</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-hive_2.10</artifactId>
        <version>1.6.2</version>
    </dependency>

解决方案

正如 abaghel 建议我将版本升级到 2.0.2 (请注意,在 2.0.0 版本上有 the bug for Windows ),在我的代码中到处使用 Dataset 而不是 DataFrames (似乎 DataFrames 不是 Apache 的一部分) Spark starting from 2.0.0 ),并使用基于迭代器的 flatMap 函数从 Row 转换为 Person。

分享一下,使用 TraversableOnce-based flatMap 的方法版本 1.6.2 对我不起作用,因为它引发了“MyPersonConversion$function1 不可序列化”异常。

现在一切都按预期进行。

最佳答案

您使用的 Spark 版本是什么?您提供的 flatMap 方法未使用 2.2.0 版本进行编译。所需的返回类型为 Iterator<Person> 。请使用下面的 FlatMapFunction,您将获得所需的输出。

public static Dataset<Person> transformToPerson(Dataset<Row> rawData) {
    return rawData.flatMap(row -> {
        String[] nameArr = row.getString(0).split(",");
        String[] genArr = row.getString(1).split(",");
        String[] ageArr = row.getString(2).split(",");
        Person person1 = new Person(nameArr[0], genArr[0], ageArr[0]);
        Person person2 = new Person(nameArr[1], genArr[1], ageArr[1]);
        return Arrays.asList(person1, person2).iterator();
    }, Encoders.bean(Person.class));
}

//Call function
Dataset<Person> dataset1 = transformToPerson(dataset);
dataset1.show();

关于java - SparkSQL + Java : Pojo to Tabular Format while working with Datasets,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/46524136/

相关文章:

scala - 在SPARK SQL中读取分区的HIVE表

java - Spark Java - 无法获取 java.lang.String 的正确类标签

java - 使用 Spark 将字段添加到 Csv

java - Maven Flyway 发现非空架构 "PUBLIC"但没有架构历史表

java - 我如何在 C++ 的堆栈上正确添加 QTWidgets?

scala - 使用不同的 StreamingContext 相继打开两个 KafkaStream

java - 基于现有 Dataset<Row> 和添加的 HashMap 创建新的 Spark Dataset<Row>

scala - 尝试将数据帧行映射到更新行时出现编码器错误

java - 如何通过选择微调器来启用或禁用 editText

java - 权限拒绝 : opening provider android. support.v4.content.FileProvider