hadoop - Hadoop 处理记录如何跨 block 边界拆分?

标签 hadoop split mapreduce hdfs

根据Hadoop - 权威指南

The logical records that FileInputFormats define do not usually fit neatly into HDFS blocks. For example, a TextInputFormat’s logical records are lines, which will cross HDFS boundaries more often than not. This has no bearing on the functioning of your program—lines are not missed or broken, for example—but it’s worth knowing about, as it does mean that data-local maps (that is, maps that are running on the same host as their input data) will perform some remote reads. The slight overhead this causes is not normally significant.

假设一条记录行被分成两个 block (b1 和 b2)。处理第一个 block (b1) 的映射器会注意到最后一行没有 EOL 分隔符,并从下一个数据 block (b2) 中获取该行的剩余部分。

处理第二个 block (b2) 的映射器如何确定第一条记录不完整并且应该从 block (b2) 中的第二条记录开始处理?

最佳答案

有趣的问题,我花了一些时间查看代码以了解详细信息,这是我的想法。拆分由客户端通过 InputFormat.getSplits 处理,因此查看 FileInputFormat 会提供以下信息:

  • 对于每个输入文件,获取文件长度、 block 大小并计算分割大小为 max(minSize, min(maxSize, blockSize)) 其中 maxSize 对应mapred.max.split.sizeminSizemapred.min.split.size
  • 根据上面计算的分割大小,将文件分成不同的FileSplit。这里重要的是,每个 FileSplit 都使用与输入文件中的偏移量相对应的 start 参数进行初始化。那时仍然没有对线路的处理。代码的相关部分如下所示:

    while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {
      int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
      splits.add(new FileSplit(path, length-bytesRemaining, splitSize, 
                               blkLocations[blkIndex].getHosts()));
      bytesRemaining -= splitSize;
    }
    

之后,如果您查看由 TextInputFormat 定义的 LineRecordReader,那是处理行的地方:

  • 当您初始化 LineRecordReader 时,它会尝试实例化一个 LineReader,这是一个能够通过 FSDataInputStream 读取行的抽象。有2种情况:
  • 如果定义了CompressionCodec,则此编解码器负责处理边界。可能与您的问题无关。
  • 但是,如果没有编解码器,这就是有趣的地方:如果您的 InputSplitstart 不同于 0,那么您将回溯1 个字符,然后跳过您遇到的由\n 或\r\n (Windows) 标识的第一行!回溯很重要,因为如果您的行边界与分割边界相同,这可以确保您不会跳过有效行。相关代码如下:

    if (codec != null) {
       in = new LineReader(codec.createInputStream(fileIn), job);
       end = Long.MAX_VALUE;
    } else {
       if (start != 0) {
         skipFirstLine = true;
         --start;
         fileIn.seek(start);
       }
       in = new LineReader(fileIn, job);
    }
    if (skipFirstLine) {  // skip first line and re-establish "start".
      start += in.readLine(new Text(), 0,
                        (int)Math.min((long)Integer.MAX_VALUE, end - start));
    }
    this.pos = start;
    

因此,由于拆分是在客户端计算的,因此映射器不需要按顺序运行,每个映射器都已经知道是否需要丢弃第一行。

基本上,如果您在同一个文件中有 2 行,每行 100Mb,为了简化,我们假设拆分大小为 64Mb。然后在计算输入拆分时,我们将有以下场景:

  • Split 1 包含此 block 的路径和主机。在开始处初始化 200-200=0Mb,长度 64Mb。
  • Split 2 在开始时初始化 200-200+64=64Mb,长度 64Mb。
  • Split 3 在开始时初始化 200-200+128=128Mb,长度 64Mb。
  • Split 4 在开始时初始化 200-200+192=192Mb,长度 8Mb。
  • 映射器 A 将处理拆分 1,开始是 0,因此不要跳过第一行,并读取超过 64Mb 限制的整行,因此需要远程读取。
  • Mapper B 将处理 split 2,start is != 0 所以跳过 64Mb-1byte 后的第一行,这对应于 100Mb 处第 1 行的末尾,它仍在 split 2 中,我们有 28Mb 的行在 split 2、所以远程读取剩下的72Mb。
  • Mapper C 将处理 split 3,start is != 0 所以跳过 128Mb-1byte 之后的第一行,它对应于 200Mb 处第 2 行的结尾,这是文件的结尾所以不要做任何事情。<
  • 映射器 D 与映射器 C 相同,只是它在 192Mb-1 字节后寻找换行符。

关于hadoop - Hadoop 处理记录如何跨 block 边界拆分?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/14291170/

相关文章:

maven - 如何在Karaf中安装Apache Hadoop客户端

javascript - 提取/消除imamacros中的重复文本

iOS - 使用 openGL ES 仅缩放显示的一部分

hadoop - 使用 YARN/Hadoop 调度,我可以只抢占某些队列吗?

java - HIPI API : does it process 1 image per map task?

hadoop - 如果不使用MapReduce中的setup()和closeup()方法怎么办?

database - 在 Hadoop 或分布式计算框架中管理多个集群

hadoop - Spark SQL:NoSuchMethodError:SQLContext.load

python - 按一定条件拆分数据框但保留原始数据框

json - 在hadoop map reduce中读取json对象来处理数据