java - 如何使用Hadoop作业检查大gzip文件(.gz)的完整性?

标签 java validation hadoop gzip integrity

我每天都会收到很多来自他人的gzip文件(* .gz),在将它们放入HDFS并进行分析之前,如果我使用,则需要检查所有文件的完整性(损坏的文件将被删除)gzip -t file_name 来检查本地计算机,它可以工作,但是整个过程太慢了,因为文件数量很大,而且大多数文件都足够大,使得本地验证成为一项耗时的工作。

因此,我转而使用Hadoop作业进行并行验证,每个文件都将在映射器中进行验证,并且损坏的文件路径将输出到文件,这是我的代码:

Hadoop作业设置中的:

Job job = new Job(getConf());
job.setJarByClass(HdfsFileValidateJob.class);
job.setMapperClass(HdfsFileValidateMapper.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(NullWritable.class);
job.setNumReduceTasks(0);
job.setInputFormatClass(JustBytesInputFormat.class);

映射器中的:
public class HdfsFileValidateMapper extends Mapper<JustBytesWritable, NullWritable, Text, NullWritable> {
  private static final Logger LOG = LoggerFactory.getLogger(HdfsFileValidateJob.class);

  private ByteArrayOutputStream bos;

  @Override
  protected void setup(Context context) throws IOException, InterruptedException {
    /* specify a split size(=HDFS block size here) for the ByteArrayOutputStream, which prevents frequently allocating
     * memory for it when writing data in [map] method */
    InputSplit inputSplit = context.getInputSplit();
    bos = new ByteArrayOutputStream((int) ((FileSplit) inputSplit).getLength());
  }

  @Override
  protected void cleanup(Context context) throws IOException, InterruptedException {
    InputSplit inputSplit = context.getInputSplit();
    String filePath = ((FileSplit) inputSplit).getPath().toUri().getPath();   // e.g. "/user/hadoop/abc.txt"

    bos.flush();
    byte[] mergedArray = bos.toByteArray();   // the byte array which stores the data of the whole file
    if (!testUnGZip(mergedArray)) {   // broken file
      context.write(new Text(filePath), NullWritable.get());
    }
    bos.close();
  }

  @Override
  public void map(JustBytesWritable key, NullWritable value, Context context) throws IOException, InterruptedException {
    bos.write(key.getBytes());
  }

  /**
   * Test whether we can un-gzip a piece of data.
   *
   * @param data The data to be un-gzipped.
   * @return true for successfully un-gzipped the data, false otherwise.
   */
  private static boolean testUnGZip(byte[] data) {
    int numBytes2Read = 0;
    ByteArrayInputStream bis = null;
    GZIPInputStream gzip = null;
    try {
      bis = new ByteArrayInputStream(data);
      gzip = new GZIPInputStream(bis);
      byte[] buf = new byte[1024];
      int num;
      while ((num = gzip.read(buf, 0, buf.length)) != -1) {
        numBytes2Read += num;
        if (numBytes2Read % (1024 * 1024) == 0) {
          LOG.info(String.format("Number of bytes read: %d", numBytes2Read));
        }
      }
    } catch (Exception e) {
      return false;
    } finally {
      if (gzip != null) {
        try {
          gzip.close();
        } catch (IOException e) {
          LOG.error("Error while closing GZIPInputStream");
        }
      }
      if (bis != null) {
        try {
          bis.close();
        } catch (IOException e) {
          LOG.error("Error while closing ByteArrayInputStream");
        }
      }
    }
    return true;
  }
}

我在其中使用两个名为JustBytesInputFormat和JustBytesWritable的类,可以在这里找到:
https://issues.apache.org/jira/secure/attachment/12570327/justbytes.jar

通常,此解决方案可以正常工作,但是当单个gzip文件足够大(例如1.5G)时,由于Java堆空间问题,Hadoop作业将失败,原因很明显:对于每个文件,我首先收集所有数据放入内存缓冲区,最后进行一次验证,因此文件大小不能太大。

因此,我将部分代码修改为:
  private boolean testUnGzipFail = false;

  @Override
  protected void cleanup(Context context) throws IOException, InterruptedException {
    InputSplit inputSplit = context.getInputSplit();
    String filePath = ((FileSplit) inputSplit).getPath().toUri().getPath();   // e.g. "/user/hadoop/abc.txt"

    if (testUnGzipFail) {   // broken file
      context.write(new Text(filePath), NullWritable.get());
    }
  }

  @Override
  public void map(JustBytesWritable key, NullWritable value, Context context) throws IOException, InterruptedException {
    if (!testUnGZip(key.getBytes())) {
      testUnGzipFail = true;
    }
  }

这个版本解决了Hadoop作业失败的问题,但是根本无法正常工作!在我的E2E测试中,一个完全好的gzip文件(大小:1.5G)将被视为损坏的文件!

所以这是我的问题:
如何正确进行验证,并避免将单个文件的所有内容读入内存的问题?

任何想法将不胜感激,在此先感谢。

最佳答案

我的第一个解决方案是简单地并行调用gzip -tgzip可能比Java快,并且当文件很大时,创建进程的额外开销应该可以忽略不计。

您的解决方案非常慢。首先,当每个文件只需要几个KB的数据时,您就将许多GB的数据加载到RAM中。代替JustBytesInputFormat,您应该流式传输数据。尝试找到一种方法来将InputStream而不是整个文件内容传递给testUnGZip()

如果该文件作为真实文件存在于硬盘上,请尝试使用NIO API进行读取;这样可以将文件映射到内存中,从而使读取速度更快。

关于java - 如何使用Hadoop作业检查大gzip文件(.gz)的完整性?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/27288752/

相关文章:

java - 将输入流重定向到套接字的输出流

javascript - 动态社交媒体按钮 - 减少 http 请求和验证错误

asp.net - 客户端验证失败后如何保持滚动位置?

hadoop - 我如何在 hadoop 中处理大量小文件?

java - 解析CSV时出现Hadoop MapReduce错误

apache - 使用 Kubernetes 或 Apache mesos

java - 如何使用套接字流在 BufferedReader 上设置超时

java foreach 跳过第一次迭代

java - 同步读取变量

java - Hibernate Validator - 根据生命周期进行可选验证