我正在使用 RunJobFlow
命令启动 Spark EMR 集群。此命令设置 JobFlowRole
到具有政策 AmazonElasticMapReduceforEC2Role
的 IAM 角色和 AmazonRedshiftReadOnlyAccess
.第一个策略包含允许所有 s3 权限的操作。
当 EC2 实例启动时,它们会承担这个 IAM 角色,并通过 STS 生成临时凭证。
我做的第一件事是使用 com.databricks.spark.redshift
从我的 Redshift 集群读取一个表到一个 Spark Dataframe 中。格式并使用相同的 IAM 角色从 redshift 卸载数据,就像我为 EMR 所做的那样 JobFlowRole
.
据我所知,这运行一个 UNLOAD
Redshift 上的命令转储到我指定的 S3 存储桶中。 Spark 然后将新卸载的数据加载到 Dataframe 中。我用推荐的s3n://
tempdir
的协议(protocol)选项。
此命令效果很好,它总是成功地将数据加载到 Dataframe 中。
然后我运行一些转换并尝试将数据框保存在 csv
中格式化到同一个 S3 存储桶 Redshift Unloaded
进入。
但是,当我尝试这样做时,它会抛出以下错误
java.lang.IllegalArgumentException: AWS Access Key ID and Secret Access Key must be specified as the username or password (respectively) of a s3n URL, or by setting the fs.s3n.awsAccessKeyId or fs.s3n.awsSecretAccessKey properties (respectively)
好的。所以我不知道为什么会这样,但我试图通过设置推荐的 hadoop 配置参数来破解它。然后我使用了 DefaultAWSCredentialsProviderChain
加载 AWSAccessKeyID
和 AWSSecretKey
并通过
spark.sparkContext.hadoopConfiguration.set("fs.s3n.awsAccessKeyId", <CREDENTIALS_ACCESS_KEY>)
spark.sparkContext.hadoopConfiguration.set("fs.s3n.awsSecretAccessKey", <CREDENTIALS_SECRET_ACCESS_KEY>)
当我再次运行它时,它会抛出以下错误:
java.io.IOException: com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.services.s3.model.AmazonS3Exception: The AWS Access Key Id you provided does not exist in our records. (Service: Amazon S3; Status Code: 403; Error Code: InvalidAccessKeyId;
好的。所以那没有用。然后,我删除了 hadoop 配置设置,并通过 s3n://ACCESS_KEY:SECRET_KEY@BUCKET/KEY
在 s3 url 中硬编码了 IAM 用户的凭据。
当我运行它时,它吐出以下错误:
java.lang.IllegalArgumentException: Bucket name should be between 3 and 63 characters long
所以它试图创建一个桶..这绝对不是我们想要它做的。
我真的被困在这个问题上,非常感谢这里的任何帮助!当我在本地运行它时它工作正常,但在 EMR 上完全失败。
最佳答案
问题如下:
- EC2 实例在 EMR 引导阶段生成的临时凭证
- 当我查询 Redshift 时,我将
aws_iam_role
传递给了 Databricks 驱动程序。然后,驱动程序会为同一 IAM 角色重新生成临时凭证。这会使 EC2 实例生成的凭据失效。 - 然后我尝试使用旧凭据(以及存储在实例元数据中的凭据)上传到 S3
它失败了,因为它试图使用过时的凭据。
解决方案是通过 aws_iam_role
删除 redshift 授权并将其替换为以下内容:
val credentials = EC2MetadataUtils.getIAMSecurityCredentials
...
.option("temporary_aws_access_key_id", credentials.get(IAM_ROLE).accessKeyId)
.option("temporary_aws_secret_access_key", credentials.get(IAM_ROLE).secretAccessKey)
.option("temporary_aws_session_token", credentials.get(IAM_ROLE).token)
关于scala - EMR Spark 无法将 Dataframe 保存到 S3,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/42351420/