我有一个用例,我想从一个 Oracle 表中读取数据,其中所有字段都是 varchar 类型,并将其保存到另一个具有相似字段但数据类型正确的 Oracle 表中。这只能在 java 中完成。所以我想从下表中读取数据集:
create table employeeStr (
name varchar2(50),
empid varchar2(50),
age varchar2(50),
salary varchar2(50),
dt_joined varchar2(50));
并写入下表:
create table employeeNorm (
name varchar2(50),
empid number,
age number(3,0),
salary number(10,2),
dt_joined date);
我的java代码如下:
SparkSession sparkSession =
SparkSession.builder().master("local[*]").appName("HandlingOracleDataTypes").getOrCreate();
SQLContext sqlContext = sparkSession.sqlContext();
sqlContext.udf().register("toDate", new UDF1<String, java.sql.Date>() {
@Override
public java.sql.Date call(String dateStr) throws Exception {
Date date = new SimpleDateFormat("yyyyMMdd").parse(dateStr);
return new java.sql.Date(date.getTime());
}
}, DataTypes.DateType);
sqlContext.udf().register("toDate2", new UDF1<String, Date>() {
@Override
public Date call(String dateStr) throws Exception {
Date date = new SimpleDateFormat("yyyyMMdd").parse(dateStr);
return date;
}
}, DataTypes.DateType);
sqlContext.udf().register("toDate3", new UDF1<String, String>() {
@Override
public String call(String dateStr) throws Exception {
Date date = new SimpleDateFormat("yyyyMMdd").parse(dateStr);
return new SimpleDateFormat("dd-MMM-yyyy").format(date);
}
}, DataTypes.StringType);
Properties connectionProperties = new Properties();
connectionProperties.put("user", "<username>");
connectionProperties.put("password", "<password>");
String jdbcUrl = "<jdbcurl>";
Dataset<Row> employeeStrDS = sparkSession.read().jdbc(jdbcUrl, "employeeStr", connectionProperties);
employeeStrDS.show();
employeeStrDS.printSchema();
employeeStrDS.withColumn("empid",employeeStrDS.col("empid").cast(DataTypes.IntegerType));
employeeStrDS.withColumn("age",employeeStrDS.col("age").cast(DataTypes.IntegerType));
employeeStrDS.withColumn("salary",employeeStrDS.col("salary").cast(DataTypes.FloatType));
//employeeStrDS.withColumn("dt_joined",employeeStrDS.col("dt_joined").cast(DataTypes.DateType));
//employeeStrDS.registerTempTable("abc");
//sqlContext.sql("select toDate(dt_joined) from abc").show();
employeeStrDS.withColumn("dt_joined", functions.callUDF("toDate3", employeeStrDS.col("dt_joined")));
//employeeStrDS.printSchema();
employeeStrDS.write().mode(SaveMode.Append).jdbc(jdbcUrl, "employeeNorm", connectionProperties);
如果我从表和代码中删除“dt_joined”列,则此代码可以工作,但是当我将“dt_joined”列放入图片中时,则没有任何效果。尝试了代码中提到的所有 3 个 UDF,但每次都会出现以下异常。请提出解决方案。
Caused by: java.sql.BatchUpdateException: ORA-01861: literal does not match format string
at oracle.jdbc.driver.OraclePreparedStatement.executeBatch(OraclePreparedStatement.java:12296)
at oracle.jdbc.driver.OracleStatementWrapper.executeBatch(OracleStatementWrapper.java:246)
at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.savePartition(JdbcUtils.scala:597)
at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$saveTable$1.apply(JdbcUtils.scala:670)
at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$saveTable$1.apply(JdbcUtils.scala:670)
at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$29.apply(RDD.scala:926)
at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$29.apply(RDD.scala:926)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1951)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1951)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:99)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:322)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.sql.SQLDataException: ORA-01861: literal does not match format string
at oracle.jdbc.driver.T4CTTIoer.processError(T4CTTIoer.java:450)
at oracle.jdbc.driver.T4CTTIoer.processError(T4CTTIoer.java:399)
at oracle.jdbc.driver.T4C8Oall.processError(T4C8Oall.java:1059)
at oracle.jdbc.driver.T4CTTIfun.receive(T4CTTIfun.java:522)
at oracle.jdbc.driver.T4CTTIfun.doRPC(T4CTTIfun.java:257)
at oracle.jdbc.driver.T4C8Oall.doOALL(T4C8Oall.java:587)
at oracle.jdbc.driver.T4CPreparedStatement.doOall8(T4CPreparedStatement.java:225)
更新: 实际场景是 Spark 代码从 Impala 读取数据,创建数据帧。 Impala 表的所有列均为字符串。所以基本上问题是将具有模式的数据帧作为所有字符串保存到具有理想数据类型的 Oracle 表中。
最佳答案
我认为在这种情况下您不会从使用 Spark 中受益,因为您需要首先将所有数据从 Oracle DB 获取到 Spark 集群,然后再返回到 Oracle DB。使用 SQL,您可以就地完成所有操作(在 Oracle DB 内)。您需要做的就是执行以下 SQL 语句(在 Oracle DB 端):
insert into employeeNorm
select name, empid, age, salary, to_date(dt_joined, 'yyyy-mm-dd')
from employeeStr;
commit;
您应该将 'yyyy-mm-dd'
替换为相应的日期时间格式 - 有关详细信息,请参阅下文...
注:date/time format in Oracle's to_date()
function与标准 UNIX 格式不兼容。
这是一个最小的映射:
Oracle UNIX
------ ----
YYYY %Y
YY %y
MM %m
DD %d
HH24 %H
MI %M
SS %S
由您决定如何以及在何处执行这些语句 - 在 Java 中、使用 sqlplus
等。
PS 如果 employeeStr
太大而无法在一笔事务中完成所有操作,您应该考虑使用 BULK INSERT in chunks .
关于java - Apache Spark with Java,从 Oracle 中的 Varchar2 转换为日期类型失败,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/46874845/