Java Spark: exception when inserting into a Hive table with dynamic partitions

Tags: java apache-spark hadoop hive

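I have the following code, which inserts data into the table txnaggr_rt_fact. The table is partitioned by two columns, txninterval and intervaltype, and I have enabled dynamic partitioning in Spark SQL.

The data is inserted without any problem when the partition already exists, but an exception is thrown when it does not.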

SparkSession spark = SparkSession.builder()
        .appName("Java Spark Hive Example")
        .config("spark.sql.warehouse.dir", "hdfs://localhost:8020/user/hive/warehouse")
        .config("hive.exec.dynamic.partition", "true")
        .config("hive.exec.dynamic.partition.mode", "nonstrict")
        .enableHiveSupport()
        .getOrCreate();
spark.sql("use nadb");
spark.sql("show tables").show();
// Line 113: the exception is raised here when the partition doesn't exist
spark.sql("insert into table txnaggr_rt_fact partition(txninterval='2018-09-03',intervaltype='test') values('1','2','3',4)");

Here is the exception:

Exception in thread "main" org.apache.spark.sql.AnalysisException: java.lang.NullPointerException: null;
        at org.apache.spark.sql.hive.HiveExternalCatalog.withClient(HiveExternalCatalog.scala:106)
        at org.apache.spark.sql.hive.HiveExternalCatalog.loadPartition(HiveExternalCatalog.scala:843)
        at org.apache.spark.sql.hive.execution.InsertIntoHiveTable.processInsert(InsertIntoHiveTable.scala:249)
        at org.apache.spark.sql.hive.execution.InsertIntoHiveTable.run(InsertIntoHiveTable.scala:99)
        at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:104)
        at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult(commands.scala:102)
        at org.apache.spark.sql.execution.command.DataWritingCommandExec.executeCollect(commands.scala:115)
        at org.apache.spark.sql.Dataset$$anonfun$6.apply(Dataset.scala:190)
        at org.apache.spark.sql.Dataset$$anonfun$6.apply(Dataset.scala:190)
        at org.apache.spark.sql.Dataset$$anonfun$52.apply(Dataset.scala:3253)
        at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:77)
        at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3252)
        at org.apache.spark.sql.Dataset.<init>(Dataset.scala:190)
        at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:75)
        at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:638)
        at com.cw.na.spark.HiveSqlTest.main(HiveSqlTest.java:113)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
        at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:879)
        at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:197)
        at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:227)
        at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:136)
        at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.lang.NullPointerException
        at org.apache.hadoop.hive.ql.metadata.Hive.replaceFiles(Hive.java:3412)
        at org.apache.hadoop.hive.ql.metadata.Hive.loadPartition(Hive.java:1650)
        at org.apache.hadoop.hive.ql.metadata.Hive.loadPartition(Hive.java:1579)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.apache.spark.sql.hive.client.Shim_v0_14.loadPartition(HiveShim.scala:836)
        at org.apache.spark.sql.hive.client.HiveClientImpl$$anonfun$loadPartition$1.apply$mcV$sp(HiveClientImpl.scala:741)
        at org.apache.spark.sql.hive.client.HiveClientImpl$$anonfun$loadPartition$1.apply(HiveClientImpl.scala:739)
        at org.apache.spark.sql.hive.client.HiveClientImpl$$anonfun$loadPartition$1.apply(HiveClientImpl.scala:739)
        at org.apache.spark.sql.hive.client.HiveClientImpl$$anonfun$withHiveState$1.apply(HiveClientImpl.scala:272)
        at org.apache.spark.sql.hive.client.HiveClientImpl.liftedTree1$1(HiveClientImpl.scala:210)
        at org.apache.spark.sql.hive.client.HiveClientImpl.retryLocked(HiveClientImpl.scala:209)
        at org.apache.spark.sql.hive.client.HiveClientImpl.withHiveState(HiveClientImpl.scala:255)
        at org.apache.spark.sql.hive.client.HiveClientImpl.loadPartition(HiveClientImpl.scala:739)
        at org.apache.spark.sql.hive.HiveExternalCatalog$$anonfun$loadPartition$1.apply$mcV$sp(HiveExternalCatalog.scala:855)
        at org.apache.spark.sql.hive.HiveExternalCatalog$$anonfun$loadPartition$1.apply(HiveExternalCatalog.scala:843)
        at org.apache.spark.sql.hive.HiveExternalCatalog$$anonfun$loadPartition$1.apply(HiveExternalCatalog.scala:843)
        at org.apache.spark.sql.hive.HiveExternalCatalog.withClient(HiveExternalCatalog.scala:97)
        ... 25 more

I am running the Hive metastore service, started with hive --service metastore.

I have the following properties in hive-site.xml in the Spark conf folder:

  <property>
    <name>hive.exec.dynamic.partition</name>
    <value>true</value>
    <description>Whether or not to allow dynamic partitions in DML/DDL.</description>
  </property>
  <property>
    <name>hive.exec.dynamic.partition.mode</name>
    <value>nonstrict</value>
    <description>
      In strict mode, the user must specify at least one static partition
      in case the user accidentally overwrites all partitions.
      In nonstrict mode all partitions are allowed to be dynamic.
    </description>
  </property>

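The property hive.exec.dynamic.partition.mode was originally set to strict; once I noticed this, I changed it to nonstrict. I did not restart Spark afterwards, but I did stop and restart the metastore. Do I need to restart Spark as well? Is anything else missing from my code?

Here is the schema of txnaggr_rt_fact: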

channelid               string
chaincodeid             string
chaincodefcn            string
count                   int
txninterval             date
intervaltype            string

# Partition Information
# col_name              data_type               comment

txninterval             date
intervaltype            string

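Any help is appreciated. Thanks.

Best answer

If you are sure the cause is that the partition is not being created, you can issue the following query before inserting the data: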

alter table txnaggr_rt_fact add if not exists partition (txninterval=<value>, intervaltype=<value>)
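
As a rough illustration of how that statement could be generated from the Java program, here is a minimal sketch. The class name PartitionSql and the method addPartitionIfMissing are hypothetical helpers invented for this example; they are not part of Spark or Hive. The sketch only builds the SQL text, and it assumes every partition value can be written as a single-quoted literal.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.StringJoiner;

public class PartitionSql {

    // Hypothetical helper: builds
    //   ALTER TABLE <table> ADD IF NOT EXISTS PARTITION (k1='v1', k2='v2')
    // Values are emitted as single-quoted literals. A value containing a
    // single quote is rejected instead of escaped, to keep the sketch simple.
    public static String addPartitionIfMissing(String table, Map<String, String> spec) {
        if (spec.isEmpty()) {
            throw new IllegalArgumentException("partition spec must not be empty");
        }
        StringJoiner parts = new StringJoiner(", ", "(", ")");
        for (Map.Entry<String, String> e : spec.entrySet()) {
            if (e.getValue().contains("'")) {
                throw new IllegalArgumentException("unsupported value: " + e.getValue());
            }
            parts.add(e.getKey() + "='" + e.getValue() + "'");
        }
        return "ALTER TABLE " + table + " ADD IF NOT EXISTS PARTITION " + parts;
    }

    public static void main(String[] args) {
        // LinkedHashMap keeps the partition columns in insertion order.
        Map<String, String> spec = new LinkedHashMap<>();
        spec.put("txninterval", "2018-09-03");
        spec.put("intervaltype", "test");
        System.out.println(addPartitionIfMissing("txnaggr_rt_fact", spec));
    }
}
```

In the program from the question, the resulting string would be passed to spark.sql(...) immediately before the insert statement, so that the partition is registered in the metastore before any data is loaded into it.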

For more on this exception when inserting into a Hive table with dynamic partitions from Java Spark, see the similar question on Stack Overflow: https://stackoverflow.com/questions/52196912/
