Hello, I'm trying to compute the average of several numbers using the MapReduce technique in standalone mode. I have two input files. file1 contains the values: 25 25 25 25 25
and file2 contains: 15 15 15 15 15
.
My program runs fine, but the output file contains the mapper's output instead of the reducer's output.
Here is my code:
import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.io.Writable;
import java.io.*;

public class Average {

    public static class SumCount implements Writable {
        public int sum;
        public int count;

        @Override
        public void write(DataOutput out) throws IOException {
            out.writeInt(sum);
            out.writeInt(count);
        }

        @Override
        public void readFields(DataInput in) throws IOException {
            sum = in.readInt();
            count = in.readInt();
        }
    }

    public static class TokenizerMapper extends Mapper<Object, Text, Text, Object> {
        private final static IntWritable valueofkey = new IntWritable();
        private Text word = new Text();
        SumCount sc = new SumCount();

        public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
            StringTokenizer itr = new StringTokenizer(value.toString());
            int sum = 0;
            int count = 0;
            int v;
            while (itr.hasMoreTokens()) {
                word.set(itr.nextToken());
                v = Integer.parseInt(word.toString());
                count = count + 1;
                sum = sum + v;
            }
            word.set("average");
            sc.sum = sum;
            sc.count = count;
            context.write(word, sc);
        }
    }

    public static class IntSumReducer extends Reducer<Text, Object, Text, IntWritable> {
        private IntWritable result = new IntWritable();

        public void reduce(Text key, Iterable<SumCount> values, Context context) throws IOException, InterruptedException {
            int sum = 0;
            int count = 0;
            int wholesum = 0;
            int wholecount = 0;
            for (SumCount val : values) {
                wholesum = wholesum + val.sum;
                wholecount = wholecount + val.count;
            }
            int res = wholesum / wholecount;
            result.set(res);
            context.write(key, result);
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "");
        job.setJarByClass(Average.class);
        job.setMapperClass(TokenizerMapper.class);
        job.setCombinerClass(IntSumReducer.class);
        job.setReducerClass(IntSumReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(SumCount.class);
        job.setOutputValueClass(IntWritable.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
After I run the program, my output file looks like this:
average Average$SumCount@434ba039
average Average$SumCount@434ba039
Best Answer
You can't use your Reducer class IntSumReducer
as a combiner. A combiner must receive and emit the same key/value types.
So I would remove job.setCombinerClass(IntSumReducer.class);
.
Remember that the output of the combine step is the input to the reduce step, so writing out Text
and IntWritable
is not going to work.
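The reason sum/count pairs are the right combiner value in the first place is that partial (sum, count) pairs can be merged in any grouping and still yield the exact overall average, whereas averaging averages would not. A minimal plain-Java sketch of that property, with no Hadoop dependencies (class and method names here are illustrative, not from the post):

```java
// Sketch: merging partial (sum, count) pairs is associative, so a combiner
// that emits a SumCount keeps the final average exact.
public class SumCountMergeDemo {
    static final class SumCount {
        final int sum;
        final int count;
        SumCount(int sum, int count) { this.sum = sum; this.count = count; }

        // A valid combiner would emit SumCount, and the reducer would keep
        // consuming SumCount values: same types in and out.
        SumCount merge(SumCount other) {
            return new SumCount(sum + other.sum, count + other.count);
        }

        int average() { return sum / count; }
    }

    public static void main(String[] args) {
        SumCount file1 = new SumCount(25 * 5, 5); // "25 25 25 25 25"
        SumCount file2 = new SumCount(15 * 5, 5); // "15 15 15 15 15"
        System.out.println(file1.merge(file2).average()); // (125 + 75) / 10 = 20
    }
}
```

This is exactly why the SumCount writable in the question is a good design; only wiring it up as both combiner input and output makes the types line up.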
If your output files look like part-m-xxxxx
, then the problem above could mean that only the map phase ran and then stopped. Your job counters would confirm this.
You also have Reducer<Text,Object,Text,IntWritable>
, which should be Reducer<Text,SumCount,Text,IntWritable>
.
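As a side note on the output shown in the question: a line like Average$SumCount@434ba039 is what java.lang.Object's default toString() produces (class name, @, hex hash code). Hadoop's TextOutputFormat writes values by calling toString(), so a Writable without an override prints this way. A minimal plain-Java illustration (class names here are hypothetical, no Hadoop required):

```java
// Demonstrates why a value object prints as ClassName@hashcode when its
// class does not override toString().
public class ToStringDemo {
    static class SumCount {
        int sum = 125;
        int count = 5;
        // No toString() override: printing falls back to Object.toString(),
        // e.g. "ToStringDemo$SumCount@434ba039".
    }

    static class ReadableSumCount {
        int sum = 125;
        int count = 5;
        @Override
        public String toString() {
            return sum + "\t" + count;
        }
    }

    public static void main(String[] args) {
        System.out.println(new SumCount());         // ToStringDemo$SumCount@<hex>
        System.out.println(new ReadableSumCount()); // 125	5
    }
}
```

Fixing the combiner and generics is what makes the reducer actually run; overriding toString() is only needed if you ever want to emit the SumCount itself in readable form.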
Regarding "java - Output file contains Mapper Output instead of Reducer output", we found a similar question on Stack Overflow: https://stackoverflow.com/questions/38177609/