java - Exception in thread "main" org.apache.spark.sql.AnalysisException: cannot resolve 'named_struct()' due to data type mismatch

Tags: java scala apache-spark apache-spark-sql

I am running a Spark application in which two datasets are joined to form a single dataset, and I then use an encoder to convert the resulting Dataset&lt;Row&gt; into Dataset&lt;T&gt;.

The encoder looks like this:

Encoder<RuleParamsBean> encoder = Encoders.bean(RuleParamsBean.class);

Dataset<RuleParamsBean> ds = new Dataset<RuleParamsBean>(sparkSession, finalJoined.logicalPlan(), encoder);

Dataset<RuleParamsBean> validateDataset = ds.map(rulesParamBean -> validateTransaction(rulesParamBean), encoder);
validateDataset.show();
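
(For reference, the same conversion can also be written through the public as() API instead of the internal Dataset constructor. A minimal sketch, assuming finalJoined is the joined Dataset&lt;Row&gt; and that the bean's property names line up with the joined column names; both are assumptions here:)

import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoder;
import org.apache.spark.sql.Encoders;

// Typed view over the joined Dataset<Row>, via the public as() API
Encoder<RuleParamsBean> encoder = Encoders.bean(RuleParamsBean.class);
Dataset<RuleParamsBean> ds = finalJoined.as(encoder);

// In the Java API, casting the lambda to MapFunction resolves the
// ambiguity between the Scala and Java overloads of map()
Dataset<RuleParamsBean> validateDataset =
        ds.map((MapFunction<RuleParamsBean, RuleParamsBean>) rulesParamBean ->
                validateTransaction(rulesParamBean), encoder);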

After the map operation on the dataset, I get the error below:


Error log:

Exception in thread "main" org.apache.spark.sql.AnalysisException: cannot resolve 'named_struct()' due to data type mismatch: input to function named_struct requires at least one argument;;

Relation[TXN_DETAIL_ID#0,TXN_HEADER_ID#1,TXN_SOURCE_CD#2,TXN_REC_TYPE_CD#3,TXN_DTTM#4,EXT_TXN_NBR#5,CUST_REF_NBR#6,CIS_DIVISION#7,ACCT_ID#8,TXN_VOL#9,TXN_AMT#10,CURRENCY_CD#11,MANUAL_SW#12,USER_ID#13,HOW_TO_USE_TXN_FLG#14,MESSAGE_CAT_NBR#15,MESSAGE_NBR#16,UDF_CHAR_1#17,UDF_CHAR_2#18,UDF_CHAR_3#19,UDF_CHAR_4#20,UDF_CHAR_5#21,UDF_CHAR_6#22,UDF_CHAR_7#23,... 102 more fields] JDBCRelation(CI_TXN_DETAIL_STG_DUMMY) [numPartitions=1]

Relation[ACCT_ID#377,ACCT_NBR_TYPE_CD#378,ACCT_NBR#379,VERSION#380,PRIM_SW#381] JDBCRelation(CI_ACCT_NBR_DUMMY) [numPartitions=1]

at org.apache.spark.sql.catalyst.analysis.package$AnalysisErrorAt.failAnalysis(package.scala:42)
at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1$$anonfun$apply$2.applyOrElse(CheckAnalysis.scala:93)
at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1$$anonfun$apply$2.applyOrElse(CheckAnalysis.scala:85)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:289)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:289)
at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:70)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:288)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:286)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:286)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:306)
at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:187)
at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:304)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:286)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:286)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:286)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:306)
at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:187)
at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:304)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:286)
at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$transformExpressionsUp$1.apply(QueryPlan.scala:95)
at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$transformExpressionsUp$1.apply(QueryPlan.scala:95)
at org.apache.spark.sql.catalyst.plans.QueryPlan.transformExpression$1(QueryPlan.scala:106)
at org.apache.spark.sql.catalyst.plans.QueryPlan.org$apache$spark$sql$catalyst$plans$QueryPlan$$recursiveTransform$1(QueryPlan.scala:116)
at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$org$apache$spark$sql$catalyst$plans$QueryPlan$$recursiveTransform$1$1.apply(QueryPlan.scala:120)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.immutable.List.foreach(List.scala:381)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.immutable.List.map(List.scala:285)
at org.apache.spark.sql.catalyst.plans.QueryPlan.org$apache$spark$sql$catalyst$plans$QueryPlan$$recursiveTransform$1(QueryPlan.scala:120)
at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$1.apply(QueryPlan.scala:125)
at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:187)
at org.apache.spark.sql.catalyst.plans.QueryPlan.mapExpressions(QueryPlan.scala:125)
at org.apache.spark.sql.catalyst.plans.QueryPlan.transformExpressionsUp(QueryPlan.scala:95)
at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:85)
at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:80)
at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:127)
at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.checkAnalysis(CheckAnalysis.scala:80)
at org.apache.spark.sql.catalyst.analysis.Analyzer.checkAnalysis(Analyzer.scala:91)
at org.apache.spark.sql.catalyst.analysis.Analyzer.executeAndCheck(Analyzer.scala:104)
at org.apache.spark.sql.execution.QueryExecution.analyzed$lzycompute(QueryExecution.scala:57)
at org.apache.spark.sql.execution.QueryExecution.analyzed(QueryExecution.scala:55)
at org.apache.spark.sql.execution.QueryExecution.assertAnalyzed(QueryExecution.scala:47)
at org.apache.spark.sql.Dataset.<init>(Dataset.scala:172)
at org.apache.spark.sql.Dataset.<init>(Dataset.scala:178)
at org.apache.spark.sql.Dataset$.apply(Dataset.scala:65)
at org.apache.spark.sql.Dataset.withTypedPlan(Dataset.scala:3300)
at org.apache.spark.sql.Dataset.map(Dataset.scala:2569)
at com.sample.Transformation.main(Transformation.java:100)

Best Answer

For me, the problem was an unsupported field type. I was using LocalDate, which I believe Spark 2.x does not support (I think support for it was added in version 3). When the bean encoder cannot map a bean's properties, the serializer it generates can end up as an empty named_struct(), which matches the error above.

I simply changed the field from LocalDate to Timestamp and it worked. Check whether the same applies to you: does your POJO contain any field types that are not supported?
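
(A minimal sketch of that fix, using a hypothetical txnDate field, since the actual fields of RuleParamsBean are not shown in the question. Printing the schema derived by Encoders.bean is also a quick way to check which bean properties the encoder could actually map:)

import java.io.Serializable;
import java.sql.Timestamp;
import org.apache.spark.sql.Encoders;

public class RuleParamsBean implements Serializable {
    // was: private java.time.LocalDate txnDate;
    // java.time types are not supported by the bean encoder in Spark 2.x
    private Timestamp txnDate; // hypothetical field name

    public Timestamp getTxnDate() { return txnDate; }
    public void setTxnDate(Timestamp txnDate) { this.txnDate = txnDate; }
}

// Elsewhere: inspect the schema the encoder derived; fields of unsupported
// types typically either raise an error here or are missing from the output
Encoders.bean(RuleParamsBean.class).schema().printTreeString();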

This post, "java - Exception in thread \"main\" org.apache.spark.sql.AnalysisException: cannot resolve 'named_struct()' due to data type mismatch", is based on a similar question found on Stack Overflow: https://stackoverflow.com/questions/51592822/
