apache-spark - Error when querying Iceberg tables through the Spark thrift server with the beeline client?

Tags: apache-spark spark-thriftserver iceberg

I am trying to query an Iceberg table (an external table with data in S3 and metadata in the Hive metastore) through the Spark thrift server that ships with Spark. I can query non-Iceberg tables fine, but when I query an Iceberg table I get the error below. Is it not possible to query Iceberg tables through the Spark thrift server?

Version details

  • Spark - 3.2.1
  • Scala - 2.12.15
  • Iceberg Spark runtime - iceberg-spark-runtime-3.2_2.12
  • The additional S3/AWS dependency jars from Maven have also been added to the Spark jars folder.

I started the thrift server with the following command:

start-thriftserver.sh \
--hiveconf hive.metastore.uris=thrift://$ip:$port \
--conf spark.hadoop.fs.s3a.aws.credentials.provider=org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider \
--conf spark.hadoop.fs.s3a.access.key=$key \
--conf spark.hadoop.fs.s3a.secret.key=$secret \
--conf spark.sql.catalog.iceberg_catalog.uri=thrift://$ip:$port \
--conf spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions \
--conf spark.sql.catalog.iceberg_catalog=org.apache.iceberg.spark.SparkSessionCatalog \
--conf spark.sql.catalog.iceberg_catalog.type=hive \
--conf spark.sql.catalog.iceberg_catalog.io-impl=org.apache.iceberg.aws.s3.S3FileIO \
--conf iceberg.engine.hive.enabled=true
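
For reference, the Iceberg table was then queried from beeline along these lines (a minimal sketch; the JDBC URL assumes the thrift server listens on its default port 10000 on localhost, and $table_name is a placeholder):

beeline -u jdbc:hive2://localhost:10000
select count(*) from $table_name;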

Beeline error when querying the Iceberg table with select count(*) from $table_name:

Error: org.apache.hive.service.cli.HiveSQLException: Error running query: java.lang.RuntimeException: java.lang.InstantiationException
        at org.apache.spark.sql.hive.thriftserver.HiveThriftServerErrors$.runningQueryError(HiveThriftServerErrors.scala:44)
        at org.apache.spark.sql.hive.thriftserver.SparkExecuteStatementOperation.org$apache$spark$sql$hive$thriftserver$SparkExecuteStatementOperation$$execute(SparkExecuteStatementOperation.scala:325)
        at org.apache.spark.sql.hive.thriftserver.SparkExecuteStatementOperation$$anon$2$$anon$3.$anonfun$run$2(SparkExecuteStatementOperation.scala:230)
        at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
        at org.apache.spark.sql.hive.thriftserver.SparkOperation.withLocalProperties(SparkOperation.scala:79)
        at org.apache.spark.sql.hive.thriftserver.SparkOperation.withLocalProperties$(SparkOperation.scala:63)
        at org.apache.spark.sql.hive.thriftserver.SparkExecuteStatementOperation.withLocalProperties(SparkExecuteStatementOperation.scala:43)
        at org.apache.spark.sql.hive.thriftserver.SparkExecuteStatementOperation$$anon$2$$anon$3.run(SparkExecuteStatementOperation.scala:230)
        at org.apache.spark.sql.hive.thriftserver.SparkExecuteStatementOperation$$anon$2$$anon$3.run(SparkExecuteStatementOperation.scala:225)
        at java.base/java.security.AccessController.doPrivileged(Native Method)
        at java.base/javax.security.auth.Subject.doAs(Subject.java:423)
        at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1878)
        at org.apache.spark.sql.hive.thriftserver.SparkExecuteStatementOperation$$anon$2.run(SparkExecuteStatementOperation.scala:239)
        at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
        at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
        at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.lang.RuntimeException: java.lang.InstantiationException
        at org.apache.hadoop.util.ReflectionUtils.newInstance(ReflectionUtils.java:137)
        at org.apache.spark.rdd.HadoopRDD.getInputFormat(HadoopRDD.scala:191)
        at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:205)
        at org.apache.spark.rdd.RDD.$anonfun$partitions$2(RDD.scala:300)
        at scala.Option.getOrElse(Option.scala:189)
        at org.apache.spark.rdd.RDD.partitions(RDD.scala:296)
        at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:49)
        at org.apache.spark.rdd.RDD.$anonfun$partitions$2(RDD.scala:300)
        at scala.Option.getOrElse(Option.scala:189)
        at org.apache.spark.rdd.RDD.partitions(RDD.scala:296)
        at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:49)
        at org.apache.spark.rdd.RDD.$anonfun$partitions$2(RDD.scala:300)
        at scala.Option.getOrElse(Option.scala:189)
        at org.apache.spark.rdd.RDD.partitions(RDD.scala:296)
        at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:49)
        at org.apache.spark.rdd.RDD.$anonfun$partitions$2(RDD.scala:300)
        at scala.Option.getOrElse(Option.scala:189)
        at org.apache.spark.rdd.RDD.partitions(RDD.scala:296)
        at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:49)
        at org.apache.spark.rdd.RDD.$anonfun$partitions$2(RDD.scala:300)
        at scala.Option.getOrElse(Option.scala:189)
        at org.apache.spark.rdd.RDD.partitions(RDD.scala:296)
        at org.apache.spark.rdd.RDD.getNumPartitions(RDD.scala:316)
        at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.mapOutputStatisticsFuture$lzycompute(ShuffleExchangeExec.scala:140)
        at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.mapOutputStatisticsFuture(ShuffleExchangeExec.scala:139)
        at org.apache.spark.sql.execution.exchange.ShuffleExchangeLike.$anonfun$submitShuffleJob$1(ShuffleExchangeExec.scala:68)
        at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:222)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
        at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:219)
        at org.apache.spark.sql.execution.exchange.ShuffleExchangeLike.submitShuffleJob(ShuffleExchangeExec.scala:68)
        at org.apache.spark.sql.execution.exchange.ShuffleExchangeLike.submitShuffleJob$(ShuffleExchangeExec.scala:67)
        at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.submitShuffleJob(ShuffleExchangeExec.scala:115)
        at org.apache.spark.sql.execution.adaptive.ShuffleQueryStageExec.shuffleFuture$lzycompute(QueryStageExec.scala:170)
        at org.apache.spark.sql.execution.adaptive.ShuffleQueryStageExec.shuffleFuture(QueryStageExec.scala:170)
        at org.apache.spark.sql.execution.adaptive.ShuffleQueryStageExec.doMaterialize(QueryStageExec.scala:172)
        at org.apache.spark.sql.execution.adaptive.QueryStageExec.materialize(QueryStageExec.scala:82)
        at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.$anonfun$getFinalPhysicalPlan$5(AdaptiveSparkPlanExec.scala:256)
        at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.$anonfun$getFinalPhysicalPlan$5$adapted(AdaptiveSparkPlanExec.scala:254)
        at scala.collection.Iterator.foreach(Iterator.scala:943)
        at scala.collection.Iterator.foreach$(Iterator.scala:943)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
        at scala.collection.IterableLike.foreach(IterableLike.scala:74)
        at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
        at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
        at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.$anonfun$getFinalPhysicalPlan$1(AdaptiveSparkPlanExec.scala:254)
        at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
        at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.getFinalPhysicalPlan(AdaptiveSparkPlanExec.scala:226)
        at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.withFinalPlanUpdate(AdaptiveSparkPlanExec.scala:365)
        at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.executeCollect(AdaptiveSparkPlanExec.scala:338)
        at org.apache.spark.sql.Dataset.collectFromPlan(Dataset.scala:3715)
        at org.apache.spark.sql.Dataset.$anonfun$collect$1(Dataset.scala:2971)
        at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:3706)
        at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:103)
        at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:163)
        at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:90)
        at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
        at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
        at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3704)
        at org.apache.spark.sql.Dataset.collect(Dataset.scala:2971)
        at org.apache.spark.sql.hive.thriftserver.SparkExecuteStatementOperation.org$apache$spark$sql$hive$thriftserver$SparkExecuteStatementOperation$$execute(SparkExecuteStatementOperation.scala:300)
        ... 16 more
Caused by: java.lang.InstantiationException
        at java.base/jdk.internal.reflect.InstantiationExceptionConstructorAccessorImpl.newInstance(InstantiationExceptionConstructorAccessorImpl.java:48)
        at java.base/java.lang.reflect.Constructor.newInstance(Constructor.java:490)
        at org.apache.hadoop.util.ReflectionUtils.newInstance(ReflectionUtils.java:135)
        ... 75 more (state=,code=0)

Best answer

Based on your configuration, it looks like you are trying to use a catalog named iceberg_catalog, but it is configured as Iceberg's SparkSessionCatalog.

However, SparkSessionCatalog is reserved for wrapping the default catalog that Spark itself uses, which is what allows that catalog to hold Iceberg tables alongside tables in other formats.

The session catalog must be named spark_catalog; that is a requirement imposed by Spark.

So you either need to use org.apache.iceberg.spark.SparkCatalog for the separate catalog you currently have configured as iceberg_catalog (the name is up to you), or, if you want to override the default catalog so that Iceberg and non-Iceberg tables can live in a single catalog, rename the catalog to spark_catalog and keep the rest of your current configuration.
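
As a rough sketch against the start-thriftserver.sh command above (same $ip/$port placeholders; only the catalog-related flags are shown, to be spliced back into the full command), the two options would look something like this:

# Option 1: keep a separate catalog named iceberg_catalog, but back it with SparkCatalog
--conf spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions
--conf spark.sql.catalog.iceberg_catalog=org.apache.iceberg.spark.SparkCatalog
--conf spark.sql.catalog.iceberg_catalog.type=hive
--conf spark.sql.catalog.iceberg_catalog.uri=thrift://$ip:$port
--conf spark.sql.catalog.iceberg_catalog.io-impl=org.apache.iceberg.aws.s3.S3FileIO

# Option 2: override the session catalog itself; it must then be named spark_catalog
--conf spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions
--conf spark.sql.catalog.spark_catalog=org.apache.iceberg.spark.SparkSessionCatalog
--conf spark.sql.catalog.spark_catalog.type=hive
--conf spark.sql.catalog.spark_catalog.uri=thrift://$ip:$port
--conf spark.sql.catalog.spark_catalog.io-impl=org.apache.iceberg.aws.s3.S3FileIO

Note that with option 1 the table has to be referenced through the named catalog from beeline, e.g. select count(*) from iceberg_catalog.$db.$table_name, whereas with option 2 the existing unqualified query keeps working.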

See the documentation on adding catalogs. In the configuration used there, the session catalog spark_catalog is overridden, and there is additionally a separate catalog named local, which is distinct and can only contain Iceberg tables.
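
Paraphrasing that documentation example as a sketch (the local catalog name and the $PWD/warehouse path come from the docs example, not from your setup):

--conf spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions
--conf spark.sql.catalog.spark_catalog=org.apache.iceberg.spark.SparkSessionCatalog
--conf spark.sql.catalog.spark_catalog.type=hive
--conf spark.sql.catalog.local=org.apache.iceberg.spark.SparkCatalog
--conf spark.sql.catalog.local.type=hadoop
--conf spark.sql.catalog.local.warehouse=$PWD/warehouse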

Regarding this question, a similar one can be found on Stack Overflow: https://stackoverflow.com/questions/72620351/
