java - Ignoring exception during close for org.apache.hadoop.mapred.MapTask$NewOutputCollector

Tags: java hadoop mapreduce writable

I created a custom Hadoop Writable as shown below:

public class ResultType implements Writable {

    private Text xxxx;
    private Text yyyy;
    private Text zzzz;

    public ResultType() {}

    public ResultType(Text xxxx, Text yyyy, Text zzzz) {
        this.xxxx = xxxx;
        this.yyyy = yyyy;
        this.zzzz = zzzz;   
    }

    public Text getxxxx() {
        return this.xxxx;
    }

    public Text getyyyy() {
        return this.yyyy;
    }

    public Text getzzzz() {
        return this.zzzz;
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        xxxx.readFields(in);
        yyyy.readFields(in);
        zzzz.readFields(in);
    }

    @Override
    public void write(DataOutput out) throws IOException {
        xxxx.write(out);
        yyyy.write(out);
        zzzz.write(out);
    }
}

My mapper code is:

public static class Mapper1 extends TableMapper<Text, ResultType> {

    private Text text = new Text();

    @Override
    public void map(ImmutableBytesWritable row, Result values, Context context)
            throws IOException, InterruptedException {

        // getting name value

        String xxxx = new String(values.getValue(Bytes.toBytes("cf"), Bytes.toBytes("xxxx")));
        String yyyy = new String(values.getValue(Bytes.toBytes("cf"), Bytes.toBytes("yyyy")));
        String zzzz = new String(values.getValue(Bytes.toBytes("cf"), Bytes.toBytes("zzzz")));
        text.set(xxxx);
        context.write(text, new ResultType(new Text(xxxx), new Text(yyyy), new Text(zzzz)));

    }
}

My reducer code is:

public static class Reducer1 extends Reducer<Text, ResultType, Text, ResultType> {

    public void reduce(Text key, Iterable<ResultType> values, Context context)
            throws IOException, InterruptedException {

        List<ResultType> returnset = new ArrayList<ResultType>();
        Map<String, ResultType> duplicatelist = new HashMap<String, ResultType>();
        boolean iskeyadded = true;

        for (ResultType val : values) {

            Text yyyy = val.getyyyy();
            Text zzzz = val.getzzzz();

            String groupkey = yyyy + "," + zzzz ;

            if (duplicatelist.containsKey(groupkey)) {

                if (iskeyadded) {
                    context.write(key, new ResultType(new Text(key), new Text(yyyy),
                            new Text(zzzz)));
                    iskeyadded = false;
                }

                context.write(key, new ResultType(new Text(key), new Text(yyyy), new Text(zzzz)));

            } else {
                duplicatelist.put(groupkey, val);
            }
        }

    }
}

When I run this code, I get:

Ignoring exception during close for org.apache.hadoop.mapred.MapTask$NewOutputCollector@20890b6f
java.lang.NullPointerException
    at test.ResultType.readFields(ResultType.java)
    at org.apache.hadoop.io.serializer.WritableSerialization$WritableDeserializer.deserialize(WritableSerialization.java:71)
    at org.apache.hadoop.io.serializer.WritableSerialization$WritableDeserializer.deserialize(WritableSerialization.java:42)
    at org.apache.hadoop.mapreduce.task.ReduceContextImpl.nextKeyValue(ReduceContextImpl.java:146)
    at org.apache.hadoop.mapreduce.task.ReduceContextImpl.nextKey(ReduceContextImpl.java:121)
    at org.apache.hadoop.mapreduce.lib.reduce.WrappedReducer$Context.nextKey(WrappedReducer.java:302)
    at org.apache.hadoop.mapreduce.Reducer.run(Reducer.java:170)
    at org.apache.hadoop.mapred.Task$NewCombinerRunner.combine(Task.java:1688)
    at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.sortAndSpill(MapTask.java:1637)
    at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.flush(MapTask.java:1489)
    at org.apache.hadoop.mapred.MapTask$NewOutputCollector.close(MapTask.java:723)
    at org.apache.hadoop.mapred.MapTask.closeQuietly(MapTask.java:2019)
    at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:797)
    at org.apache.hadoop.mapred.MapTask.run(MapTask.java:341)
    at org.apache.hadoop.mapred.LocalJobRunner$Job$MapTaskRunnable.run(LocalJobRunner.java:243)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
    at java.util.concurrent.FutureTask.run(FutureTask.java:262)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)

Best Answer

You are getting a NullPointerException because none of the Text objects in your custom Writable are ever created. Hadoop instantiates your Writable through its no-arg constructor and then calls readFields on it, so xxxx, yyyy, and zzzz are still null at that point. You can simply create them where you declare them at the top of the class:

private Text xxxx = new Text();
private Text yyyy = new Text();
private Text zzzz = new Text();
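A minimal sketch of the failure path outside the framework (NpeDemo is a hypothetical name; it assumes the original, uninitialized version of ResultType is on the classpath):

import java.io.ByteArrayInputStream;
import java.io.DataInputStream;

public class NpeDemo {
    public static void main(String[] args) throws Exception {
        // The framework does the equivalent of these two steps when the
        // combiner deserializes map output during a spill (see the
        // WritableDeserializer frames in the stack trace above).
        ResultType value = new ResultType();   // no-arg constructor: all Text fields are null
        DataInputStream in = new DataInputStream(new ByteArrayInputStream(new byte[0]));
        value.readFields(in);                  // first statement, xxxx.readFields(in), throws NullPointerException
    }
}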

I would also suggest changing the constructor that sets them to:

public ResultType(Text xxxx, Text yyyy, Text zzzz) {
    this.xxxx.set(xxxx);
    this.yyyy.set(yyyy);
    this.zzzz.set(zzzz);   
} 
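Putting both changes together, the corrected class would look roughly like this (a sketch consolidating the two fixes above; nothing else in the class needs to change):

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;

public class ResultType implements Writable {

    // Created at declaration, so the no-arg constructor used by the
    // framework leaves no field null when readFields is called.
    private Text xxxx = new Text();
    private Text yyyy = new Text();
    private Text zzzz = new Text();

    public ResultType() {}

    public ResultType(Text xxxx, Text yyyy, Text zzzz) {
        // set() copies the contents instead of keeping a reference to
        // the caller's (mutable, possibly reused) Text objects.
        this.xxxx.set(xxxx);
        this.yyyy.set(yyyy);
        this.zzzz.set(zzzz);
    }

    public Text getxxxx() { return this.xxxx; }
    public Text getyyyy() { return this.yyyy; }
    public Text getzzzz() { return this.zzzz; }

    @Override
    public void readFields(DataInput in) throws IOException {
        xxxx.readFields(in);
        yyyy.readFields(in);
        zzzz.readFields(in);
    }

    @Override
    public void write(DataOutput out) throws IOException {
        xxxx.write(out);
        yyyy.write(out);
        zzzz.write(out);
    }
}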

Unlike Strings, Text objects are not immutable, so assigning one to another does not create a new Text object; both references point to the same instance. This will cause problems if you try to reuse a Text object elsewhere, as your mapper does with its text field.
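A small illustration of that pitfall (ReuseDemo is a hypothetical name, contrasting the original and corrected constructors described above):

import org.apache.hadoop.io.Text;

public class ReuseDemo {
    public static void main(String[] args) {
        Text shared = new Text("first");
        ResultType record = new ResultType(shared, shared, shared);
        shared.set("second");   // reusing the Text, as the mapper's text field does

        // With the original constructor (this.xxxx = xxxx), record shares
        // the caller's Text, so this prints "second" -- the stored value
        // changed behind the record's back. With the set()-based
        // constructor, record holds its own copy and this prints "first".
        System.out.println(record.getxxxx());
    }
}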

This post about java - Ignoring exception during close for org.apache.hadoop.mapred.MapTask$NewOutputCollector is based on a similar question found on Stack Overflow: https://stackoverflow.com/questions/44501999/
