java - 如何为 map reducer 作业在 java 中为 hadoop 输入自定义选择列读取

标签 java hadoop mapreduce

Hadoop 新手,我想了解 Hadoop 如何读取文件输入:我能够使用下面的代码从 2 列(键/值)输入文件运行 Hadoop 作业:
enter image description here

但是如果我有 5 列并且我想要的(键/值)是 A&E(而不是 A&B)我需要准确修改哪个函数呢?

enter image description here

public class InverterCounter extends Configured implements Tool {

    public static class MapClass extends MapReduceBase
        implements Mapper<Text, Text, Text, Text> {

        public void map(Text key, Text value,
                        OutputCollector<Text, Text> output,
                        Reporter reporter) throws IOException {

            output.collect(value, key);
        }
    }   
    public static class Reduce extends MapReduceBase
        implements Reducer<Text, Text, Text, IntWritable> {

        public void reduce(Text key, Iterator<Text> values,
                           OutputCollector<Text, IntWritable> output,
                           Reporter reporter) throws IOException {

            int count = 0;
            while (values.hasNext()) {
                values.next();
                count++;
            }
            output.collect(key, new IntWritable(count));
        }
    }   
    public int run(String[] args) throws Exception {
        Configuration conf = getConf();      
        JobConf job = new JobConf(conf, InverterCounter.class);    
        Path in = new Path(args[0]);
        Path out = new Path(args[1]);
        FileInputFormat.setInputPaths(job, in);
        FileOutputFormat.setOutputPath(job, out);

        job.setJobName("InverterCounter");
        job.setMapperClass(MapClass.class);
        job.setReducerClass(Reduce.class);   
        job.setInputFormat(KeyValueTextInputFormat.class);
        job.setOutputFormat(TextOutputFormat.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        job.set("key.value.separator.in.input.line", ",");       
        JobClient.runJob(job);      
        return 0;
    }  
    public static void main(String[] args) throws Exception { 
        int res = ToolRunner.run(new Configuration(), new InverterCounter(), args);       
        System.exit(res);
    }
}

如果有任何建议,我将不胜感激,我正在尝试更改 job.set("key.value.separator.in.input.line", ",");job.setInputFormat (KeyValueTextInputFormat.class); 运气不好仍然无法解决这个问题。

谢谢

最佳答案

KeyValueTextInputFormat 假定键位于每行的开头,因此它不适用于您的 6 列数据集。

相反,您可以使用 TextInputFormat并提取 key 并为自己赋值。我假设该行中的所有值都用逗号分隔(并且数据中没有逗号,这是另一回事)。

使用 TextInputFormat,您会在 value 中收到完整的行,并在 key 中收到该行在文件中的位置。我们不需要这个位置,所以我们会忽略它。通过单个 Text 中的完整行,我们可以将其转换为 String,用逗号分隔,并导出要发出的键和值:

public class InverterCounter extends Configured implements Tool {

    public static class MapClass extends MapReduceBase
        implements Mapper<Text, Text, Text, Text> {

        public void map(Text key, Text value,
                        OutputCollector<Text, Text> output,
                        Reporter reporter) throws IOException {

            String[] lineFields = value.toString().split(",");
            Text outputKey = new Text(lineFields[0] + "," + lineFields[4]);
            Text outputValue = new Text(lineFields[1] + "," + lineFields[2] + "," +
                                        lineFields[3] + "," + lineFields[5]);

            output.collect(outputKey, outputValue);
        }
    }   
    public static class Reduce extends MapReduceBase
        implements Reducer<Text, Text, Text, IntWritable> {

        public void reduce(Text key, Iterator<Text> values,
                           OutputCollector<Text, IntWritable> output,
                           Reporter reporter) throws IOException {

            int count = 0;
            while (values.hasNext()) {
                values.next();
                count++;
            }
            output.collect(key, new IntWritable(count));
        }
    }   
    public int run(String[] args) throws Exception {
        Configuration conf = getConf();      
        JobConf job = new JobConf(conf, InverterCounter.class);    
        Path in = new Path(args[0]);
        Path out = new Path(args[1]);
        FileInputFormat.setInputPaths(job, in);
        FileOutputFormat.setOutputPath(job, out);

        job.setJobName("InverterCounter");
        job.setMapperClass(MapClass.class);
        job.setReducerClass(Reduce.class);   
        job.setInputFormat(TextInputFormat.class);
        job.setOutputFormat(TextOutputFormat.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        JobClient.runJob(job);
        return 0;
    }  
    public static void main(String[] args) throws Exception { 
        int res = ToolRunner.run(new Configuration(), new InverterCounter(), args);       
        System.exit(res);
    }
}

我还没有机会测试这个,所以可能会有小错误。您可能想要重命名该类,因为它不再反转任何东西。最后,该值已发送到 reducer 但未被使用,因此您可以轻松发送 NullWritable。相反。

关于java - 如何为 map reducer 作业在 java 中为 hadoop 输入自定义选择列读取,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/29050370/

相关文章:

java - 操作系统 X : set application icon when launching Java/Swing application from shellscript

java - 两个类之间存在多个关联的 Hibernate 问题

java - 解析复杂的 JSON 文件

java - 在 LoadFunc.getNext() 中跳过一条记录

hadoop - 如何在 web 上使用 php 读取/写入文件到 HDFS

python - 如何在没有 map/reduce 的情况下转换大型 Mongodb 集合中的每个文档?

java - 在自定义键的情况下,如何为自定义分区程序设置 numReduceTask

java - 第二MR工作没有在Hadoop中终止

java - 在 play 2.0.3 中,我有 'sets' 个图像,我想在启动服务器时对其进行分类并放入数据库中

hadoop - 在 hadoop Map reduce 中读取带有工作表的 Excel 文件