java - Spark Java code runs in spark_core v2.2 but fails in spark_core v2.3

Tags: java apache-spark apache-spark-sql apache-spark-dataset

I have Spark Java code that runs fine with spark-core_2.11 v2.2.0 but throws an exception with spark-core_2.11 v2.3.1.
The code basically maps the column "isrecurrence" to the value 1 when the value is true and 0 when it is false.
The column also contains the value "null" (as a string). These "null" strings are replaced with "\N" (so that Hive reads this data as NULL).

Code:

// Converts a Java List<String> into a Scala Seq<String> so it can be passed to selectExpr(...)
public static Seq<String> convertListToSeq(List<String> inputList)
{
    return JavaConverters.asScalaIteratorConverter(inputList.iterator()).asScala().toSeq();
}

String srcCols = "id,whoid,whatid,whocount,whatcount,subject,activitydate,status,priority,ishighpriority,ownerid,description,isdeleted,accountid,isclosed,createddate,createdbyid,lastmodifieddate,lastmodifiedbyid,systemmodstamp,isarchived,calldurationinseconds,calltype,calldisposition,callobject,reminderdatetime,isreminderset,recurrenceactivityid,isrecurrence,recurrencestartdateonly,recurrenceenddateonly,recurrencetimezonesidkey,recurrencetype,recurrenceinterval,recurrencedayofweekmask,recurrencedayofmonth,recurrenceinstance,recurrencemonthofyear,recurrenceregeneratedtype";
String table = "task";
String[] colArr = srcCols.split(",");
List<String> colsList = Arrays.asList(colArr);
Dataset<Row> filtered = spark.read().format("com.springml.spark.salesforce")
                    .option("username", prop.getProperty("salesforce_user"))
                    .option("password", prop.getProperty("salesforce_auth"))
                    .option("login", prop.getProperty("salesforce_login_url"))
                    .option("soql", "SELECT "+srcCols+" from "+table)
                    .option("version", prop.getProperty("salesforce_version"))
                    .load().persist();

String column = "isrecurrence"; //This column has values 'true' or 'false' as string. 
                                //'true' will be mapped to '1' (as string)
                                //'false' will be mapped to '0' (as string).

String newCol = column + "_mapped_to_new_value";

filtered = filtered.selectExpr(convertListToSeq(colsList)) 
                            .withColumn(newCol, //code is breaking here at "withColumn"
                                when(filtered.col(column).notEqual("null"), 
                                    when(filtered.col(column).equalTo("true"), 1)
                                    .otherwise(when(filtered.col(column).equalTo("false"), 0)))
                                .otherwise(lit("\\N"))).alias(newCol)
                            .drop(filtered.col(column));

filtered.write().mode(SaveMode.Overwrite).option("delimiter", "^").csv(hdfsExportLoaction);

Error:

Exception in thread "main" org.apache.spark.sql.AnalysisException: unresolved operator 'Project [id#35, whoid#21, whatid#1, whocount#13, whatcount#5, subject#27, activitydate#22, status#19, priority#24, ishighpriority#10, ownerid#15, description#2, isdeleted#20, accountid#3, isclosed#12, createddate#34, createdbyid#16, lastmodifieddate#0, lastmodifiedbyid#37, systemmodstamp#28, isarchived#30, calldurationinseconds#23, calltype#9, calldisposition#6, ... 16 more fields];;
'Project [id#35, whoid#21, whatid#1, whocount#13, whatcount#5, subject#27, activitydate#22, status#19, priority#24, ishighpriority#10, ownerid#15, description#2, isdeleted#20, accountid#3, isclosed#12, createddate#34, createdbyid#16, lastmodifieddate#0, lastmodifiedbyid#37, systemmodstamp#28, isarchived#30, calldurationinseconds#23, calltype#9, calldisposition#6, ... 16 more fields]
+- Project [id#35, whoid#21, whatid#1, whocount#13, whatcount#5, subject#27, activitydate#22, status#19, priority#24, ishighpriority#10, ownerid#15, description#2, isdeleted#20, accountid#3, isclosed#12, createddate#34, createdbyid#16, lastmodifieddate#0, lastmodifiedbyid#37, systemmodstamp#28, isarchived#30, calldurationinseconds#23, calltype#9, calldisposition#6, ... 15 more fields]
   +- Project [id#35, whoid#21, whatid#1, whocount#13, whatcount#5, subject#27, activitydate#22, status#19, priority#24, ishighpriority#10, ownerid#15, description#2, isdeleted#20, accountid#3, isclosed#12, createddate#34, createdbyid#16, lastmodifieddate#0, lastmodifiedbyid#37, systemmodstamp#28, isarchived#30, calldurationinseconds#23, calltype#9, calldisposition#6, ... 15 more fields]
      +- Relation[LastModifiedDate#0,WhatId#1,Description#2,AccountId#3,RecurrenceDayOfWeekMask#4,WhatCount#5,CallDisposition#6,ReminderDateTime#7,RecurrenceEndDateOnly#8,CallType#9,IsHighPriority#10,RecurrenceRegeneratedType#11,IsClosed#12,WhoCount#13,RecurrenceInterval#14,OwnerId#15,CreatedById#16,RecurrenceActivityId#17,IsReminderSet#18,Status#19,IsDeleted#20,WhoId#21,ActivityDate#22,CallDurationInSeconds#23,... 15 more fields] DatasetRelation(null,com.springml.salesforce.wave.impl.ForceAPIImpl@68303c3e,SELECT id,whoid,whatid,whocount,whatcount,subject,activitydate,status,priority,ishighpriority,ownerid,description,isdeleted,accountid,isclosed,createddate,createdbyid,lastmodifieddate,lastmodifiedbyid,systemmodstamp,isarchived,calldurationinseconds,calltype,calldisposition,callobject,reminderdatetime,isreminderset,recurrenceactivityid,isrecurrence,recurrencestartdateonly,recurrenceenddateonly,recurrencetimezonesidkey,recurrencetype,recurrenceinterval,recurrencedayofweekmask,recurrencedayofmonth,recurrenceinstance,recurrencemonthofyear,recurrenceregeneratedtype from task,null,org.apache.spark.sql.SQLContext@2ec23ec3,null,0,1000,None,false,false)

        at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.failAnalysis(CheckAnalysis.scala:41)
        at org.apache.spark.sql.catalyst.analysis.Analyzer.failAnalysis(Analyzer.scala:92)
        at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$3.apply(CheckAnalysis.scala:356)
        at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$3.apply(CheckAnalysis.scala:354)
        at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:127)
        at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.checkAnalysis(CheckAnalysis.scala:354)
        at org.apache.spark.sql.catalyst.analysis.Analyzer.checkAnalysis(Analyzer.scala:92)
        at org.apache.spark.sql.catalyst.analysis.Analyzer.executeAndCheck(Analyzer.scala:105)
        at org.apache.spark.sql.execution.QueryExecution.analyzed$lzycompute(QueryExecution.scala:57)
        at org.apache.spark.sql.execution.QueryExecution.analyzed(QueryExecution.scala:55)
        at org.apache.spark.sql.execution.QueryExecution.assertAnalyzed(QueryExecution.scala:47)
        at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:74)
        at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$withPlan(Dataset.scala:3301)
        at org.apache.spark.sql.Dataset.select(Dataset.scala:1312)
        at org.apache.spark.sql.Dataset.withColumns(Dataset.scala:2197)
        at org.apache.spark.sql.Dataset.withColumn(Dataset.scala:2164)
        at com.sfdc.SaleforceReader.mapColumns(SaleforceReader.java:187)
        at com.sfdc.SaleforceReader.main(SaleforceReader.java:547)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
        at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:904)
        at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:198)
        at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:228)
        at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:137)
        at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
19/07/10 09:38:51 INFO SparkContext: Invoking stop() from shutdown hook
19/07/10 09:38:51 INFO AbstractConnector: Stopped Spark@72456279{HTTP/1.1,[http/1.1]}{0.0.0.0:4040}
19/07/10 09:38:51 INFO SparkUI: Stopped Spark web UI at http://ebdp-po-e007s.sys.comcast.net:4040
19/07/10 09:38:51 INFO YarnClientSchedulerBackend: Interrupting monitor thread
19/07/10 09:38:51 INFO YarnClientSchedulerBackend: Shutting down all executors
19/07/10 09:38:51 INFO YarnSchedulerBackend$YarnDriverEndpoint: Asking each executor to shut down
19/07/10 09:38:51 INFO SchedulerExtensionServices: Stopping SchedulerExtensionServices
(serviceOption=None,
 services=List(),
 started=false)
19/07/10 09:38:51 INFO YarnClientSchedulerBackend: Stopped
19/07/10 09:38:51 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
19/07/10 09:38:51 INFO MemoryStore: MemoryStore cleared
19/07/10 09:38:51 INFO BlockManager: BlockManager stopped
19/07/10 09:38:51 INFO BlockManagerMaster: BlockManagerMaster stopped

I'm not sure whether this is caused by the nested when()-otherwise().
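If the nesting is the suspect, the same mapping can also be written without nesting by chaining when() calls and keeping every branch a string literal, so the resulting column has a single type. This is only an untested sketch based on the code above, not the code that was actually run:

// Sketch: chained when() instead of nested when(...).otherwise(when(...)),
// with all branches as string literals so no implicit type coercion is needed.
filtered = filtered
        .withColumn(newCol,
                when(filtered.col(column).equalTo("true"), lit("1"))
                .when(filtered.col(column).equalTo("false"), lit("0"))
                .otherwise(lit("\\N")))
        .drop(filtered.col(column));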

Best answer

I used lit() and it worked:

when(filtered.col(column).equalTo("true"), lit(1))
                                    .otherwise(when(filtered.col(column).equalTo("false"), lit(0)))

instead of

when(filtered.col(column).equalTo("true"), 1)
                                    .otherwise(when(filtered.col(column).equalTo("false"), 0))
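Applied to the original snippet, the working call would look roughly like this (same logic, only the bare 1 and 0 wrapped in lit(); the Dataset-level .alias(newCol) is omitted since it aliases the Dataset rather than naming the column):

// Sketch of the fix in context: literal branch values wrapped in lit()
filtered = filtered.selectExpr(convertListToSeq(colsList))
        .withColumn(newCol,
                when(filtered.col(column).notEqual("null"),
                        when(filtered.col(column).equalTo("true"), lit(1))
                        .otherwise(when(filtered.col(column).equalTo("false"), lit(0))))
                .otherwise(lit("\\N")))
        .drop(filtered.col(column));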

Regarding "java - Spark Java code runs in spark_core v2.2 but fails in spark_core v2.3", a similar question can be found on Stack Overflow: https://stackoverflow.com/questions/56971470/
