hadoop - Cannot savepoint/checkpoint Flink state to an AWS S3 bucket

Tags: hadoop amazon-s3 hdfs apache-flink flink-streaming

I am trying to checkpoint/savepoint the state of a Flink job running on EMR to an S3 bucket on AWS. Note that:

  • The instances (master and core nodes) have IAM roles set up correctly to access the S3 bucket and all directories/files in it (the AmazonS3FullAccess policy is attached to the role, and nothing overrides it).
  • I can successfully copy files into the bucket with aws s3 cp xxx s3://flink-bc/checkpoints from both the slave and master nodes.
  • Savepoints/checkpoints to HDFS work.
  • If I configure checkpoints to use HDFS and then try to take a savepoint to S3, the savepoint operation fails with an error like:
    org.apache.flink.util.FlinkException: Triggering a savepoint for the job 16c162c47f225cddad974056c9494b6d failed.
        at org.apache.flink.client.cli.CliFrontend.triggerSavepoint(CliFrontend.java:723)
        at org.apache.flink.client.cli.CliFrontend.lambda$savepoint$9(CliFrontend.java:701)
        at org.apache.flink.client.cli.CliFrontend.runClusterAction(CliFrontend.java:985)
        at org.apache.flink.client.cli.CliFrontend.savepoint(CliFrontend.java:698)
        at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1065)
        at org.apache.flink.client.cli.CliFrontend.lambda$main$11(CliFrontend.java:1126)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.Subject.doAs(Subject.java:422)
        at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1844)
        at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
        at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1126)
    Caused by: java.util.concurrent.CompletionException: java.util.concurrent.CompletionException: org.apache.flink.runtime.checkpoint.CheckpointTriggerException: Failed to trigger savepoint. Decline reason: An Exception occurred while triggering the checkpoint.........
    Caused by: java.util.concurrent.CompletionException: org.apache.flink.runtime.checkpoint.CheckpointTriggerException: Failed to trigger savepoint. Decline reason: An Exception occurred while triggering the checkpoint.
    at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:292) 
    

    and the jobmanager logs:
    java.io.IOException: Cannot instantiate file system for URI: s3://flink-bc/savepoints
        at org.apache.flink.runtime.fs.hdfs.HadoopFsFactory.create(HadoopFsFactory.java:187)
        at org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:399)
        at org.apache.flink.core.fs.FileSystem.get(FileSystem.java:318)
        at org.apache.flink.core.fs.Path.getFileSystem(Path.java:298)
        at org.apache.flink.runtime.state.filesystem.AbstractFsCheckpointStorage.initializeLocationForSavepoint(AbstractFsCheckpointStorage.java:147)
        at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.triggerCheckpoint(CheckpointCoordinator.java:511)
        at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.triggerSavepoint(CheckpointCoordinator.java:370)
        at org.apache.flink.runtime.jobmaster.JobMaster.triggerSavepoint(JobMaster.java:951)
    
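For context, the savepoint above is triggered from the EMR master with the Flink CLI; a sketch using the job ID and target from the error messages (the flink binary being on PATH is an assumption):

```shell
# Trigger a savepoint for the running job, writing it to the S3 target.
# 16c162c47f225cddad974056c9494b6d is the job ID from the stack trace;
# s3://flink-bc/savepoints is the target directory that fails with
# "Cannot instantiate file system for URI".
flink savepoint 16c162c47f225cddad974056c9494b6d s3://flink-bc/savepoints
```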

    Best Answer

    I ran into a similar problem storing checkpoints in an S3 bucket with the latest Flink release (1.10.0).
    Please see the detailed working answer I posted here
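The jobmanager error "Cannot instantiate file system for URI: s3://..." typically means Flink has no S3 filesystem implementation on its classpath. On a plain Flink distribution (1.10+), S3 support ships as a plugin (flink-s3-fs-hadoop, or alternatively flink-s3-fs-presto) that must be copied into the plugins/ directory before the cluster starts. A sketch, assuming FLINK_HOME points at the Flink installation and the jar version matches your release (both are assumptions):

```shell
# Create a plugin directory and copy the bundled S3 filesystem jar into it.
# FLINK_HOME and the 1.10.0 version string are assumptions; adjust to your setup.
mkdir -p "$FLINK_HOME/plugins/s3-fs-hadoop"
cp "$FLINK_HOME/opt/flink-s3-fs-hadoop-1.10.0.jar" "$FLINK_HOME/plugins/s3-fs-hadoop/"
```

With the plugin in place, the S3 targets can also be set as cluster defaults in flink-conf.yaml (the bucket paths below are the ones from the question):

```yaml
# flink-conf.yaml — default locations for checkpoints and savepoints
state.checkpoints.dir: s3://flink-bc/checkpoints
state.savepoints.dir: s3://flink-bc/savepoints
```

The cluster must be restarted after adding the plugin for it to be picked up.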

    Regarding hadoop - Cannot savepoint/checkpoint Flink state to an AWS S3 bucket, we found a similar question on Stack Overflow: https://stackoverflow.com/questions/58294470/
