此代码有效并通过:
public class Test {
public static void main(String[] args) throws IOException {
AWSCredentials h = new AWSCredentials();
SparkConf conf = new SparkConf()
.setMaster("local[*]")
.setAppName("Test")
.set("fs.s3a.access.key", h.access_key_id)
.set("fs.s3a.secret.key", h.secret_access_key);
if (h.session_token != null) {
conf.set("fs.s3a.aws.credentials.provider", "org.apache.hadoop.fs.s3a.TemporaryAWSCredentialsProvider");
conf.set("fs.s3a.session.token", h.session_token);
}
SparkSession spark = SparkSession.builder().config(conf).getOrCreate();
long count = spark.read().text("s3a://mybucket/path-to-files/file+9+0000000223.bin").javaRDD().count();
System.out.println("count from scala spark is: " + count);
JavaSparkContext sc = new JavaSparkContext(spark.sparkContext());
JavaRDD<String> maxwellRdd = sc.textFile("s3a://mybucket/path-to-files/*");
System.out.println("count is: " + maxwellRdd.count());
sc.stop();
}
}
此代码失败,并出现以下 AWS 凭证提供程序异常:
public class Test {
public static void main(String[] args) throws IOException {
AWSCredentials h = new AWSCredentials();
SparkConf conf = new SparkConf()
.setMaster("local[*]")
.setAppName("Test")
.set("fs.s3a.access.key", h.access_key_id)
.set("fs.s3a.secret.key", h.secret_access_key);
if (h.session_token != null) {
conf.set("fs.s3a.aws.credentials.provider", "org.apache.hadoop.fs.s3a.TemporaryAWSCredentialsProvider");
conf.set("fs.s3a.session.token", h.session_token);
}
SparkSession spark = SparkSession.builder().config(conf).getOrCreate();
//long count = spark.read().text("s3a://mybucket/path-to-files/file+9+0000000223.bin").javaRDD().count();
//System.out.println("count from scala spark is: " + count);
JavaSparkContext sc = new JavaSparkContext(spark.sparkContext());
JavaRDD<String> maxwellRdd = sc.textFile("s3a://mybucket/path-to-files/*");
System.out.println("count is: " + maxwellRdd.count());
sc.stop();
}
}
线程“main”中出现异常 java.io.InterruptedIOException:mybucket 上的 doesBucketExist:com.amazonaws.AmazonClientException:BasicAWSCredentialsProvider 没有提供 AWS 凭证 EnvironmentVariableCredentialsProvider SharedInstanceProfileCredentialsProvider:com.amazonaws.AmazonClientException:无法从 Amazon EC2 加载凭证元数据服务
这对我来说似乎很奇怪。我希望 1. JavaSparkContext 和 SparkSession 使用相同的身份验证方法和提供程序。 2. 如果 SparkSession 使用不同的身份验证方法,我很惊讶它显然以某种副作用的方式这样做,为 JavaSparkContext 设置一个连接以供使用。
dependencies {
compile group: 'org.ini4j', name: 'ini4j', version: '0.5.4'
compile group: 'org.scala-lang', name: 'scala-library', version: '2.11.8'
compile group: 'org.apache.spark', name: 'spark-sql_2.11', version: '2.2.1'
compile group: 'org.apache.hadoop', name: 'hadoop-aws', version: '2.8.3'
//compile group: 'com.amazonaws', name: 'aws-java-sdk', version: '1.11.313'
testCompile group: 'junit', name: 'junit', version: '4.12'
}
最佳答案
我不相信您的第一个有效 - 更具体地说,如果它有效,那是因为某些东西正在从环境变量或 EC2 IAM 设置中选取您的凭证。
如果您尝试在 Spark conf 中设置 s3a 选项,则需要在每个选项前添加 "spark.hadoop."
前缀。
简单测试:创建spark上下文后,调用sc.hadoopConfiguration
并查找那里的选项(如果您想 100% 确定没有任何拼写错误,这些选项都在 org.apache.hadoop.fs.s3a.Constants
中定义。
关于java - 为什么在本地模式下运行 Spark 时,我需要使用 DataFrames API 执行读取以通过 AWS 进行身份验证?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/49804068/