java - Reducer 获得的记录少于预期

标签 java hadoop mapreduce hdfs

我们有一个为文件中的每一行生成唯一键的场景。我们有一个时间戳列,但在少数情况下,同一时间戳有多行可用。

我们决定将唯一值作为时间戳附加它们各自的计数,如下面的程序中所述。

Mapper 只会发出时间戳作为键,将整行作为其值,并在 reducer 中生成键。

问题是 Map 输出大约 236 行,其中只有 230 条记录作为 reducer 的输入,输出相同的 230 条记录。

public class UniqueKeyGenerator extends Configured implements Tool {

    private static final String SEPERATOR = "\t";
    private static final int TIME_INDEX = 10;
    private static final String COUNT_FORMAT_DIGITS = "%010d";

    public static class Map extends Mapper<LongWritable, Text, Text, Text> {

        @Override
        protected void map(LongWritable key, Text row, Context context)
                throws IOException, InterruptedException {
            String input = row.toString();
            String[] vals = input.split(SEPERATOR);
            if (vals != null && vals.length >= TIME_INDEX) {
                context.write(new Text(vals[TIME_INDEX - 1]), row);
            }
        }
    }

    public static class Reduce extends Reducer<Text, Text, NullWritable, Text> {

        @Override
        protected void reduce(Text eventTimeKey,
                Iterable<Text> timeGroupedRows, Context context)
                throws IOException, InterruptedException {
            int cnt = 1;
            final String eventTime = eventTimeKey.toString();
            for (Text val : timeGroupedRows) {
                final String res = SEPERATOR.concat(getDate(
                        Long.valueOf(eventTime)).concat(
                        String.format(COUNT_FORMAT_DIGITS, cnt)));
                val.append(res.getBytes(), 0, res.length());
                cnt++;
                context.write(NullWritable.get(), val);
            }
        }
    }

    public static String getDate(long time) {
        SimpleDateFormat utcSdf = new SimpleDateFormat("yyyyMMddhhmmss");
        utcSdf.setTimeZone(TimeZone.getTimeZone("America/Los_Angeles"));
        return utcSdf.format(new Date(time));
    }

    public int run(String[] args) throws Exception {
        conf(args);
        return 0;
    }

    public static void main(String[] args) throws Exception {
        conf(args);
    }

    private static void conf(String[] args) throws IOException,
            InterruptedException, ClassNotFoundException {
        Configuration conf = new Configuration();
        Job job = new Job(conf, "uniquekeygen");
        job.setJarByClass(UniqueKeyGenerator.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        job.setMapperClass(Map.class);
        job.setReducerClass(Reduce.class);

        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);
        // job.setNumReduceTasks(400);

        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        job.waitForCompletion(true);
    }

}

对于较高的行数,它是一致的,并且对于 20855982 行的输入,差异与 208969 条记录一样大。减少 reducer 输入的原因可能是什么?

最佳答案

数据丢失背后的原因是其中一个 block 发生了运行时异常,因此该 block 中可用的数据被完全忽略,导致减少器输入减少。

谢谢,
萨蒂什。

关于java - Reducer 获得的记录少于预期,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/17368418/

相关文章:

hadoop - 我可以为只有一个输出文件的配置单元查询的输出选择我自己的名称吗?

hadoop - MapReduce 和 Hive 应用程序设计

java - 自动隐藏 JMenuBar

java - JPA查询以检查特定年月是否存在记录?

java - Java中SortedList的插入和删除方法是如何工作的

hadoop - DistributedCache 是否会在每次作业后删除缓存的文件?

hadoop - 配置单元中的 IndexOutOfBoundsException 索引

java - 为什么我们可以实例化 Pair<T> 而不能实例化 Pair<?>

hadoop - 风筝数据集 map-reduce

hadoop - 在 vectorized.execution.enabled 时将 ORC 文件格式与 Hive 一起使用时发生 ClassCastException