apache-spark - Spark in a Kerberized Hadoop environment with High Availability enabled: Spark SQL only reads data after a write job

Tags: apache-spark hadoop high-availability namenode

We have been using a Kerberized Hadoop environment (HDP 3.1.4 with Spark 2.3.2 and Ambari 2.7.4) for a long time, and everything has worked fine so far.

Now we have enabled NameNode High Availability and are running into the following problem: when we want to read data with Spark SQL, we first have to write some (other) data. If we don't write anything before the read operation, it fails.

Here is our scenario:

$ kinit -kt /etc/security/keytabs/user.keytab user
$ spark-shell
  • Run a read request -> the first read request of each session fails!
  • scala> spark.sql("SELECT * FROM pm.simulation_uci_hydraulic_sensor").show
    Hive Session ID = cbb6b6e2-a048-41e0-8e77-c2b2a7f52dbe
    [Stage 0:>                                                          (0 + 1) / 1]20/04/22 15:04:53 WARN TaskSetManager: Lost task 0.0 in stage 0.0 (TID 0, had-data6.my-company.de, executor 2): java.io.IOException: DestHost:destPort had-job.my-company.de:8020 , LocalHost:localPort had-data6.my-company.de/192.168.178.123:0. Failed on local exception: java.io.IOException: org.apache.hadoop.security.AccessControlException: Client cannot authenticate via:[TOKEN, KERBEROS]
            at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
            at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
            at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
            at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
            at org.apache.hadoop.net.NetUtils.wrapWithMessage(NetUtils.java:831)
            at org.apache.hadoop.net.NetUtils.wrapException(NetUtils.java:806)
            at org.apache.hadoop.ipc.Client.getRpcResponse(Client.java:1502)
            at org.apache.hadoop.ipc.Client.call(Client.java:1444)
            at org.apache.hadoop.ipc.Client.call(Client.java:1354)
            at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:228)
            at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:116)
            at com.sun.proxy.$Proxy13.getBlockLocations(Unknown Source)
            at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.getBlockLocations(ClientNamenodeProtocolTranslatorPB.java:317)
            at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
            at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
            at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
            at java.lang.reflect.Method.invoke(Method.java:498)
            at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:422)
            at org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invokeMethod(RetryInvocationHandler.java:165)
            at org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invoke(RetryInvocationHandler.java:157)
            at org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invokeOnce(RetryInvocationHandler.java:95)
            at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:359)
            at com.sun.proxy.$Proxy14.getBlockLocations(Unknown Source)
            at org.apache.hadoop.hdfs.DFSClient.callGetBlockLocations(DFSClient.java:862)
            at org.apache.hadoop.hdfs.DFSClient.getLocatedBlocks(DFSClient.java:851)
            at org.apache.hadoop.hdfs.DFSClient.getLocatedBlocks(DFSClient.java:840)
            at org.apache.hadoop.hdfs.DFSClient.open(DFSClient.java:1004)
            at org.apache.hadoop.hdfs.DistributedFileSystem$4.doCall(DistributedFileSystem.java:320)
            at org.apache.hadoop.hdfs.DistributedFileSystem$4.doCall(DistributedFileSystem.java:316)
            at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
            at org.apache.hadoop.hdfs.DistributedFileSystem.open(DistributedFileSystem.java:328)
            at org.apache.hadoop.fs.FileSystem.open(FileSystem.java:899)
            at org.apache.orc.impl.ReaderImpl.extractFileTail(ReaderImpl.java:522)
            at org.apache.orc.impl.ReaderImpl.<init>(ReaderImpl.java:364)
            at org.apache.orc.OrcFile.createReader(OrcFile.java:251)
            [...]
    
  • Run a write job -> it works!
  • scala> val primitiveDS = Seq(1, 2, 3).toDS()
    primitiveDS: org.apache.spark.sql.Dataset[Int] = [value: int]
    
    scala> primitiveDS.write.saveAsTable("pm.todelete3")
    20/04/22 15:05:07 WARN SessionState: METASTORE_FILTER_HOOK will be ignored, since hive.security.authorization.manager is set to instance of HiveAuthorizerFactory.
    
  • Now read the same data again -> it works (within the same session)!
  • scala> spark.sql("SELECT * FROM pm.simulation_uci_hydraulic_sensor").show
    +--------+--------+--------------------+------+
    |instance|sensorId|                  ts| value|
    +--------+--------+--------------------+------+
    |      21|     PS6|2020-04-18 17:19:...| 8.799|
    |      21|    EPS1|2020-04-18 17:19:...|2515.6|
    |      21|     PS3|2020-04-18 17:19:...| 2.187|
    +--------+--------+--------------------+------+
    

    The behavior is the same when starting a new spark-shell session!

    Can anyone help solve this problem? Thanks!

    Best Answer

    We found the answer to our question: the table properties still point to the "old" NameNode location for tables that were created before High Availability was activated in the Hadoop cluster.

    You can find the table information by running the following command:

    $ spark-shell
    scala> spark.sql("DESCRIBE EXTENDED db.table").show(false)
    

    This shows the table information, e.g. in my case:
    +----------------------------+---------------------------------------------------------------------------------------------+-------+
    |col_name                    |data_type                                                                                    |comment|
    +----------------------------+---------------------------------------------------------------------------------------------+-------+
    |instance                    |int                                                                                          |null   |
    |sensorId                    |string                                                                                       |null   |
    |ts                          |timestamp                                                                                    |null   |
    |value                       |double                                                                                       |null   |
    |                            |                                                                                             |       |
    |# Detailed Table Information|                                                                                             |       |
    |Database                    |simulation                                                                                   |       |
    |Table                       |uci_hydraulic_sensor_1                                                                       |       |
    |Created By                  |Spark 2.3.2.3.1.4.0-315                                                                      |       |
    |Type                        |EXTERNAL                                                                                     |       |
    |Provider                    |parquet                                                                                      |       |
    |Statistics                  |244762020 bytes                                                                              |       |
    |Location                    |hdfs://had-job.mycompany.de:8020/projects/pm/simulation/uci_hydraulic_sensor_1       <== This is important!
    |Serde Library               |org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe                                  |       |
    |InputFormat                 |org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat                                |       |
    |OutputFormat                |org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat                               |       |
    +----------------------------+---------------------------------------------------------------------------------------------+-------+
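
    If many tables were created before HA was enabled, checking each one by hand gets tedious. The sketch below is my own addition, not part of the original answer: it assumes a spark-shell with Hive support and loops over one database's tables, printing every table whose Location still contains the old NameNode address (the host:port and database name are placeholders taken from this example).

    // Sketch: list tables whose metastore Location still points at the pre-HA NameNode.
    // "had-job.mycompany.de:8020" and "simulation" are placeholders from this example.
    import org.apache.spark.sql.functions.col

    val oldAuthority = "had-job.mycompany.de:8020"

    spark.catalog.listTables("simulation").collect()
      .filter(!_.isTemporary)
      .foreach { t =>
        val location = spark.sql(s"DESCRIBE EXTENDED ${t.database}.${t.name}")
          .filter(col("col_name") === "Location")   // the same row shown in the output above
          .collect()
          .headOption
          .map(_.getAs[String]("data_type"))
          .getOrElse("")
        if (location.contains(oldAuthority)) println(s"${t.database}.${t.name} -> $location")
      }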
    

    To set the new table location using the HA cluster service name, run the following SQL command:
    $ spark-shell
    scala> spark.sql("ALTER TABLE simulation.uci_hydraulic_sensor_1 SET LOCATION 'hdfs://my-ha-name/projects/pm/simulation/uci_hydraulic_sensor_1'")
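
    If several tables are affected, the same statement can be issued in a loop. A minimal sketch, not from the original answer, assuming the only change needed is to swap the old host:port authority for the HA nameservice name (the table/location pairs below are placeholders):

    // Sketch: rewrite the location of each affected table, replacing the old
    // NameNode authority with the HA nameservice name. The list is a placeholder;
    // it could also be built with the scan shown earlier.
    val oldPrefix = "hdfs://had-job.mycompany.de:8020"
    val newPrefix = "hdfs://my-ha-name"

    val affectedTables = Seq(
      "simulation.uci_hydraulic_sensor_1" -> s"$oldPrefix/projects/pm/simulation/uci_hydraulic_sensor_1"
    )

    affectedTables.foreach { case (table, oldLocation) =>
      val newLocation = oldLocation.replace(oldPrefix, newPrefix)
      spark.sql(s"ALTER TABLE $table SET LOCATION '$newLocation'")
    }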
    

    In subsequent Spark sessions, table reads work fine!
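
    As an extra sanity check (my own addition, not from the original answer), the underlying files can also be read straight from the HA nameservice URI, independent of the metastore entry; the path matches the Location shown above and the table's Provider is parquet:

    // Sketch: read the Parquet files directly via the HA nameservice URI.
    val df = spark.read.parquet("hdfs://my-ha-name/projects/pm/simulation/uci_hydraulic_sensor_1")
    df.show(3)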

    A similar question on Stack Overflow: https://stackoverflow.com/questions/61366100/
