java - Type mismatch between Mapper and Reducer

Tags: java hadoop mapreduce

I have the following input/output types:

public static class RecordMapper extends Mapper<Object, Text, Text, RecordWritable>

  • Input: Text/Text
  • Output: Text/Enum (RecordWritable, my own class)

public static class JoinSumReducer extends Reducer<Text, RecordWritable, Text, DoubleWritable>

  • Input: Text/Enum (RecordWritable, my own class)
  • Output: Text/double

I get the following runtime exception: java.lang.Exception: java.io.IOException: Type mismatch in value from map: expected org.apache.hadoop.io.DoubleWritable, received RecordWritable (full stack trace after the code).

I tried the solution proposed in Type mismatch in value from map: expected org.apache.hadoop.io.NullWritable, received org.apache.hadoop.io.Text, but that leads to the runtime exception java.io.IOException: wrong value class: class org.apache.hadoop.io.DoubleWritable is not class RecordWritable (full stack trace after the code).

Clearly there is a type mismatch somewhere, but I have gone over all the value definitions and cannot find what I am missing. Do I need to declare the types being used somewhere else?

Here is my code.

The Writable enum class

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableUtils;

/**
 * Writable for enum Record
 */
public class RecordWritable implements Writable{
    public static enum Record {BUY, CLICK};
    private Record data;

    public void set(Record data) {
        this.data = data;
    }

    public Record get() {
        return this.data;
    }

    public void readFields(DataInput dataInput) throws IOException {
        data = WritableUtils.readEnum(dataInput, Record.class); 
    }

    public void write(DataOutput dataOutput) throws IOException {
        WritableUtils.writeEnum(dataOutput,data);
    }
}
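
As an aside (not part of the original question), the Writable contract implemented above can be sanity-checked outside Hadoop by writing a value to a byte stream and reading it back. A minimal sketch, assuming the RecordWritable class above is on the classpath:

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;

public class RecordWritableRoundTrip {
    public static void main(String[] args) throws Exception {
        RecordWritable written = new RecordWritable();
        written.set(RecordWritable.Record.BUY);

        // Serialize through the Writable interface, as Hadoop would during the shuffle.
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        written.write(new DataOutputStream(bytes));

        // Deserialize into a fresh instance and check the enum survives the round trip.
        RecordWritable read = new RecordWritable();
        read.readFields(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));
        System.out.println(read.get()); // expected: BUY
    }
}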

Mapper/Reducer and main

import java.io.IOException;
import java.util.Scanner;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;


public class SuccessRate {

    /**
     * Mapper
     * - Key = ItemID
     * - Value = The type of record is determined by number of columns
     */
    public static class RecordMapper extends Mapper<Object, Text, Text, RecordWritable>{

        private Text itemID = new Text();
        private RecordWritable record = new RecordWritable();
        Pattern itemIDpattern = Pattern.compile("^(\\d+),");
        Pattern columnPattern = Pattern.compile(",");

        public void map(Object key, Text value, Context context
                ) throws IOException, InterruptedException {
            Scanner itr = new Scanner(value.toString());
            while (itr.hasNextLine()) {
                String line = itr.nextLine();

                String id = null;
                Matcher m = itemIDpattern.matcher(line);
                if(m.find())
                    id = m.group(1);

                RecordWritable.Record fileType;
                int count = StringUtils.countMatches(line, ",");
                if(count==4)
                    fileType = RecordWritable.Record.CLICK;
                else
                    fileType = RecordWritable.Record.BUY;

                if(id != null) {
                    itemID.set(id);
                    record.set(fileType);
                    context.write(itemID, record);
                }
            }
            itr.close();
        }
    }

    /**
     * Reducer
     * - Key : ItemID
     * - Value : sum of buys / sum of clicks
     */
    public static class JoinSumReducer
    extends Reducer<Text, RecordWritable, Text, DoubleWritable> {
        private DoubleWritable result = new DoubleWritable();

        public void reduce(Text key, Iterable<RecordWritable> values,
                Context context
                ) throws IOException, InterruptedException {
            int sumClick = 0;
            int sumBuy = 0;
            for (RecordWritable val : values) {
                switch(val.get()) {
                case CLICK:
                    sumClick += 1;
                    break;
                case BUY:
                    sumBuy += 1;
                    break;
                }
            }
            result.set((double)sumBuy/(double)sumClick);
            context.write(key, result);
        }
    }

    public static void main(String[] args) throws Exception {       
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "success rate");
        job.setJarByClass(SuccessRate.class);
        job.setMapperClass(RecordMapper.class);
        job.setCombinerClass(JoinSumReducer.class);
        job.setReducerClass(JoinSumReducer.class);
//      job.setMapOutputKeyClass(Text.class); // I tried adding these two lines after reading https://stackoverflow.com/q/16926783/3303546
//      job.setMapOutputValueClass(RecordWritable.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(DoubleWritable.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

Full exception stack traces

Original error

java.lang.Exception: java.io.IOException: Type mismatch in value from map: expected org.apache.hadoop.io.DoubleWritable, received RecordWritable
    at org.apache.hadoop.mapred.LocalJobRunner$Job.runTasks(LocalJobRunner.java:492)
    at org.apache.hadoop.mapred.LocalJobRunner$Job.run(LocalJobRunner.java:552)
Caused by: java.io.IOException: Type mismatch in value from map: expected org.apache.hadoop.io.DoubleWritable, received RecordWritable
    at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.collect(MapTask.java:1093)
    at org.apache.hadoop.mapred.MapTask$NewOutputCollector.write(MapTask.java:727)
    at org.apache.hadoop.mapreduce.task.TaskInputOutputContextImpl.write(TaskInputOutputContextImpl.java:89)
    at org.apache.hadoop.mapreduce.lib.map.WrappedMapper$Context.write(WrappedMapper.java:112)
    at SuccessRate$RecordMapper.map(SuccessRate.java:54)
    at SuccessRate$RecordMapper.map(SuccessRate.java:26)
    at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:146)
    at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:799)
    at org.apache.hadoop.mapred.MapTask.run(MapTask.java:347)
    at org.apache.hadoop.mapred.LocalJobRunner$Job$MapTaskRunnable.run(LocalJobRunner.java:271)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

I tried the solution proposed in Type mismatch in value from map: expected org.apache.hadoop.io.NullWritable, received org.apache.hadoop.io.Text, but that leads to this runtime exception:

2018-09-24 11:36:04,423 INFO mapred.MapTask: Ignoring exception during close for org.apache.hadoop.mapred.MapTask$NewOutputCollector@5532c2f8
java.io.IOException: wrong value class: class org.apache.hadoop.io.DoubleWritable is not class RecordWritable
    at org.apache.hadoop.mapred.IFile$Writer.append(IFile.java:194)
    at org.apache.hadoop.mapred.Task$CombineOutputCollector.collect(Task.java:1562)
    at org.apache.hadoop.mapred.Task$NewCombinerRunner$OutputConverter.write(Task.java:1879)
    at org.apache.hadoop.mapreduce.task.TaskInputOutputContextImpl.write(TaskInputOutputContextImpl.java:89)
    at org.apache.hadoop.mapreduce.lib.reduce.WrappedReducer$Context.write(WrappedReducer.java:105)
    at SuccessRate$JoinSumReducer.reduce(SuccessRate.java:86)
    at SuccessRate$JoinSumReducer.reduce(SuccessRate.java:66)
    at org.apache.hadoop.mapreduce.Reducer.run(Reducer.java:171)
    at org.apache.hadoop.mapred.Task$NewCombinerRunner.combine(Task.java:1900)
    at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.sortAndSpill(MapTask.java:1662)
    at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.flush(MapTask.java:1505)
    at org.apache.hadoop.mapred.MapTask$NewOutputCollector.close(MapTask.java:735)
    at org.apache.hadoop.mapred.MapTask.closeQuietly(MapTask.java:2076)
    at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:809)
    at org.apache.hadoop.mapred.MapTask.run(MapTask.java:347)
    at org.apache.hadoop.mapred.LocalJobRunner$Job$MapTaskRunnable.run(LocalJobRunner.java:271)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

Best answer

The workflow in MapReduce looks like this:

mapper(inputKey1, inputValue1 -> outputKey1, outputValue1) => combiner(outputKey1, outputValue1 -> outputKey2, outputValue2) => reducer(outputKey2, outputValue2 -> outputKey3, outputValue3)

Each key and value data type along this chain must match. In particular, a combiner runs on the map side, so its output key/value types must be the same as the map output types, because its output goes back into the map-side buffer and the shuffle.
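
Concretely, with the classes from the question, the signatures would have to line up like this (a sketch of the constraint, not code from the original answer):

// The map output types must equal both the combiner's input and output types,
// and also the reducer's input types.
public static class RecordMapper extends Mapper<Object, Text, Text, RecordWritable> { /* ... */ }

// A valid combiner for this job would have to be declared as
//   Reducer<Text, RecordWritable, Text, RecordWritable>
public static class JoinSumReducer extends Reducer<Text, RecordWritable, Text, DoubleWritable> { /* ... */ }

// JoinSumReducer cannot serve as the combiner: it emits DoubleWritable values
// where the map-side output collector expects RecordWritable.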

Your errors come from the combiner you registered with job.setCombinerClass(JoinSumReducer.class), together with the default map output types.

The combiner's output does not match your map output: JoinSumReducer emits DoubleWritable values where RecordWritable is expected, which is what the second exception (wrong value class) complains about. The first exception, Caused by: java.io.IOException: Type mismatch in value from map: expected org.apache.hadoop.io.DoubleWritable, received RecordWritable, is raised because without setMapOutputValueClass the map output value class defaults to the job output value class, DoubleWritable.

You can remove the combiner line, keep the two setMapOutput*Class calls you commented out, and try again.
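
For reference, a minimal sketch of the driver with the combiner removed and the map output types declared explicitly (this combines the fix above with the two commented-out lines from the question; it is a sketch, not the original poster's final code):

public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Job job = Job.getInstance(conf, "success rate");
    job.setJarByClass(SuccessRate.class);
    job.setMapperClass(RecordMapper.class);
    // No combiner: JoinSumReducer emits (Text, DoubleWritable), but anything that
    // runs on the map side must emit the map output types (Text, RecordWritable).
    job.setReducerClass(JoinSumReducer.class);

    // The map output types differ from the final output types, so declare them explicitly.
    job.setMapOutputKeyClass(Text.class);
    job.setMapOutputValueClass(RecordWritable.class);

    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(DoubleWritable.class);

    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
    System.exit(job.waitForCompletion(true) ? 0 : 1);
}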

The original question, java - Type mismatch between Mapper and Reducer, is on Stack Overflow: https://stackoverflow.com/questions/52484837/
