java - Analyze multiple input files and output a single file with one final result

Tags: java, hadoop, mapreduce

I don't know MapReduce very well. What I need is to produce a single line of output from the analysis of several input files. Right now my result contains one line per input file, so with 3 input files I get an output file with 3 lines, one result for each input. Since I sort the results, I only need the first result written to the HDFS file. My code is below:

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class WordLength {


    public static class Map extends Mapper<Object, Text, LongWritable, Text> {
        private int max = Integer.MIN_VALUE;
        private Text word = new Text();

        public void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            String line = value.toString();                        // one line of the input file
            StringTokenizer tokenizer = new StringTokenizer(line); // split the line into words
            while (tokenizer.hasMoreTokens()) {
                String s = tokenizer.nextToken();
                int val = s.length();
                if (val > max) {
                    max = val;
                    word.set(s);
                }
            }
        }

        // Runs once at the end of each map task, so every input split
        // contributes exactly one (length, word) record.
        public void cleanup(Context context) throws IOException, InterruptedException {
            context.write(new LongWritable(max), word);
        }
    }

  public static class IntSumReducer
       extends Reducer<LongWritable, Text, Text, LongWritable> {
    public void reduce(LongWritable key, Iterable<Text> values, Context context)
                       throws IOException, InterruptedException {
      // Called once per distinct key, so each map task's maximum is written out.
      context.write(new Text("longest"), key);
    }
  }



  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Job job = Job.getInstance(conf, "word count");
    job.setJarByClass(WordLength.class);
    job.setMapperClass(Map.class);
    // Sort the map output keys (the lengths) in decreasing order.
    job.setSortComparatorClass(LongWritable.DecreasingComparator.class);
    job.setNumReduceTasks(1);
    job.setReducerClass(IntSumReducer.class);
    job.setMapOutputKeyClass(LongWritable.class);
    job.setMapOutputValueClass(Text.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(LongWritable.class);
    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));

    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}

It finds the longest word in each input and prints it, but I need the longest length across all input files, printed as a single line. As far as I can tell, each input file gets its own map task, every map task's cleanup() emits one record, and the reducer writes one line per key, which is why I get one line per file.

So the output is:

longest 11

longest 10

longest 8

I want it to contain only:

longest 11

Thanks

Best answer

I changed my code to find the longest word length, and now it prints only longest 11. If you have a better way, feel free to correct my solution, since I'm eager to learn the best option.

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WordCount {


    public static class Map extends Mapper<Object, Text, Text, LongWritable> {
        private int max = Integer.MIN_VALUE;
        private Text word = new Text();

        public void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            String line = value.toString();                        // one line of the input file
            StringTokenizer tokenizer = new StringTokenizer(line); // split the line into words
            while (tokenizer.hasMoreTokens()) {
                String s = tokenizer.nextToken();
                int val = s.length();
                if (val > max) {
                    max = val;
                    word.set(s);
                    // Emit only when a new per-task maximum is found.
                    context.write(word, new LongWritable(val));
                }
            }
        }
    }


  public static class IntSumReducer
       extends Reducer<Text, LongWritable, Text, LongWritable> {
    private LongWritable result = new LongWritable();
    private long max = Long.MIN_VALUE;

    public void reduce(Text key, Iterable<LongWritable> values, Context context)
                       throws IOException, InterruptedException {
      // Track the global maximum across all keys instead of writing per key.
      for (LongWritable val : values) {
        if (val.get() > max) {
          max = val.get();
        }
      }
      result.set(max);
    }

    // Runs once, after all reduce() calls, so exactly one line is written.
    public void cleanup(Context context) throws IOException, InterruptedException {
      context.write(new Text("longest"), result);
    }
  }



  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Job job = Job.getInstance(conf, "word count");
    job.setJarByClass(WordCount.class);
    job.setMapperClass(Map.class);
    // No custom sort comparator here: the map output key is now Text, so
    // LongWritable.DecreasingComparator no longer applies, and the reducer
    // tracks the maximum itself.
    job.setNumReduceTasks(1);
    job.setReducerClass(IntSumReducer.class);
    job.setMapOutputKeyClass(Text.class);
    job.setMapOutputValueClass(LongWritable.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(LongWritable.class);
    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));

    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }


}
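
A simpler alternative worth noting: the question's original code was already close. It keeps the word lengths as the map output keys and sorts them in decreasing order with LongWritable.DecreasingComparator, so with a single reduce task the first key the reducer sees is already the global maximum. A reducer that writes only on its first reduce() call would then produce the one-line output directly. A minimal sketch of that idea (TopOneReducer is an illustrative name, not from the original post):

  // Drop-in replacement for the reducer in the original question code
  // (LongWritable length keys, decreasing sort order, one reduce task).
  public static class TopOneReducer
       extends Reducer<LongWritable, Text, Text, LongWritable> {
    private boolean written = false;

    @Override
    protected void reduce(LongWritable key, Iterable<Text> values, Context context)
        throws IOException, InterruptedException {
      // Keys arrive in decreasing order, so the first call carries the maximum.
      if (!written) {
        context.write(new Text("longest"), key);
        written = true;
      }
    }
  }

Either way, the job must keep job.setNumReduceTasks(1); with more than one reducer, each one would write its own "longest" line to a separate part file.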

Regarding java - analyzing multiple input files and outputting a single file with one final result, we found a similar question on Stack Overflow: https://stackoverflow.com/questions/50414266/
