java - MapReduce组合器

标签 java hadoop mapreduce

我有一个带有映射器、缩减器和组合器的简单 mapreduce 代码。 映射器的输出传递给组合器。但是对于reducer,传递的不是combiner的输出,而是mapper的输出。

请帮忙

代码:

package Combiner;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.Mapper.Context;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class AverageSalary
{
public static class Map extends  Mapper<LongWritable, Text, Text, DoubleWritable> 
{
    public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException 
    {    
        String[] empDetails= value.toString().split(",");
        Text unit_key = new Text(empDetails[1]);      
        DoubleWritable salary_value = new DoubleWritable(Double.parseDouble(empDetails[2]));
        context.write(unit_key,salary_value);    

    }  
}
public static class Combiner extends Reducer<Text,DoubleWritable, Text,Text> 
{
    public void reduce(final Text key, final Iterable<DoubleWritable> values, final Context context)
    {
        String val;
        double sum=0;
        int len=0;
        while (values.iterator().hasNext())
        {
            sum+=values.iterator().next().get();
            len++;
        }
        val=String.valueOf(sum)+":"+String.valueOf(len);
        try {
            context.write(key,new Text(val));
        } catch (IOException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        } catch (InterruptedException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
    }
}
public static class Reduce extends Reducer<Text,Text, Text,Text> 
{
    public void reduce (final Text key, final Text values, final Context context)
    {
        //String[] sumDetails=values.toString().split(":");
        //double average;
        //average=Double.parseDouble(sumDetails[0]);
        try {
            context.write(key,values);
        } catch (IOException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        } catch (InterruptedException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
    }
}
public static void main(String args[])
{
    Configuration conf = new Configuration();
    try
    {
     String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();    
     if (otherArgs.length != 2) {      
         System.err.println("Usage: Main <in> <out>");      
         System.exit(-1);    }    
     Job job = new Job(conf, "Average salary");    
     //job.setInputFormatClass(KeyValueTextInputFormat.class);    
     FileInputFormat.addInputPath(job, new Path(otherArgs[0]));    
     FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));    
     job.setJarByClass(AverageSalary.class);    
     job.setMapperClass(Map.class);    
     job.setCombinerClass(Combiner.class);
     job.setReducerClass(Reduce.class);    
     job.setOutputKeyClass(Text.class);    
     job.setOutputValueClass(Text.class);    

        System.exit(job.waitForCompletion(true) ? 0 : -1);
    } catch (ClassNotFoundException e) {
        // TODO Auto-generated catch block
        e.printStackTrace();
    } catch (IOException e) {
        // TODO Auto-generated catch block
        e.printStackTrace();
    } catch (InterruptedException e) {
        // TODO Auto-generated catch block
        e.printStackTrace();
    }
}

最佳答案

组合器的第一条规则是:不要假设组合器会运行。将组合器仅视为优化

Combiner 不能保证运行所有数据。在某些不需要将数据溢出到磁盘的情况下,MapReduce 将完全跳过使用 Combiner。另请注意,组合器可能会在数据子集上运行多次!它会在每次溢出时运行一次。

在您的情况下,您做出了这个错误的假设。您应该在 Combiner 和 Reducer 中进行求和。

此外,您也应该遵循@user987339 的回答。组合器的输入和输出需要相同(Text,Double -> Text,Double)并且需要与 Mapper 的输出和 Reducer 的输入相匹配。

关于java - MapReduce组合器,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/20212884/

相关文章:

java - 责任链与类列表相比有哪些优点?

hadoop - 我想知道有什么方法我只能在 MapReduce(Hadoop) 中选择每一行的最大值

Java泛型?

在 LWJGL 中加载纹理时发生 Java 致命运行时错误

hadoop - 用 Pig 分割字符串

hadoop - Hadoop开发环境的数据集?

hadoop - 从头开始构建Data Lake

algorithm - MapReduce 排序算法如何工作?

hadoop - 运行 map 缩减作业时不存在job.jar

java - 显示 http ://localhost:9000 can not be reached 的 Sonar 转轮