Hadoop - 解压缩的 zip 文件

标签 hadoop mapreduce compression

我有很多 zip 格式的压缩文件(以 GB 为单位),并且想要编写仅 map 作业来解压缩它们。我的映射器类看起来像

import java.util.zip.*;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.OutputCollector;
import java.io.*;

public class DecompressMapper extends Mapper <LongWritable, Text, LongWritable, Text>
{
    private static final int BUFFER_SIZE = 4096;

    public void map(LongWritable key, Text value, OutputCollector<LongWritable, Text> output, Context context) throws IOException
    {
        FileSplit fileSplit = (FileSplit)context.getInputSplit();
        String fileName = fileSplit.getPath().getName();
        this.unzip(fileName, new File(fileName).getParent()  + File.separator +  "/test_poc");  
    }

    public void unzip(String zipFilePath, String destDirectory) throws IOException {
        File destDir = new File(destDirectory);
        if (!destDir.exists()) {
            destDir.mkdir();
        }
        ZipInputStream zipIn = new ZipInputStream(new FileInputStream(zipFilePath));
        ZipEntry entry = zipIn.getNextEntry();
        // iterates over entries in the zip file
        while (entry != null) {
            String filePath = destDirectory + File.separator + entry.getName();
            if (!entry.isDirectory()) {
                // if the entry is a file, extracts it
                extractFile(zipIn, filePath);
            } else {
                // if the entry is a directory, make the directory
                File dir = new File(filePath);
                dir.mkdir();
            }
            zipIn.closeEntry();
            entry = zipIn.getNextEntry();
        }
        zipIn.close();
    }

    private void extractFile(ZipInputStream zipIn, String filePath) throws IOException {
        BufferedOutputStream bos = new BufferedOutputStream(new FileOutputStream(filePath));
        byte[] bytesIn = new byte[BUFFER_SIZE];
        int read = 0;
        while ((read = zipIn.read(bytesIn)) != -1) {
            bos.write(bytesIn, 0, read);
        }
        bos.close();
    }
}

和我的驱动程序等级

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class DecompressJob extends Configured implements Tool{

    public static void main(String[] args) throws Exception
    {
      int res = ToolRunner.run(new Configuration(), new DecompressJob(),args);
      System.exit(res);
    }

    public int run(String[] args) throws Exception
    {
        Job conf = Job.getInstance(getConf());
        conf.setJobName("MapperOnly");

        conf.setOutputKeyClass(LongWritable.class);
        conf.setOutputValueClass(Text.class);

        conf.setMapperClass(DecompressMapper.class);
        conf.setNumReduceTasks(0);

        Path inp = new Path(args[0]);
        Path out = new Path(args[1]);

        FileInputFormat.addInputPath(conf, inp);
        FileOutputFormat.setOutputPath(conf, out);

        return conf.waitForCompletion(true) ? 0: 1;
    }
}

看来我的映射器类工作不正常。我没有在所需的目录中获得解压缩的文件。任何帮助表示赞赏。谢谢...

最佳答案

上面的代码有一些问题

  1. 我正在将 MR1 API 与 MR2 API 结合使用。永远不要这样做。
  2. 使用了 Java IO 函数。 Hadoop 无法识别其文件系统中的 Java IO 函数。
  3. 路径未正确生成。

我们在编写 MapReduce 程序时需要小心,因为 hadoop 使用完全不同的文件系统,我们在编写代码时需要考虑这一点,切勿混合使用 MR1 和 MR2 API。

关于Hadoop - 解压缩的 zip 文件,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/32714295/

相关文章:

zlib压缩数据时的内存分配?

java - Hadoop Writable和java.io.serialization有什么联系和区别?

java - 使用 hbase 处理图像、视频和音频类型

java - 在 Windows 上用 map reduce 程序创建一个 jar 文件,然后在 linux (hadoop) 上运行它

frameworks - 我可以在 Hadoop 中使用什么来代替 MapReduce,对于小型集群来说,Hadoop 好用吗?

java - Hadoop - 直接从 Mapper 写入 HBase

java - hadoop map reduce线程中整个reducer步骤是否安全?

python - 如何在 pytables 中创建可以存储 Unicode 字符串的压缩数据集?

hadoop - 当嵌套在子查询中时,Hive UDF给出重复的结果而不管参数如何

javascript - 大数数组压缩