amazon-s3 - 未使用 SinkMode.REPLACE 删除级联 S3 Sink Tap

标签 amazon-s3 directory amazon hdfs cascading

我们正在运行 Cascading,其中一个 Sink Tap 被配置为存储在 Amazon S3 中,并且遇到了一些 FileAlreadyExistsException(请参阅 [1])。 这只是偶尔发生(大约 100 次有 1 次)并且不可重现。

深入研究级联编码器,我们发现 Hfs.deleteResource() 被 BaseFlow.deleteSinksIfNotUpdate() 调用(除其他外)。 顺便说一句,我们对沉默的 NPE 很感兴趣(评论是“hack to get around npe thrown when fs reaches root directory”)。

从那里开始,我们使用自己的 Tap 扩展了 Hfs Tap,以在 deleteResource() 方法(参见 [2])中添加更多操作,并使用直接调用 getFileSystem(conf).delete 的重试机制。

重试机制似乎带来了改进,但我们仍然有时会面临失败(见[3]中的示例):听起来HDFS返回isDeleted=true,但直接询问文件夹是否存在后,我们收到exists=true ,这不应该发生。当流成功时,日志还会随机显示 isDeleted true 或 false,这听起来像是返回值无关紧要或不可信。

任何人都可以通过这种行为带来自己的 S3 体验:“文件夹应该被删除,但实际上没有”?我们怀疑是 S3 问题,但它是否也存在于 Cascading 或 HDFS 中?

我们在 Hadoop Cloudera-cdh3u5 和 Cascading 2.0.1-wip-dev 上运行。

[1]

org.apache.hadoop.mapred.FileAlreadyExistsException: Output directory s3n://... already exists
    at org.apache.hadoop.mapreduce.lib.output.FileOutputFormat.checkOutputSpecs(FileOutputFormat.java:132)
    at com.twitter.elephantbird.mapred.output.DeprecatedOutputFormatWrapper.checkOutputSpecs(DeprecatedOutputFormatWrapper.java:75)
    at org.apache.hadoop.mapred.JobClient$2.run(JobClient.java:923)
    at org.apache.hadoop.mapred.JobClient$2.run(JobClient.java:882)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:396)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1278)
    at org.apache.hadoop.mapred.JobClient.submitJobInternal(JobClient.java:882)
    at org.apache.hadoop.mapred.JobClient.submitJob(JobClient.java:856)
    at cascading.flow.hadoop.planner.HadoopFlowStepJob.internalNonBlockingStart(HadoopFlowStepJob.java:104)
    at cascading.flow.planner.FlowStepJob.blockOnJob(FlowStepJob.java:174)
    at cascading.flow.planner.FlowStepJob.start(FlowStepJob.java:137)
    at cascading.flow.planner.FlowStepJob.call(FlowStepJob.java:122)
    at cascading.flow.planner.FlowStepJob.call(FlowStepJob.java:42)
    at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:303)
    at java.util.concurrent.FutureTask.run(FutureTask.java:138)
    at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
    at java.lang.Thread.run(Thread.j

[2]

  @Override
  public boolean deleteResource(JobConf conf) throws IOException {
    LOGGER.info("Deleting resource {}", getIdentifier());

    boolean isDeleted = super.deleteResource(conf);
    LOGGER.info("Hfs Sink Tap isDeleted is {} for {}", isDeleted,
        getIdentifier());

    Path path = new Path(getIdentifier());

    int retryCount = 0;
    int cumulativeSleepTime = 0;
    int sleepTime = 1000;

    while (getFileSystem(conf).exists(path)) {
      LOGGER
          .info(
              "Resource {} still exists, it should not... - I will continue to wait patiently...",
              getIdentifier());
      try {
        LOGGER.info("Now I will sleep " + sleepTime / 1000
            + " seconds while trying to delete {} - attempt: {}",
            getIdentifier(), retryCount + 1);
        Thread.sleep(sleepTime);
        cumulativeSleepTime += sleepTime;
        sleepTime *= 2;
      } catch (InterruptedException e) {
        e.printStackTrace();
        LOGGER
            .error(
                "Interrupted while sleeping trying to delete {} with message {}...",
                getIdentifier(), e.getMessage());
        throw new RuntimeException(e);
      }

      if (retryCount == 0) {
        getFileSystem(conf).delete(getPath(), true);
      }

      retryCount++;

      if (cumulativeSleepTime > MAXIMUM_TIME_TO_WAIT_TO_DELETE_MS) {
        break;
      }
    }

    if (getFileSystem(conf).exists(path)) {
      LOGGER
          .error(
              "We didn't succeed to delete the resource {}. Throwing now a runtime exception.",
              getIdentifier());
      throw new RuntimeException(
          "Although we waited to delete the resource for "
              + getIdentifier()
              + ' '
              + retryCount
              + " iterations, it still exists - This must be an issue in the underlying storage system.");
    }

    return isDeleted;

  }

[3]

INFO [pool-2-thread-15] (BaseFlow.java:1287) - [...] at least one sink is marked for delete
 INFO [pool-2-thread-15] (BaseFlow.java:1287) - [...] sink oldest modified date: Wed Dec 31 23:59:59 UTC 1969
 INFO [pool-2-thread-15] (HiveSinkTap.java:148) - Now I will sleep 1 seconds while trying to delete s3n://... - attempt: 1
 INFO [pool-2-thread-15] (HiveSinkTap.java:130) - Deleting resource s3n://...
 INFO [pool-2-thread-15] (HiveSinkTap.java:133) - Hfs Sink Tap isDeleted is true for s3n://...
 ERROR [pool-2-thread-15] (HiveSinkTap.java:175) - We didn't succeed to delete the resource s3n://... Throwing now a runtime exception.
 WARN [pool-2-thread-15] (Cascade.java:706) - [...] flow failed: ...
 java.lang.RuntimeException: Although we waited to delete the resource for s3n://... 0 iterations, it still exists - This must be an issue in the underlying storage system.
    at com.qubit.hive.tap.HiveSinkTap.deleteResource(HiveSinkTap.java:179)
    at com.qubit.hive.tap.HiveSinkTap.deleteResource(HiveSinkTap.java:40)
    at cascading.flow.BaseFlow.deleteSinksIfNotUpdate(BaseFlow.java:971)
    at cascading.flow.BaseFlow.prepare(BaseFlow.java:733)
    at cascading.cascade.Cascade$CascadeJob.call(Cascade.java:761)
    at cascading.cascade.Cascade$CascadeJob.call(Cascade.java:710)
    at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:303)
    at java.util.concurrent.FutureTask.run(FutureTask.java:138)
    at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
    at java.lang.Thread.run(Thread.java:619)

最佳答案

首先,仔细检查级联兼容性页面以了解支持的发行版。

http://www.cascading.org/support/compatibility/

请注意,Amazon EMR 会定期运行兼容性测试并报告结果。

其次,S3 是一个最终一致的文件系统。 HDFS 不是。因此,关于 HDFS 行为的假设不会延续到针对 S3 存储数据。例如,重命名实际上是复制和删除。副本可能需要几个小时。亚马逊已经修补了他们的内部发行版以适应许多差异。

第三,S3中没有目录。这是一个 hack,不同的 S3 接口(interface)支持不同(jets3t vs s3cmd vs ...)。考虑到前面的观点,这必然会出现问题。

第四,网络延迟和可靠性至关重要,尤其是在与 S3 通信时。从历史上看,我发现在使用 EMR 与标准 EC2 实例相比,Amazon 网络在 S3 上操作海量数据集时表现更好。我也相信他们是 EMR 中的一个补丁,也可以改善这里的问题。

因此,我建议尝试运行 EMR Apache Hadoop 发行版,看看您的问题是否得到解决。

关于amazon-s3 - 未使用 SinkMode.REPLACE 删除级联 S3 Sink Tap,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/13421593/

相关文章:

amazon-web-services - 亚马逊 s3 对您创建的子文件夹数量有限制吗?

amazon-web-services - 如何从存储桶根重定向到 AWS S3 中的子目录?

amazon-web-services - boto3如何使用元数据创建对象?

python - 我如何搜索目录并找到与正则表达式匹配的文件?

python - 在不更改目录的情况下写入 Python 中的新目录

svn - 使 SVN 中的文件夹可写,以便 checkout 存储库时,权限已设置为可写?

mysql - 没有可用的包 mysql-server

map - 如何终止在 Amazon EMR 上运行的 map task ?

amazon-s3 - Amazon S3 下载index.html 而不是提供服务

php - 上传base64图片到amazon s3