java - 为什么在本地模式下运行 Spark 时,我需要使用 DataFrames API 执行读取以通过 AWS 进行身份验证?

标签 java amazon-web-services apache-spark amazon-s3

此代码有效并通过:

public class Test {
    public static void main(String[] args) throws IOException {
        AWSCredentials h = new AWSCredentials();
        SparkConf conf = new SparkConf()
                .setMaster("local[*]")
                .setAppName("Test")
                .set("fs.s3a.access.key", h.access_key_id)
                .set("fs.s3a.secret.key", h.secret_access_key);
        if (h.session_token != null) {
            conf.set("fs.s3a.aws.credentials.provider", "org.apache.hadoop.fs.s3a.TemporaryAWSCredentialsProvider");
            conf.set("fs.s3a.session.token", h.session_token);
        }
        SparkSession spark = SparkSession.builder().config(conf).getOrCreate();
        long count = spark.read().text("s3a://mybucket/path-to-files/file+9+0000000223.bin").javaRDD().count();
        System.out.println("count from scala spark is: " + count);
        JavaSparkContext sc = new JavaSparkContext(spark.sparkContext());

        JavaRDD<String> maxwellRdd = sc.textFile("s3a://mybucket/path-to-files/*");
        System.out.println("count is: " + maxwellRdd.count());

        sc.stop();
    }
}

此代码失败,并出现以下 AWS 凭证提供程序异常:

public class Test {
    public static void main(String[] args) throws IOException {
        AWSCredentials h = new AWSCredentials();
        SparkConf conf = new SparkConf()
                .setMaster("local[*]")
                .setAppName("Test")
                .set("fs.s3a.access.key", h.access_key_id)
                .set("fs.s3a.secret.key", h.secret_access_key);
        if (h.session_token != null) {
            conf.set("fs.s3a.aws.credentials.provider", "org.apache.hadoop.fs.s3a.TemporaryAWSCredentialsProvider");
            conf.set("fs.s3a.session.token", h.session_token);
        }
        SparkSession spark = SparkSession.builder().config(conf).getOrCreate();
        //long count = spark.read().text("s3a://mybucket/path-to-files/file+9+0000000223.bin").javaRDD().count();
        //System.out.println("count from scala spark is: " + count);
        JavaSparkContext sc = new JavaSparkContext(spark.sparkContext());

        JavaRDD<String> maxwellRdd = sc.textFile("s3a://mybucket/path-to-files/*");
        System.out.println("count is: " + maxwellRdd.count());

        sc.stop();
    }
}

线程“main”中出现异常 java.io.InterruptedIOException:mybucket 上的 doesBucketExist:com.amazonaws.AmazonClientException:BasicAWSCredentialsProvider 没有提供 AWS 凭证 EnvironmentVariableCredentialsProvider SharedInstanceProfileCredentialsProvider:com.amazonaws.AmazonClientException:无法从 Amazon EC2 加载凭证元数据服务

这对我来说似乎很奇怪。我希望 1. JavaSparkContext 和 SparkSession 使用相同的身份验证方法和提供程序。 2. 如果 SparkSession 使用不同的身份验证方法,我很惊讶它显然以某种副作用的方式这样做,为 JavaSparkContext 设置一个连接以供使用。

dependencies {
    compile group: 'org.ini4j', name: 'ini4j', version: '0.5.4'
    compile group: 'org.scala-lang', name: 'scala-library', version: '2.11.8'
    compile group: 'org.apache.spark', name: 'spark-sql_2.11', version: '2.2.1'
    compile group: 'org.apache.hadoop', name: 'hadoop-aws', version: '2.8.3'
    //compile group: 'com.amazonaws', name: 'aws-java-sdk', version: '1.11.313'
    testCompile group: 'junit', name: 'junit', version: '4.12'
}

最佳答案

我不相信您的第一个有效 - 更具体地说,如果它有效,那是因为某些东西正在从环境变量或 EC2 IAM 设置中选取您的凭证。

如果您尝试在 Spark conf 中设置 s3a 选项,则需要在每个选项前添加 "spark.hadoop." 前缀。

简单测试:创建spark上下文后,调用sc.hadoopConfiguration并查找那里的选项(如果您想 100% 确定没有任何拼写错误,这些选项都在 org.apache.hadoop.fs.s3a.Constants 中定义。

关于java - 为什么在本地模式下运行 Spark 时,我需要使用 DataFrames API 执行读取以通过 AWS 进行身份验证?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/49804068/

相关文章:

java - 如何关闭 OSR 编译

java - 如何确定 JMS 连接是否存在?

java - AspectJ - 使用预编译的方面编译 Java 源

ios - 以编程方式将默认加密 KMS 添加到存储桶

apache-spark - Spark Structured Stream 只从 Kafka 的一个分区获取消息

Java 开关用例

amazon-web-services - AWS Step Functions 中的直通输入到输出

amazon-web-services - 在Amazon S3存储桶中完成上传后获得通知

apache-spark - 为什么从 Redshift 读取 Spark 的速度这么慢?

java - 如何使用 Spark 的 Direct Stream for Kafka 设置消费者组提交的偏移量?