sql - 在 awaitResult 中抛出 SPARK 异常

标签 sql join apache-spark

我在本地运行 SPARK(我没有使用 Mesos),当运行 d3=join(d1,d2) 和 d5=(d3, d4) 之类的连接时,出现以下异常“org.apache.spark.SparkException: awaitResult 中抛出异常”。
谷歌搜索,我发现以下两个相关链接:

1)  https://github.com/apache/spark/commit/947b9020b0d621bc97661a0a056297e6889936d3
2)  https://github.com/apache/spark/pull/12433

这两者都解释了它发生的原因,但没有说明如何解决它。

关于我的运行配置的更多信息:

1) 我正在使用 spark-core_2.11、spark-sql_2.11

SparkSession spark = SparkSession
                     .builder()
                     .master("local[6]").appName("DatasetForCaseNew").config("spark.executor.memory", "4g").config("spark.shuffle.blockTransferService", "nio").getOrCreate();

3) 公共(public)数据集 buildDataset(){
...
// STEP A
// Join prdDS with cmpDS          
Dataset<Row> prdDS_Join_cmpDS
                = res1                        
                  .join(res2, (res1.col("PRD_asin#100")).equalTo(res2.col("CMP_asin")), "inner");
        
        prdDS_Join_cmpDS.take(1);        

// STEP B
// Join prdDS with cmpDS
Dataset<Row> prdDS_Join_cmpDS_Join
                = prdDS_Join_cmpDS                        
                  .join(res3, prdDS_Join_cmpDS.col("PRD_asin#100").equalTo(res3.col("ORD_asin")), "inner");
        prdDS_Join_cmpDS_Join.take(1);
        prdDS_Join_cmpDS_Join.show();

}

当计算到达 STEP B 时抛出异常(参见下面的堆栈跟踪),直到 STEP A 正常。

有什么错误或遗漏吗?

提前感谢您的帮助。

最好的祝福,
卡罗
=== STACK TRACE

Tests run: 1, Failures: 0, Errors: 1, Skipped: 0, Time elapsed: 422.102 sec <<< FAILURE!
testBuildDataset(org.mksmart.amaretto.ml.DatasetPerHourVerOneTest)  Time elapsed: 421.994 sec  <<< ERROR!
org.apache.spark.SparkException: Exception thrown in awaitResult: 
at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:194)
at org.apache.spark.sql.execution.exchange.BroadcastExchangeExec.doExecuteBroadcast(BroadcastExchangeExec.scala:102)
at org.apache.spark.sql.execution.InputAdapter.doExecuteBroadcast(WholeStageCodegenExec.scala:229)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeBroadcast$1.apply(SparkPlan.scala:125)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeBroadcast$1.apply(SparkPlan.scala:125)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:136)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:133)
at org.apache.spark.sql.execution.SparkPlan.executeBroadcast(SparkPlan.scala:124)
at org.apache.spark.sql.execution.joins.BroadcastHashJoinExec.prepareBroadcast(BroadcastHashJoinExec.scala:98)
at org.apache.spark.sql.execution.joins.BroadcastHashJoinExec.codegenInner(BroadcastHashJoinExec.scala:197)
at org.apache.spark.sql.execution.joins.BroadcastHashJoinExec.doConsume(BroadcastHashJoinExec.scala:82)
at org.apache.spark.sql.execution.CodegenSupport$class.consume(WholeStageCodegenExec.scala:153)
at org.apache.spark.sql.execution.joins.SortMergeJoinExec.consume(SortMergeJoinExec.scala:35)
at org.apache.spark.sql.execution.joins.SortMergeJoinExec.doProduce(SortMergeJoinExec.scala:565)
at org.apache.spark.sql.execution.CodegenSupport$$anonfun$produce$1.apply(WholeStageCodegenExec.scala:83)
at org.apache.spark.sql.execution.CodegenSupport$$anonfun$produce$1.apply(WholeStageCodegenExec.scala:78)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:136)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:133)
at org.apache.spark.sql.execution.CodegenSupport$class.produce(WholeStageCodegenExec.scala:78)
at org.apache.spark.sql.execution.joins.SortMergeJoinExec.produce(SortMergeJoinExec.scala:35)
at org.apache.spark.sql.execution.joins.BroadcastHashJoinExec.doProduce(BroadcastHashJoinExec.scala:77)
at org.apache.spark.sql.execution.CodegenSupport$$anonfun$produce$1.apply(WholeStageCodegenExec.scala:83)
at org.apache.spark.sql.execution.CodegenSupport$$anonfun$produce$1.apply(WholeStageCodegenExec.scala:78)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:136)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:133)
at org.apache.spark.sql.execution.CodegenSupport$class.produce(WholeStageCodegenExec.scala:78)
at org.apache.spark.sql.execution.joins.BroadcastHashJoinExec.produce(BroadcastHashJoinExec.scala:38)
at org.apache.spark.sql.execution.WholeStageCodegenExec.doCodeGen(WholeStageCodegenExec.scala:304)
at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:343)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:115)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:115)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:136)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:133)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:114)
at org.apache.spark.sql.execution.SparkPlan.getByteArrayRdd(SparkPlan.scala:240)
at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:323)
at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:38)
at org.apache.spark.sql.Dataset$$anonfun$org$apache$spark$sql$Dataset$$execute$1$1.apply(Dataset.scala:2122)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:57)
at org.apache.spark.sql.Dataset.withNewExecutionId(Dataset.scala:2436)
at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$execute$1(Dataset.scala:2121)
at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$collect(Dataset.scala:2128)
at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:1862)
at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:1861)
at org.apache.spark.sql.Dataset.withTypedCallback(Dataset.scala:2466)
at org.apache.spark.sql.Dataset.head(Dataset.scala:1861)
at org.apache.spark.sql.Dataset.take(Dataset.scala:2078)
at org.mksmart.amaretto.ml.DatasetPerHourVerOne.buildDataset(DatasetPerHourVerOne.java:115)
at org.mksmart.amaretto.ml.DatasetPerHourVerOneTest.testBuildDataset(DatasetPerHourVerOneTest.java:76)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:497)
at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
at org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
at org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
at org.apache.maven.surefire.junit4.JUnit4TestSet.execute(JUnit4TestSet.java:53)
at org.apache.maven.surefire.junit4.JUnit4Provider.executeTestSet(JUnit4Provider.java:123)
at org.apache.maven.surefire.junit4.JUnit4Provider.invoke(JUnit4Provider.java:104)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:497)
at org.apache.maven.surefire.util.ReflectionUtils.invokeMethodWithArray(ReflectionUtils.java:164)
at org.apache.maven.surefire.booter.ProviderFactory$ProviderProxy.invoke(ProviderFactory.java:110)
at org.apache.maven.surefire.booter.SurefireStarter.invokeProvider(SurefireStarter.java:175)
at org.apache.maven.surefire.booter.SurefireStarter.runSuitesInProcessWhenForked(SurefireStarter.java:107)
at org.apache.maven.surefire.booter.ForkedBooter.main(ForkedBooter.java:68)
Caused by: java.util.concurrent.TimeoutException: Futures timed out after [300 seconds]
at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:190)
at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
at scala.concurrent.Await$.result(package.scala:190)
at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:190)
... 85 more

最佳答案

只是为了补充卡洛所说的,我使用了以下代码行:

sqlContext.setConf("spark.sql.autoBroadcastJoinThreshold", "-1")

关于sql - 在 awaitResult 中抛出 SPARK 异常,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/38635930/

相关文章:

mysql - 过滤相同或相似的列值

database - 数据库管理系统 : Relational Algebra Execution Plan Cost Calculation

hadoop - 如何从hdfs读取二进制文件?

sql - 为什么在 postgresql 中表称为关系?

sql - 如何在 Teradata SQL 上根据某些条件保留特定组的第一行?

php - 如何将 SQL 表中的行列出到 jquery 脚本中?

mysql - 使用左连接和排序时如何避免文件排序

MySQL 慢分组依据/排序依据

join - Spark : Join dataframe column with an array

apache-spark - 如何找到特定 Spark 配置属性的值?