java - (Hadoop): reduce method is not getting executed/called while running mapreduce job

Tags: java hadoop mapreduce reduce

I am running into a problem while executing my mapreduce job. As part of my mapreduce task I am using a mapreduce join, which involves multiple map methods and a single reducer method.

Both of my map methods get executed, but my reducer is never executed/invoked by my driver class.

As a result, the final output contains only the data collected during my map phase.

Am I using the wrong input and output values in the reduce phase? Is there an input/output type mismatch between the map and reduce phases?

Please help me with this.

Here is my code:

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.MultipleInputs;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class CompareInputTest extends Configured implements Tool {

public static class FirstFileInputMapperTest extends Mapper<LongWritable,Text,Text,Text>{


    private Text word = new Text();
    private String keyData,data,sourceTag = "S1$";

    public void map(LongWritable key,Text value,Context context) throws IOException, InterruptedException{

        String[] values = value.toString().split(";");
        keyData = values[1];
        data = values[2];

        context.write(new Text(keyData), new Text(data+sourceTag));


    }
}

public static class SecondFileInputMapperTest extends Mapper<LongWritable,Text,Text,Text>{
    private Text word = new Text();
    private String keyData,data,sourceTag = "S2$";
    public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException{

        String[] values = value.toString().split(";");
        keyData = values[1];
        data = values[2];


        context.write(new Text(keyData), new Text(data+sourceTag));

    }

}

public static class CounterReducerTest extends Reducer {
    private String status1, status2;

    public void reduce(Text key, Iterable<Text> values, Context context)
       throws IOException, InterruptedException {
        System.out.println("in reducer");

        for (Text value : values) {
            // "$" is a regex metacharacter, so it must be escaped for split()
            String[] splitVals = value.toString().split("\\$");
            System.out.println("in reducer");
            /*
             * Identify the record source that corresponds to a common key
             * and parse the values accordingly.
             */
            if (splitVals[0].equals("S1")) {
                status1 = splitVals[1] != null ? splitVals[1].trim() : "status1";
            } else if (splitVals[0].equals("S2")) {
                // getting the file2 record and using it to obtain the message
                status2 = splitVals[1] != null ? splitVals[1].trim() : "status2";
            }
        }

        context.write(key, new Text(status1 + "$$$"));
    }
}

public static void main(String[] args) throws Exception {
    int res = ToolRunner.run(new Configuration(), new CompareInputTest(), args);
    System.exit(res);
}

public int run(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Job job = new Job(conf, "count");
    job.setJarByClass(CompareInputTest.class);
    MultipleInputs.addInputPath(job, new Path(args[0]), TextInputFormat.class, FirstFileInputMapperTest.class);
    MultipleInputs.addInputPath(job, new Path(args[1]), TextInputFormat.class, SecondFileInputMapperTest.class);
    job.setReducerClass(CounterReducerTest.class);
    //job.setNumReduceTasks(1);
    job.setMapOutputKeyClass(Text.class);
    job.setMapOutputValueClass(Text.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(Text.class);

    FileOutputFormat.setOutputPath(job, new Path(args[2]));

    return job.waitForCompletion(true) ? 0 : 1;
}
}

Best Answer

Just check the prototype of the Reducer class:

extends Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT>

In your case the reducer consumes Text keys and values and emits Text, so change the definition from

public static class CounterReducerTest extends Reducer

to

public static class CounterReducerTest extends Reducer<Text, Text, Text, Text>

With the raw (non-generic) Reducer, your reduce(Text, Iterable<Text>, Context) method is an overload rather than an override of Reducer's reduce(Object, Iterable, Context), so the framework never calls it; it falls back to the default identity reduce, which simply writes the map output through. That is exactly why your output contains only the map-phase data.
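For reference, a minimal corrected skeleton is shown below (the identity loop is just a placeholder for the join logic in the question). Adding @Override is a cheap safeguard here: against the raw Reducer it would not compile, so this overload-instead-of-override mistake gets caught at build time.

public static class CounterReducerTest extends Reducer<Text, Text, Text, Text> {

    @Override // would not compile against the raw Reducer, flagging the bug early
    protected void reduce(Text key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        // The parameter types now match Reducer<Text, Text, Text, Text>,
        // so this method overrides Reducer.reduce() and runs once per key.
        for (Text value : values) {
            context.write(key, value);
        }
    }
}

A quick way to tell which reduce actually ran is to inspect the job counters (TaskCounter in org.apache.hadoop.mapreduce): with the raw-Reducer bug, REDUCE_INPUT_RECORDS is still non-zero because the default identity reduce runs in your place, while your own System.out.println("in reducer") never shows up in the task logs.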

This answer to java - (Hadoop): reduce method is not getting executed/called while running mapreduce job is based on a similar question on Stack Overflow: https://stackoverflow.com/questions/26321352/
