你好,我发现 map reduce 链有点问题。我必须形成这样的链
映射器->reducer->映射器
从我的第一个 mapper 到 reducer 的流程一直很好,这个 reducer 的输出数据不能正确地转到下一个 mapper。这是我尝试过的一个简单的代码示例
这是我的第一个映射器
public void map(LongWritable key, Text value,
OutputCollector<Text, IntWritable> outputCollector, Reporter reporter)
throws IOException {
String maxSalary = value.toString().split(",")[4];
outputCollector.collect(new Text("max salary"),new IntWritable(Integer.parseInt(maxSalary)));
}
这是我的 reducer
public void reduce(Text key, Iterator<IntWritable> values,
OutputCollector<Text, IntWritable> outputCollector, Reporter reporter)
throws IOException {
int maxSalary = Integer.MIN_VALUE;
while(values.hasNext()){
maxSalary = Math.max(maxSalary, values.next().get());
}
outputCollector.collect(key, new IntWritable(maxSalary));
}
这是我的下一个简单映射器
public void map(Text key, IntWritable value,
OutputCollector<Text, IntWritable> outputCollector, Reporter reporter)
throws IOException {
System.out.println(value.toString());
}
这是我运行作业的主类
JobConf jobConf = new JobConf(jobrunner.class);
jobConf.setJobName("Chaining");
FileInputFormat.setInputPaths(jobConf, new Path("hdfs://localhost:9000/employee_data.txt"));
FileOutputFormat.setOutputPath(jobConf,new Path("hdfs://localhost:9000/chain9.txt"));
JobConf conf1 = new JobConf(false);
ChainMapper.addMapper(jobConf,chainmap.class,LongWritable.class,Text.class,Text.class,IntWritable.class,true,conf1);
JobConf conf2 = new JobConf(false);
ChainReducer.setReducer(jobConf, chainreduce.class,Text.class,IntWritable.class,Text.class,IntWritable.class,true,conf2);
JobConf conf3 = new JobConf(false);
ChainMapper.addMapper(jobConf, nextchainmap.class, Text.class,IntWritable.class,Text.class,IntWritable.class,true,conf3);
JobClient.runJob(jobConf);
我将在我的 reducer 中获得最高员工薪水,这必须传递给下一个映射器,在那里它会找到具有最大薪水值的员工记录,我如何在下一个映射器中完成这个?有什么想法吗?
最佳答案
要链接第二个映射器,您需要调用 ChainReducer.addMapper(...)
而不是 ChainMapper.addMapper(...)
。
关于hadoop - map reduce 链接未正确执行,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/18302233/