我正在尝试使用 Spark thrift 服务器作为 Spark 的一部分来查询 Iceberg 表(包含 S3 中的数据和 Hivemetastore 中的元数据的外部表)。我可以查询非冰山表,但是当我查询冰山表时,出现以下错误。我们可以不通过spark thrift服务器查询iceberg表吗?
版本详情
- Spark - 3.2.1
- Scala - 2.12.15
- 冰山 Spark 库 - iceberg-spark-runtime-3.2_2.12
- 我已经从 Maven 添加了其他 S3 、AWS 依赖项 jar,并添加到了 Spark jars 文件夹中。
我已经使用以下命令启动了 thrift 服务器
start-thriftserver.sh \
--hiveconf hive.metastore.uris=thrift://$ip:$port \
--conf spark.hadoop.fs.s3a.aws.credentials.provider=org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider \
--conf spark.hadoop.fs.s3a.access.key=$key \
--conf spark.hadoop.fs.s3a.secret.key=$secret \
--conf spark.sql.catalog.iceberg_catalog.uri=thrift://$ip:$port \
--conf spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions \
--conf spark.sql.catalog.iceberg_catalog=org.apache.iceberg.spark.SparkSessionCatalog \
--conf spark.sql.catalog.iceberg_catalog.type=hive \
--conf spark.sql.catalog.iceberg_catalog.io-impl=org.apache.iceberg.aws.s3.S3FileIO \
--conf iceberg.engine.hive.enabled=true \
查询iceberg表时出现beeline错误select count(*) from $table_name
Error: org.apache.hive.service.cli.HiveSQLException: Error running query: java.lang.RuntimeException: java.lang.InstantiationException
at org.apache.spark.sql.hive.thriftserver.HiveThriftServerErrors$.runningQueryError(HiveThriftServerErrors.scala:44)
at org.apache.spark.sql.hive.thriftserver.SparkExecuteStatementOperation.org$apache$spark$sql$hive$thriftserver$SparkExecuteStatementOperation$$execute(SparkExecuteStatementOperation.scala:325)
at org.apache.spark.sql.hive.thriftserver.SparkExecuteStatementOperation$$anon$2$$anon$3.$anonfun$run$2(SparkExecuteStatementOperation.scala:230)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at org.apache.spark.sql.hive.thriftserver.SparkOperation.withLocalProperties(SparkOperation.scala:79)
at org.apache.spark.sql.hive.thriftserver.SparkOperation.withLocalProperties$(SparkOperation.scala:63)
at org.apache.spark.sql.hive.thriftserver.SparkExecuteStatementOperation.withLocalProperties(SparkExecuteStatementOperation.scala:43)
at org.apache.spark.sql.hive.thriftserver.SparkExecuteStatementOperation$$anon$2$$anon$3.run(SparkExecuteStatementOperation.scala:230)
at org.apache.spark.sql.hive.thriftserver.SparkExecuteStatementOperation$$anon$2$$anon$3.run(SparkExecuteStatementOperation.scala:225)
at java.base/java.security.AccessController.doPrivileged(Native Method)
at java.base/javax.security.auth.Subject.doAs(Subject.java:423)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1878)
at org.apache.spark.sql.hive.thriftserver.SparkExecuteStatementOperation$$anon$2.run(SparkExecuteStatementOperation.scala:239)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.lang.RuntimeException: java.lang.InstantiationException
at org.apache.hadoop.util.ReflectionUtils.newInstance(ReflectionUtils.java:137)
at org.apache.spark.rdd.HadoopRDD.getInputFormat(HadoopRDD.scala:191)
at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:205)
at org.apache.spark.rdd.RDD.$anonfun$partitions$2(RDD.scala:300)
at scala.Option.getOrElse(Option.scala:189)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:296)
at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:49)
at org.apache.spark.rdd.RDD.$anonfun$partitions$2(RDD.scala:300)
at scala.Option.getOrElse(Option.scala:189)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:296)
at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:49)
at org.apache.spark.rdd.RDD.$anonfun$partitions$2(RDD.scala:300)
at scala.Option.getOrElse(Option.scala:189)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:296)
at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:49)
at org.apache.spark.rdd.RDD.$anonfun$partitions$2(RDD.scala:300)
at scala.Option.getOrElse(Option.scala:189)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:296)
at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:49)
at org.apache.spark.rdd.RDD.$anonfun$partitions$2(RDD.scala:300)
at scala.Option.getOrElse(Option.scala:189)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:296)
at org.apache.spark.rdd.RDD.getNumPartitions(RDD.scala:316)
at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.mapOutputStatisticsFuture$lzycompute(ShuffleExchangeExec.scala:140)
at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.mapOutputStatisticsFuture(ShuffleExchangeExec.scala:139)
at org.apache.spark.sql.execution.exchange.ShuffleExchangeLike.$anonfun$submitShuffleJob$1(ShuffleExchangeExec.scala:68)
at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:222)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:219)
at org.apache.spark.sql.execution.exchange.ShuffleExchangeLike.submitShuffleJob(ShuffleExchangeExec.scala:68)
at org.apache.spark.sql.execution.exchange.ShuffleExchangeLike.submitShuffleJob$(ShuffleExchangeExec.scala:67)
at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.submitShuffleJob(ShuffleExchangeExec.scala:115)
at org.apache.spark.sql.execution.adaptive.ShuffleQueryStageExec.shuffleFuture$lzycompute(QueryStageExec.scala:170)
at org.apache.spark.sql.execution.adaptive.ShuffleQueryStageExec.shuffleFuture(QueryStageExec.scala:170)
at org.apache.spark.sql.execution.adaptive.ShuffleQueryStageExec.doMaterialize(QueryStageExec.scala:172)
at org.apache.spark.sql.execution.adaptive.QueryStageExec.materialize(QueryStageExec.scala:82)
at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.$anonfun$getFinalPhysicalPlan$5(AdaptiveSparkPlanExec.scala:256)
at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.$anonfun$getFinalPhysicalPlan$5$adapted(AdaptiveSparkPlanExec.scala:254)
at scala.collection.Iterator.foreach(Iterator.scala:943)
at scala.collection.Iterator.foreach$(Iterator.scala:943)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
at scala.collection.IterableLike.foreach(IterableLike.scala:74)
at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.$anonfun$getFinalPhysicalPlan$1(AdaptiveSparkPlanExec.scala:254)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.getFinalPhysicalPlan(AdaptiveSparkPlanExec.scala:226)
at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.withFinalPlanUpdate(AdaptiveSparkPlanExec.scala:365)
at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.executeCollect(AdaptiveSparkPlanExec.scala:338)
at org.apache.spark.sql.Dataset.collectFromPlan(Dataset.scala:3715)
at org.apache.spark.sql.Dataset.$anonfun$collect$1(Dataset.scala:2971)
at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:3706)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:103)
at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:163)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:90)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3704)
at org.apache.spark.sql.Dataset.collect(Dataset.scala:2971)
at org.apache.spark.sql.hive.thriftserver.SparkExecuteStatementOperation.org$apache$spark$sql$hive$thriftserver$SparkExecuteStatementOperation$$execute(SparkExecuteStatementOperation.scala:300)
... 16 more
Caused by: java.lang.InstantiationException
at java.base/jdk.internal.reflect.InstantiationExceptionConstructorAccessorImpl.newInstance(InstantiationExceptionConstructorAccessorImpl.java:48)
at java.base/java.lang.reflect.Constructor.newInstance(Constructor.java:490)
at org.apache.hadoop.util.ReflectionUtils.newInstance(ReflectionUtils.java:135)
... 75 more (state=,code=0)
最佳答案
根据您的配置,您似乎正在尝试使用名为 iceberg_catalog
的目录,但它被配置为 Iceberg 的 SparkSessionCatalog
。
但是,SparkSessionCatalog
是为 Spark 使用的默认目录保留的,这允许该目录与 Iceberg 表和其他格式一起使用。
session 目录必须命名为spark_catalog
。这是 Spark 提出的要求。
因此,您需要将 org.apache.iceberg.SparkCatalog
与您当前配置的名为 iceberg_catalog
的单独目录(名称由您决定)一起使用,或者,如果您想要覆盖默认目录,以便 Iceberg 表和非 Iceberg 表可以存在于一个目录中,则需要将目录名称更改为 spark_catalog
并保留当前配置。
请参阅 adding catalogs 上的文档。在使用的配置中, session 目录 spark_catalog
被覆盖,然后还有一个名为 local
的目录,该目录是不同的,并且只能包含 Iceberg 表。
关于apache-spark - 使用 beeline 客户端通过 Spark thrift 服务器查询 Iceberg 表时出现错误?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/72620351/