java - java.io.IOException: Spill failed in MapReduce when using a Combiner

Tags: java hadoop mapreduce amazon-emr n-gram

I am using Hadoop MapReduce.
When I run the project without local aggregation (i.e., without a combiner class), it runs without any problems.

When I add the combiner class, I get the following message:

java.lang.Exception: java.io.IOException: Spill failed

In addition, the console points to these lines as the cause of the error:
context.write(new DoubleGramKey(decade), occurrences); 

and:
word2.write(out);

Here are the StepOne and DoubleGramKey classes:

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;

public class DoubleGramKey implements WritableComparable<DoubleGramKey> {

    private Text word1;
    private Text word2;
    private IntWritable decade;

    public DoubleGramKey(Text word1, Text word2, IntWritable decade) {
        this.word1 = word1;
        this.word2 = word2;
        this.decade = decade;
    }

    public DoubleGramKey(IntWritable decade) {
        this(new Text("*"),new Text("*"),decade);
    }

    public DoubleGramKey() {
        this(new Text("*"),new Text("*"), new IntWritable(-1));
    }

    public void readFields(DataInput in) throws IOException {
        word1.readFields(in);
        word2.readFields(in);
        decade.readFields(in);
    }

    public void write(DataOutput out) throws IOException {
        word1.write(out);
        word2.write(out);
        decade.write(out);
    }

    public int compareTo(DoubleGramKey other) {
        // if this.decade == other.decade, put <decade * *> first
        // if this.w1 == other.w1, put <w1, *> before <w1,w2>
        int ret = Integer.compare(this.decade.get(), other.decade.get());
        if (ret == 0) {
            if (this.word1.toString().equals("*") && !other.word1.toString().equals("*"))
                ret = -1;
            else if (other.word1.toString().equals("*") && !this.word1.toString().equals("*"))
                ret = 1;
            else
                ret = this.word1.toString().compareTo(other.word1.toString());
        }
        if (ret == 0) { //same decade and same w1
            if (this.word2.toString().equals("*") && !other.word2.toString().equals("*"))
                ret = -1;
            else if (other.word2.toString().equals("*") &&  !this.word2.toString().equals("*"))
                ret = 1;
            else
                ret = this.word2.toString().compareTo(other.word2.toString());
        }
        return ret;
    }


    public IntWritable getDecade() {
        return this.decade;
    }

    public Text getWord1() {
        return this.word1;
    }

    public Text getWord2() {
        return this.word2;
    }


    @Override
    public String toString() {
        return this.decade + " " + this.word1 + " " + this.word2;
    }
}
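
As an aside, since the stack trace points at word2.write(out), it is worth confirming that the key survives a serialization round trip on its own. Below is a minimal, hypothetical test harness (the class name DoubleGramKeyRoundTrip and the buffer-based round trip are illustrative, not part of the job); it mimics how Hadoop serializes the key into the spill buffer and later re-materializes it through the no-arg constructor:

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;

public class DoubleGramKeyRoundTrip {
    public static void main(String[] args) throws IOException {
        DoubleGramKey original =
                new DoubleGramKey(new Text("hello"), new Text("world"), new IntWritable(1990));

        // Serialize the key the same way the spill buffer would.
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        original.write(new DataOutputStream(bytes));

        // Deserialize into a fresh instance created via the no-arg constructor,
        // which is how Hadoop materializes keys by reflection.
        DoubleGramKey copy = new DoubleGramKey();
        copy.readFields(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));

        // Expect identical output and compareTo == 0.
        System.out.println(original + " -> " + copy + ", compareTo = " + original.compareTo(copy));
    }
}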

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

public class StepOne {

    private static class MyMapper extends Mapper<LongWritable, Text, DoubleGramKey, IntWritable> {

        @Override
        public void map(LongWritable key, Text val, Context context) throws IOException, InterruptedException {
            String[] line = val.toString().split("\t");
            String[] words = line[0].split(" ");
            String decade_str = line[1].substring(0, line[1].length() - 1);
            decade_str = decade_str + "0"; // the decade, e.g. year 1995 -> decade 1990
            IntWritable decade = new IntWritable(Integer.parseInt(decade_str));
            IntWritable occurrences = new IntWritable(Integer.parseInt(line[2]));
            if (words.length == 1) {
                context.write(new DoubleGramKey(new Text(words[0]), new Text("*"), decade), occurrences);
                context.write(new DoubleGramKey(decade), occurrences); //<* * decade, occurs>
            } else if (words.length == 2) {
                context.write(new DoubleGramKey(new Text(words[0]), new Text(words[1]), decade), occurrences);
            }
            // this is how we will count the number of words per decade,
            // e.g. <* * 1990> -> 20
        }
    }

    public class MyCombiner extends Reducer<DoubleGramKey, IntWritable, DoubleGramKey, IntWritable> {

        @Override
        public void reduce(DoubleGramKey key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable value : values) {
                sum += value.get();
            }
            context.write(key, new IntWritable(sum));
        }
    }


    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        System.out.println("starting EMR");

        Configuration configuration = new Configuration();
        Job job = Job.getInstance(configuration);
        job.setJarByClass(StepOne.class);
        job.setMapperClass(MyMapper.class);
        job.setMapOutputKeyClass(DoubleGramKey.class);
        job.setMapOutputValueClass(IntWritable.class);
        job.setReducerClass(MyReducer.class);
        job.setCombinerClass(MyCombiner.class);
        job.setNumReduceTasks(1);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        job.setPartitionerClass(StepOne.MyPartitioner.class);

        job.setInputFormatClass(TextInputFormat.class);
        SequenceFileInputFormat.addInputPath(job, new Path("/home/maor/Desktop/DSP202/ass2_202/googlebooks-eng-all-1gram-20120701-z"));
        SequenceFileInputFormat.addInputPath(job, new Path("/home/maor/Desktop/DSP202/ass2_202/googlebooks-eng-all-2gram-20120701-zy"));
        String output = "output";

        //TODO change the output for args[2] in aws run
        job.setOutputFormatClass(TextOutputFormat.class);
        FileOutputFormat.setOutputPath(job, new Path(output));
        job.waitForCompletion(true);
    }
}

Best Answer

Well, the spill failure turned out to be caused by a NoSuchMethodException: my combiner class was missing the word "STATIC". Declaring it static solved the problem.
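
In other words, the combiner must be a static nested class. Hadoop instantiates the combiner by reflection during the spill, and a non-static inner class has no no-arg constructor for it to call, because its implicit constructor requires the enclosing StepOne instance; the resulting NoSuchMethodException is what fails the spill. A sketch of the corrected declaration inside StepOne, taken from the question's own combiner with only the static modifier added:

    // Declared static so Hadoop can instantiate it via a no-arg constructor.
    public static class MyCombiner extends Reducer<DoubleGramKey, IntWritable, DoubleGramKey, IntWritable> {

        @Override
        public void reduce(DoubleGramKey key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            // Pre-aggregate occurrences locally; the reducer will see partial sums.
            int sum = 0;
            for (IntWritable value : values) {
                sum += value.get();
            }
            context.write(key, new IntWritable(sum));
        }
    }

The same requirement applies to the mapper and reducer classes, which is why MyMapper above is already declared static.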

For "java - java.io.IOException: Spill failed in MapReduce when using a Combiner", a similar question can be found on Stack Overflow: https://stackoverflow.com/questions/62122760/
