java - MapReduce:减少写入上下文时无限期停止

标签 java hadoop mapreduce

下面是一个map reduce程序,在map函数中进行过滤,在reduce步骤中进行求和。

map 部分执行良好。但是当 reduce 部分运行时,它会卡在 context.write(key,value) 行。

只有当我尝试在 reduce 函数类型中编写与在 map 函数中编写的不同的输出时,才会发生这种情况

public class Filter3 {

public static class TokenizerMapper extends Mapper<Object, Text, Text, Contestant>{

        public void map(Object key, Text value, Context context) throws IOException, InterruptedException {

            String[] cols = value.toString().split(",");

            try {
                Contestant val = new Contestant(cols[0],cols[1],cols[2]);

                System.out.println();
                System.out.println();
                System.out.print(key+" ::: ");
                System.out.println(val);
                System.out.println();
                System.out.println();

                val.name = val.name.toUpperCase();

                if(val.rating>=9) {
                    context.write(new Text(val.name), val); //write null if it is not required
                }
            } catch(Exception ex) {
                ex.printStackTrace();
            }

        }
    }

    public static class AvgRatingReducer extends Reducer<Text,Contestant,Text,DoubleWritable> {

        private DoubleWritable result = new DoubleWritable(0.0);

        public void reduce(Text key, Iterable<Contestant> values, Context context ) throws IOException, InterruptedException {        

            double sum = 0.0;
            int count = 0;

            for (Contestant val : values) {
                sum += val.rating;
                count++;
            }

            if(count>0) {
                result.set(sum/(double)count);
            }

            System.out.println(result);

            context.write(key, result);

        }
    }

    public static void main(String[] args) throws Exception {

        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "AvgMRJob"); //configuration and job name

        job.setJarByClass(Filter3.class);

        job.setMapperClass(TokenizerMapper.class);
        job.setCombinerClass(AvgRatingReducer.class);
        job.setReducerClass(AvgRatingReducer.class);

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(DoubleWritable.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(DoubleWritable.class);

        Path inPath = new Path(args[0]);
        Path outPath = new Path(args[1]);
        outPath.getFileSystem(conf).delete(outPath,true);

        FileInputFormat.addInputPath(job, inPath);
        FileOutputFormat.setOutputPath(job, outPath);

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }

使用的可写对象在这里:

public class Contestant implements Writable {

    long id;
    String name;
    double rating;

    public Contestant() {}

    public Contestant(long id, String name, double rating) {
        this.id = id;
        this.name = name;
        this.rating = rating;
    }

    public Contestant(String id, String name, String rating) {
        try {
            this.id = Long.parseLong(id.trim());
        } catch(Exception ex) {

        }
        this.name = name;
        try {
            this.rating = Double.parseDouble(rating.trim());
        } catch(Exception ex) {

        }

    }

    @Override
    public void readFields(DataInput inp) throws IOException {

        id = inp.readLong();
        name = WritableUtils.readString(inp);
        rating = inp.readDouble();
    }

    @Override
    public void write(DataOutput out) throws IOException {

        out.writeLong(id);
        WritableUtils.writeString(out, name);
        out.writeDouble(rating);
    }

    @Override
    public String toString() {

        return this.id + "," + this.name + "," + this.rating;
    }
}

将输出写入上下文时,执行会卡在 reduce 函数中。我没有收到任何错误/异常。它只是无限期地挂起。 我不知道是什么问题。我遵循了 MapReduce 的通常程序。

enter image description here

注意: 如果我在 map 和 reduce 中写入相同类型的数据,则相同的程序会起作用。即如果我在 Map 和 Reduce 函数中都写 (key=Text,val=Contestant) 。 - 而不是在 reduce 中使用 DoubleWritable!!

最佳答案

删除组合器:

// job.setCombinerClass(AvgRatingReducer.class);

如果您使用组合器,则需要确保缩减器在组合器类的输出上工作,而不是映射器。

关于java - MapReduce:减少写入上下文时无限期停止,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/33461476/

相关文章:

java - hapi-fhir 将 fhir 对象转换为 json 字符串

java - 线程执行指定的时间。

java - 如何连接 Hibernate 和 Drools

apache-spark - Spark提交失败,返回码为13,例如wordCount

mongodb - 我的 MongoDB 需要 Hadoop 吗?

hadoop - 重复的任务被杀死

java - 为什么 lambda 中使用的局部变量必须是最终的或实际上是最终的?

regex - CASE WHEN - LIKE - Hadoop Hive 中的 REGEXP

hadoop - 推测执行 Hadoop

api - 最容易学习的 API/创建用于在 hadoop 上运行 mapreduce 的 Web 应用程序的方法?