scala - EMR Spark 无法将 Dataframe 保存到 S3

标签 scala amazon-web-services hadoop apache-spark amazon-s3

我正在使用 RunJobFlow命令启动 Spark EMR 集群。此命令设置 JobFlowRole到具有政策 AmazonElasticMapReduceforEC2Role 的 IAM 角色和 AmazonRedshiftReadOnlyAccess .第一个策略包含允许所有 s3 权限的操作。

当 EC2 实例启动时,它们会承担这个 IAM 角色,并通过 STS 生成临时凭证。

我做的第一件事是使用 com.databricks.spark.redshift 从我的 Redshift 集群读取一个表到一个 Spark Dataframe 中。格式并使用相同的 IAM 角色从 redshift 卸载数据,就像我为 EMR 所做的那样 JobFlowRole .

据我所知,这运行一个 UNLOAD Redshift 上的命令转储到我指定的 S3 存储桶中。 Spark 然后将新卸载的数据加载到 Dataframe 中。我用推荐的s3n:// tempdir 的协议(protocol)选项。

此命令效果很好,它总是成功地将数据加载到 Dataframe 中。

然后我运行一些转换并尝试将数据框保存在 csv 中格式化到同一个 S3 存储桶 Redshift Unloaded进入。

但是,当我尝试这样做时,它会抛出以下错误

java.lang.IllegalArgumentException: AWS Access Key ID and Secret Access Key must be specified as the username or password (respectively) of a s3n URL, or by setting the fs.s3n.awsAccessKeyId or fs.s3n.awsSecretAccessKey properties (respectively)

好的。所以我不知道为什么会这样,但我试图通过设置推荐的 hadoop 配置参数来破解它。然后我使用了 DefaultAWSCredentialsProviderChain加载 AWSAccessKeyIDAWSSecretKey并通过

设置

spark.sparkContext.hadoopConfiguration.set("fs.s3n.awsAccessKeyId", <CREDENTIALS_ACCESS_KEY>) spark.sparkContext.hadoopConfiguration.set("fs.s3n.awsSecretAccessKey", <CREDENTIALS_SECRET_ACCESS_KEY>)

当我再次运行它时,它会抛出以下错误:

java.io.IOException: com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.services.s3.model.AmazonS3Exception: The AWS Access Key Id you provided does not exist in our records. (Service: Amazon S3; Status Code: 403; Error Code: InvalidAccessKeyId;

好的。所以那没有用。然后,我删除了 hadoop 配置设置,并通过 s3n://ACCESS_KEY:SECRET_KEY@BUCKET/KEY 在 s3 url 中硬编码了 IAM 用户的凭据。

当我运行它时,它吐出以下错误:

java.lang.IllegalArgumentException: Bucket name should be between 3 and 63 characters long

所以它试图创建一个桶..这绝对不是我们想要它做的。

我真的被困在这个问题上,非常感谢这里的任何帮助!当我在本地运行它时它工作正常,但在 EMR 上完全失败。

最佳答案

问题如下:

  • EC2 实例在 EMR 引导阶段生成的临时凭证
  • 当我查询 Redshift 时,我将 aws_iam_role 传递给了 Databricks 驱动程序。然后,驱动程序会为同一 IAM 角色重新生成临时凭证。这会使 EC2 实例生成的凭据失效。
  • 然后我尝试使用旧凭据(以及存储在实例元数据中的凭据)上传到 S3

它失败了,因为它试图使用过时的凭据。

解决方案是通过 aws_iam_role 删除 redshift 授权并将其替换为以下内容:

val credentials = EC2MetadataUtils.getIAMSecurityCredentials ... .option("temporary_aws_access_key_id", credentials.get(IAM_ROLE).accessKeyId) .option("temporary_aws_secret_access_key", credentials.get(IAM_ROLE).secretAccessKey) .option("temporary_aws_session_token", credentials.get(IAM_ROLE).token)

关于scala - EMR Spark 无法将 Dataframe 保存到 S3,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/42351420/

相关文章:

file - 在 Scala 中使用 import scala.reflect.io.File 还是 java.io.File 更好?

验证:未找到隐式 scalaz.Bind

java - scala 中的 protected 方法如何在 jvm 上工作

scala - 如何避免使用 null?

amazon-web-services - 获取可用 AWS 产品列表的编程方式?

amazon-web-services - 如何指示 AWS ELB 将返回 403 代码的健康检查视为成功?

amazon-web-services - 不应用 AWS 堆栈集实例的覆盖参数

hadoop - 处理 PIG Latin 中的重复记录

hadoop - hbase模式设计-模板-继承

java - HCatRecord中的错误