java - 将用户定义的对象转换为数据帧并写入 RDBMS - 如何维护与数据库的映射?

标签 java apache-spark spark-streaming apache-spark-sql

我在 mysql 中有以下表结构:

create table user(
id INT NOT NULL,
name VARCHAR(20) NOT NULL,
age INT NOT NULL,
address VARCHAR(100) NOT NULL);

现在,我想编写一个 Spark 流作业,从 Kafka 读取数据,进行一些处理和过滤,然后写入“User”表中的 RDBMS。

为此,我首先创建了表的 POJO 表示 -

@Data
class User implements Serializable {
private int id;
private String name;
private int age;
private String address;
}

下面,我编写了将 rdd 转换为 dataframe 的 Spark 作业 -

JavaDStream<User> userStream = ... // created this stream with some processing
userStream.foreachRDD(rdd -> {
DataFrame df = sqlContext.createDataFrame(rdd,User.class);
df.write().mode(SaveMode.Append).jdbc(MYSQL_CONNECTION_URL, "user", new java.util.Properties());
});

现在,一旦我执行这段代码,因为数据帧是以偶然方式形成的,并且它不与数据库模式同步。因此,它尝试将“地址”插入“id”列中,并以 sql 异常退出。

我无法理解如何使数据框架理解数据库的模式并相应地从用户对象加载数据。有什么办法可以做到这一点吗?我认为JavaRDD可以映射到JavaRDD,但是我不明白该怎么做。

此外,我相信这个createDataFrame() API 使用反射进行处理(必须),因此,还存在性能影响问题。请问有没有办法维护POJO和关系数据库之间的映射,并插入数据?

最佳答案

这样做对我来说很有效。

@Data
class User implements Serializable {
private int id;
private String name;
private int age;
private String address;
private static StructType structType = DataTypes.createStructType(new StructField[] {
        DataTypes.createStructField("id", DataTypes.IntegerType, false),
        DataTypes.createStructField("name", DataTypes.StringType, false),
        DataTypes.createStructField("age", DataTypes.IntegerType, false),
        DataTypes.createStructField("address", DataTypes.StringType, false)
});

public static StructType getStructType() {
    return structType;
}

public Object[] getAllValues() {
    return new Object[]{id, name, age, address};
}

}

Spark 工作 -

JavaDStream<User> userStream = ... // created this stream with some processing
userStream.map(e -> {
            Row row = RowFactory.create(e.getAllValues());
            return row;
        }).foreachRDD(rdd -> {
            DataFrame df = sqlContext.createDataFrame(rdd,User.getStructType());
            df.write().mode(SaveMode.Append).jdbc(MYSQL_CONNECTION_URL, "user", new java.util.Properties());
        });

我认为这是比前一种更好的方法,因为在前一种中,dataframe 使用反射将 POJO 映射到它自己的数据结构中。这是一种更干净的方法,因为我已经 Row 是 Spark sql 本身的一种格式,并且我已经在 getAllValues() 中提到了将数据插入数据帧的顺序以及getStructType()

中的列映射

如有错误,请指正。

关于java - 将用户定义的对象转换为数据帧并写入 RDBMS - 如何维护与数据库的映射?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/41270711/

相关文章:

java - 为什么 PropertyDescriptor 和 Reflection 在 1.7 中与在 1.6 中对于泛型的工作效果不同?

java - Android:时间和时区问题

scala - 使用 Spark Scala 计算平均值

apache-spark - 最佳文件大小和 Parquet block 大小

apache-spark - Spark - 将 kafka 流式传输到每天都在变化的文件?

java - Tomcat - 从 JSP 访问属性

scala - 如何将分组的 Spark RDD 内容展平为单独的行然后保存到文件

hadoop - 在 Yarn 集群上运行时 Spark 批处理未完成

scala - 从 RDD 访问 KafkaOffset 时出现异常

java - 将原始二进制文件写入 java 文件