I have many zip-compressed files (gigabytes in size) and want to write a map-only job to decompress them. My mapper class looks like
import java.util.zip.*;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.OutputCollector;
import java.io.*;
public class DecompressMapper extends Mapper<LongWritable, Text, LongWritable, Text>
{
    private static final int BUFFER_SIZE = 4096;

    public void map(LongWritable key, Text value, OutputCollector<LongWritable, Text> output, Context context) throws IOException
    {
        FileSplit fileSplit = (FileSplit) context.getInputSplit();
        String fileName = fileSplit.getPath().getName();
        this.unzip(fileName, new File(fileName).getParent() + File.separator + "/test_poc");
    }

    public void unzip(String zipFilePath, String destDirectory) throws IOException {
        File destDir = new File(destDirectory);
        if (!destDir.exists()) {
            destDir.mkdir();
        }
        ZipInputStream zipIn = new ZipInputStream(new FileInputStream(zipFilePath));
        ZipEntry entry = zipIn.getNextEntry();
        // iterates over entries in the zip file
        while (entry != null) {
            String filePath = destDirectory + File.separator + entry.getName();
            if (!entry.isDirectory()) {
                // if the entry is a file, extracts it
                extractFile(zipIn, filePath);
            } else {
                // if the entry is a directory, make the directory
                File dir = new File(filePath);
                dir.mkdir();
            }
            zipIn.closeEntry();
            entry = zipIn.getNextEntry();
        }
        zipIn.close();
    }

    private void extractFile(ZipInputStream zipIn, String filePath) throws IOException {
        BufferedOutputStream bos = new BufferedOutputStream(new FileOutputStream(filePath));
        byte[] bytesIn = new byte[BUFFER_SIZE];
        int read = 0;
        while ((read = zipIn.read(bytesIn)) != -1) {
            bos.write(bytesIn, 0, read);
        }
        bos.close();
    }
}
and my driver class
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class DecompressJob extends Configured implements Tool {
    public static void main(String[] args) throws Exception
    {
        int res = ToolRunner.run(new Configuration(), new DecompressJob(), args);
        System.exit(res);
    }

    public int run(String[] args) throws Exception
    {
        Job conf = Job.getInstance(getConf());
        conf.setJobName("MapperOnly");
        conf.setOutputKeyClass(LongWritable.class);
        conf.setOutputValueClass(Text.class);
        conf.setMapperClass(DecompressMapper.class);
        conf.setNumReduceTasks(0);
        Path inp = new Path(args[0]);
        Path out = new Path(args[1]);
        FileInputFormat.addInputPath(conf, inp);
        FileOutputFormat.setOutputPath(conf, out);
        return conf.waitForCompletion(true) ? 0 : 1;
    }
}
It seems my mapper class is not working properly. I don't get the decompressed files in the desired directory. Any help is appreciated. Thanks...
Best Answer
There are a few problems with the code above:
- I was mixing the MR1 API with the MR2 API. Never do that. Here the imports pull `FileSplit` and `OutputCollector` from `org.apache.hadoop.mapred` while the class extends the MR2 `org.apache.hadoop.mapreduce.Mapper`; the four-argument `map` above never overrides the MR2 `map(key, value, context)`, so it is never invoked.
- Java IO functions were used. Hadoop does not recognize `java.io.File` operations against its file system; reads and writes must go through the Hadoop `FileSystem` API.
- The paths were not generated correctly.

We need to be careful when writing MapReduce programs: Hadoop uses a completely different file system, and we have to take that into account when writing our code, and never mix the MR1 and MR2 APIs.
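The practical consequence of the second point is to write the extraction logic against `InputStream`/`OutputStream` rather than `java.io.File` paths: Hadoop's `FSDataInputStream` and `FSDataOutputStream` are plain stream subclasses, so stream-to-stream code can be reused inside a mapper by swapping in `fs.open(path)` and `fs.create(path)`. Below is a minimal, Hadoop-free sketch of that approach (the `StreamUnzip` class and its in-memory demo are illustrative, not from the original post):

```java
import java.io.*;
import java.util.*;
import java.util.zip.*;

public class StreamUnzip {
    private static final int BUFFER_SIZE = 4096;

    // Extracts every file entry from the zip stream into a map of
    // entry name -> bytes. In a real mapper, the ByteArrayOutputStream
    // would be replaced by fs.create(new Path(...)) on the Hadoop FileSystem.
    public static Map<String, byte[]> unzip(InputStream in) throws IOException {
        Map<String, byte[]> files = new LinkedHashMap<>();
        try (ZipInputStream zipIn = new ZipInputStream(in)) {
            ZipEntry entry;
            while ((entry = zipIn.getNextEntry()) != null) {
                if (!entry.isDirectory()) {
                    ByteArrayOutputStream out = new ByteArrayOutputStream();
                    byte[] buf = new byte[BUFFER_SIZE];
                    int read;
                    while ((read = zipIn.read(buf)) != -1) {
                        out.write(buf, 0, read);
                    }
                    files.put(entry.getName(), out.toByteArray());
                }
                zipIn.closeEntry();
            }
        }
        return files;
    }

    public static void main(String[] args) throws IOException {
        // Build a small zip in memory so the example is self-contained.
        ByteArrayOutputStream zipped = new ByteArrayOutputStream();
        try (ZipOutputStream zos = new ZipOutputStream(zipped)) {
            zos.putNextEntry(new ZipEntry("hello.txt"));
            zos.write("hello hadoop".getBytes("UTF-8"));
            zos.closeEntry();
        }
        Map<String, byte[]> out = unzip(new ByteArrayInputStream(zipped.toByteArray()));
        System.out.println(new String(out.get("hello.txt"), "UTF-8")); // prints "hello hadoop"
    }
}
```

Because `unzip` only sees the stream interfaces, the same method body works whether the source is a local `FileInputStream` or an HDFS `FSDataInputStream`.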
Regarding "Hadoop - uncompressing zip files", we found a similar question on Stack Overflow: https://stackoverflow.com/questions/32714295/