java - 获取异常 : IOException input buffer is closed exception while extracting a tar file

标签 java hadoop apache-spark hdfs

我在 HDFS 上有一些 tar 文件。我的目标是提取这些文件并将提取的文件存储在 HDFS 上。

例如:

这是我的输入目录结构(HDFS)。

Path : /data/160823 -->
 --------
| 160823 |
 --------
  |
  | --- 00
        |----- xyz.tar
        |----- xyz2.tar

  | --- 01
        |----- xyz3.tar
        |----- abc2.tar

  | --- 02
        |----- abc3.tar
        |----- abc4.tar

   .
   .
   .
   --- 23
        |----- pqr.tar
        |----- pqr2.tar

预期输出将是:

 --------
| Output |
 --------
  |
  |----- xyz.gz
  |----- xyz2.gz

我的代码提取这些 tar 文件并将这些文件存储到 HDFS 上的路径。

所以我能够提取第一个 .tar 文件并且也能够将输出存储在 HDFS 上,但是之后在读取下一个 .tar 文件时,我遇到了这个异常。

java.io.IOException: input buffer is closed
    at org.apache.commons.compress.archivers.tar.TarBuffer.readRecord(TarBuffer.java:190)
    at org.apache.commons.compress.archivers.tar.TarArchiveInputStream.getRecord(TarArchiveInputStream.java:302)
    at org.apache.commons.compress.archivers.tar.TarArchiveInputStream.getNextTarEntry(TarArchiveInputStream.java:230)
    at com.lsr.TarMapper.call(TarMapper.java:53)
    at com.lsr.TarMapper.call(TarMapper.java:1)
    at org.apache.spark.api.java.JavaRDDLike$$anonfun$fn$1$1.apply(JavaRDDLike.scala:129)
    at org.apache.spark.api.java.JavaRDDLike$$anonfun$fn$1$1.apply(JavaRDDLike.scala:129)
    at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
    at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:126)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
    at org.apache.spark.scheduler.Task.run(Task.scala:89)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)

这是我的代码片段,

import java.util.ArrayList;
import java.util.List;
import java.io.File;
import java.io.FileOutputStream;
import java.io.OutputStream;
import java.net.URI;
import org.apache.commons.compress.archivers.tar.TarArchiveEntry;
import org.apache.commons.compress.archivers.tar.TarArchiveInputStream;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.spark.api.java.function.FlatMapFunction;
import com.utils.FileWrapper;

public class TarMapper implements FlatMapFunction<String, String>{

    public Iterable<String> call(String arg0) throws Exception {
        System.out.println("Arg0 : "+arg0);
        List<String> untarFile = new ArrayList<String>();
        FileSystem fileSystem = LTar.fs;
        FSDataInputStream fsin = null;
        TarArchiveInputStream tarin = null;
        OutputStream outstr = null;
        TarArchiveEntry tarentry = null;
        FSDataOutputStream fsDataOutputStream = null;
        Path outputPath = null;
        try{
            fileSystem = FileSystem.get(LTar.conf);
            fsin = fileSystem.open(new Path(arg0));
            tarin = new TarArchiveInputStream(fsin);
            tarentry = tarin.getNextTarEntry();
            while (tarentry != null) {
                if (!tarentry.isDirectory()) {
                    System.out.println("TAR ENTRY : "+tarentry);
                    outputPath = new Path("/data/tar/"+tarentry.getName().substring(2));
                    fsDataOutputStream = fileSystem.create(outputPath);
                    System.out.println("Name : "+tarentry.getName()+"Other : ");
                    IOUtils.copyBytes(tarin, fsDataOutputStream, LTar.conf);
                }
                tarentry = tarin.getNextTarEntry();
            }
        }catch (Exception e) {
            e.printStackTrace();
        } finally {
            if (tarin != null) {
                tarin.close();
            }
            if (fsin != null) {
                fsin.close();
            }
            if (fileSystem != null) {
                fileSystem.close();
            }
            if(outstr !=null){
                 outstr.close();
            }
            if(fsDataOutputStream != null){
                fsDataOutputStream.close();
            }
        }
        return untarFile;
    }
}

请就此问题提出您的建议。

最佳答案

您正在调用的 copyBytes() 重载会在复制结束时关闭输入流。

使用另一个。

关于java - 获取异常 : IOException input buffer is closed exception while extracting a tar file,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/39098380/

相关文章:

java - 我为我的 java 库添加了 CLI 支持,我怎样才能方便地将它公开给我的库用户?

java - Java中检查匹配文件的有效方法

hadoop - mapred-site.xml中$ PWD的含义

dataframe - 如何从键值图中提取值?

java - Spark : Partitioning an RDD created from HBase data

java - Spring MVC + Spring Security - 使用 JSON 的 Controller 身份验证

java - If else 冗余代码简化为更简单的形式

hadoop - 无法通过Map/Reduce完成的任务

hadoop - 在大型XML输入文件的情况下如何处理Hadoop拆分

apache-spark - 如何将DataFrame的Spark sql表达式中的空值写入数据库表? (非法参数异常 : Can't get JDBC type for null)