java - Passing a custom object from Map to Reduce in a single job

Tags: java hadoop mapreduce

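I created a custom class called DataObject. I want to populate an instance of it in the map function and pass it to the reduce function, but I get the following error (my code follows the stack trace).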

java.lang.Exception: java.lang.ClassCastException: class DataObject
        at org.apache.hadoop.mapred.LocalJobRunner$Job.run(LocalJobRunner.java:354)
Caused by: java.lang.ClassCastException: class DataObject
        at java.lang.Class.asSubclass(Class.java:3208)
        at org.apache.hadoop.mapred.JobConf.getOutputKeyComparator(JobConf.java:795)
        at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.<init>(MapTask.java:964)
        at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:422)
        at org.apache.hadoop.mapred.MapTask.run(MapTask.java:366)
        at org.apache.hadoop.mapred.LocalJobRunner$Job$MapTaskRunnable.run(LocalJobRunner.java:223)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
        at java.util.concurrent.FutureTask.run(FutureTask.java:262)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:745)
15/10/07 17:47:57 INFO mapred.JobClient:  map 0% reduce 0%
15/10/07 17:47:57 INFO mapred.JobClient: Job complete: job_local582994215_0001
15/10/07 17:47:57 INFO mapred.JobClient: Counters: 0
15/10/07 17:47:57 INFO mapred.JobClient: Job Failed: NA
Exception in thread "main" java.io.IOException: Job failed!
        at org.apache.hadoop.mapred.JobClient.runJob(JobClient.java:1357)
        at WordCount.main(WordCount.java:103)

Here is my program:

import java.io.IOException;
import java.util.*;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapred.*;
import org.apache.hadoop.util.*;

public class WordCount {

    public static class Map extends MapReduceBase implements Mapper<LongWritable, Text, DataObject, IntWritable> {
        private final static IntWritable one = new IntWritable(1);
        private Text word = new Text();

        @Override
        public void map(LongWritable arg0, Text value, OutputCollector<DataObject, IntWritable> output,
                Reporter reporter) throws IOException {
            // Emit (file name + word, 1) for every token in the input line.

            FileSplit fileSplit = (FileSplit) reporter.getInputSplit();
            String fileName = fileSplit.getPath().getName();

            String line = value.toString();
            StringTokenizer tokenizer = new StringTokenizer(line);
            while (tokenizer.hasMoreTokens()) {

                // Use the current token, not the whole input line, as the word.
                word.set(tokenizer.nextToken());
                DataObject dObject = new DataObject();
                dObject.setFileName(fileName);
                dObject.setWord(word);
                output.collect(dObject, one);
            }

        }

    }

    public static class Reduce extends MapReduceBase implements Reducer<DataObject, IntWritable, Text, IntWritable> {

        @Override
        public void reduce(DataObject dObject, Iterator<IntWritable> values, OutputCollector<Text, IntWritable> output,
                Reporter reporter) throws IOException {
            // Sum the counts, but only for the words of interest.
            int sum = 0;
            String[] inputValue = new String[] { "if", "the" };

            if (Arrays.asList(inputValue).contains(dObject.getWord().toString())) {

                while (values.hasNext()) {
                    sum += values.next().get();
                }
                output.collect(dObject.getWord(), new IntWritable(sum));
            }

        }

    }

    public static void main(String[] args) throws Exception {
        JobConf conf = new JobConf(WordCount.class);
        conf.setJobName("wordcount");

        conf.setOutputKeyClass(DataObject.class);
        conf.setOutputValueClass(IntWritable.class);

        conf.setMapperClass(Map.class);
        conf.setCombinerClass(Reduce.class);
        conf.setReducerClass(Reduce.class);

        conf.setInputFormat(TextInputFormat.class);
        conf.setOutputFormat(TextOutputFormat.class);

        FileInputFormat.setInputPaths(conf, new Path(args[0]));
        FileOutputFormat.setOutputPath(conf, new Path(args[1]));

        JobClient.runJob(conf);
    }
}

Best answer

If your mapper emits different key and value types than your reducer, you have to specify both separately in the job configuration.

// Reducer output configuration

conf.setOutputKeyClass(Text.class);
conf.setOutputValueClass(IntWritable.class);

// Mapper output configuration
conf.setMapOutputKeyClass(DataObject.class);
conf.setMapOutputValueClass(IntWritable.class);

Include both in your code.
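
One more thing worth checking: the stack trace in the question fails in Class.asSubclass, called from JobConf.getOutputKeyComparator. That is where Hadoop casts the map output key class to WritableComparable, so DataObject probably needs to implement that interface as well. The question does not show DataObject, so the following is only a sketch under assumptions: the String fields, the two-argument constructor, the sort order, and the DataObjectDemo helper are all hypothetical. To keep it runnable without Hadoop on the classpath it implements plain Comparable; in the real job the class would be declared `implements WritableComparable<DataObject>`, which requires the same write, readFields, and compareTo methods.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Objects;

// Hypothetical stand-in for the question's DataObject (its real source is not shown).
// In the actual job, declare it as
//   implements org.apache.hadoop.io.WritableComparable<DataObject>
// instead of Comparable<DataObject>; the method bodies stay the same.
final class DataObject implements Comparable<DataObject> {
    private String fileName = "";
    private String word = "";

    // Hadoop instantiates keys reflectively, so a no-argument constructor is needed.
    DataObject() {}

    // Convenience constructor, an assumption made for this sketch.
    DataObject(String fileName, String word) {
        this.fileName = fileName;
        this.word = word;
    }

    String getFileName() { return fileName; }
    String getWord() { return word; }

    // Serialize the fields in a fixed order.
    public void write(DataOutput out) throws IOException {
        out.writeUTF(fileName);
        out.writeUTF(word);
    }

    // Read the fields back in the same order they were written.
    public void readFields(DataInput in) throws IOException {
        fileName = in.readUTF();
        word = in.readUTF();
    }

    // Sort by word first, then by file name (an arbitrary choice for this sketch).
    @Override
    public int compareTo(DataObject other) {
        int byWord = word.compareTo(other.word);
        return byWord != 0 ? byWord : fileName.compareTo(other.fileName);
    }

    @Override
    public boolean equals(Object o) {
        if (!(o instanceof DataObject)) return false;
        DataObject d = (DataObject) o;
        return word.equals(d.word) && fileName.equals(d.fileName);
    }

    // The default HashPartitioner uses hashCode() to choose a reducer,
    // so equal keys must produce equal hash codes.
    @Override
    public int hashCode() { return Objects.hash(word, fileName); }
}

class DataObjectDemo {
    // Serialize a key to bytes and read it back into a fresh instance,
    // which is roughly what happens to a key between map and reduce.
    static DataObject roundTrip(DataObject original) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            original.write(new DataOutputStream(bytes));
            DataObject copy = new DataObject();
            copy.readFields(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));
            return copy;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        DataObject key = new DataObject("a.txt", "the");
        DataObject copy = roundTrip(key);
        System.out.println(copy.getWord() + " " + copy.getFileName() + " " + key.equals(copy));
    }
}
```

With a key like this, the two configuration blocks above should be enough for the map output to be sorted and shuffled. Also note that a combiner has to emit the map output types, so the Reduce class from the question, which emits Text keys, may not work as the combiner unchanged.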

A similar question about passing a custom object from Map to Reduce in a single job can be found on Stack Overflow: https://stackoverflow.com/questions/33000486/
