我对 MapReduce 不是很了解。我需要实现的是从几个输入文件的分析中输出一行结果。目前,我的结果包含每个输入文件一行。所以如果我有 3 个输入文件,我将有一个包含 3 行的输出文件;每个输入的结果。由于我对结果进行了排序,因此我只需要将第一个结果写入 HDFS 文件。我的代码如下:
import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
public class WordLength {
public static class Map extends Mapper<Object, Text, LongWritable, Text> {
// private final static IntWritable one = new IntWritable(1);
int max = Integer.MIN_VALUE;
private Text word = new Text();
public void map(Object key, Text value, Context context)
throws IOException, InterruptedException {
String line = value.toString(); //cumleni goturur file dan, 1 line i
StringTokenizer tokenizer = new StringTokenizer(line); //cumleni sozlere bolur
while (tokenizer.hasMoreTokens()) {
String s= tokenizer.nextToken();
int val = s.length();
if(val>max) {
max=val;
word.set(s);
}
}
}
public void cleanup(Context context) throws IOException, InterruptedException {
context.write(new LongWritable(max), word);
}
}
public static class IntSumReducer
extends Reducer<LongWritable,Text,Text,LongWritable> {
private IntWritable result = new IntWritable();
int max=-100;
public void reduce(LongWritable key, Iterable<Text> values,
Context context
) throws IOException, InterruptedException {
context.write(new Text("longest"), key);
//context.write(new Text("longest"),key);
System.err.println(key);
}
}
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
Job job = Job.getInstance(conf, "word count");
job.setJarByClass(WordCount.class);
job.setMapperClass(Map.class);
job.setSortComparatorClass(LongWritable.DecreasingComparator.class);
//job.setCombinerClass(IntSumReducer.class);
job.setNumReduceTasks(1);
job.setReducerClass(IntSumReducer.class);
job.setMapOutputKeyClass(LongWritable.class);
job.setMapOutputValueClass(Text.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(LongWritable.class);
FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
}
它为每个输入找到最长的单词并将其打印出来。但是我需要在所有可能的输入文件中找到最长的长度,并且只打印一行。
所以输出是:
最长的 11
最长的 10
最长的 8
我希望它只包含:
最长的 11
谢谢
最佳答案
更改了我的代码以查找最长的字长。现在它只打印最长的 11。如果您有更好的方法,请随时纠正我的解决方案,因为我渴望学习最佳选择
import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Mapper.Context;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
public class WordCount {
public static class Map extends Mapper<Object, Text, Text, LongWritable> {
// private final static IntWritable one = new IntWritable(1);
int max = Integer.MIN_VALUE;
private Text word = new Text();
public void map(Object key, Text value, Context context)
throws IOException, InterruptedException {
String line = value.toString(); //cumleni goturur file dan, 1 line i
StringTokenizer tokenizer = new StringTokenizer(line); //cumleni sozlere bolur
while (tokenizer.hasMoreTokens()) {
String s= tokenizer.nextToken();
int val = s.length();
if(val>max) {
max=val;
word.set(s);
context.write(word,new LongWritable(val));
}
}
}
}
public static class IntSumReducer
extends Reducer<Text,LongWritable,Text,LongWritable> {
private LongWritable result = new LongWritable();
long max=-100;
public void reduce(Text key, Iterable<LongWritable> values,
Context context
) throws IOException, InterruptedException {
// int sum = -1;
for (LongWritable val : values) {
if(val.get()>max) {
max=val.get();
}
}
result.set(max);
}
public void cleanup(Context context) throws IOException, InterruptedException {
context.write(new Text("longest"),result );
}
}
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
Job job = Job.getInstance(conf, "word count");
job.setJarByClass(WordCount.class);
job.setMapperClass(Map.class);
job.setSortComparatorClass(LongWritable.DecreasingComparator.class);
// job.setCombinerClass(IntSumReducer.class);
job.setNumReduceTasks(1);
job.setReducerClass(IntSumReducer.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(LongWritable.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(LongWritable.class);
FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
关于java - 分析多个输入文件并只输出一个包含一个最终结果的文件,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/50414266/