java - How do I set up a reducer to emit <Text, IntWritable> and a mapper to receive <Text, IntWritable>?

Tags: java hadoop mapreduce

I am developing some MapReduce code on Hadoop that uses two mappers and two reducers. I was told to use SequenceFileInputFormat and SequenceFileOutputFormat to make the output of the first reducer and the input of the second mapper work together. The problem is that I am getting an error, and after a lot of googling I don't know why.

The error:

java.lang.Exception: java.io.IOException: Type mismatch in key from map: expected org.apache.hadoop.io.IntWritable, received org.apache.hadoop.io.Text

Type mismatch in key from map: expected org.apache.hadoop.io.IntWritable, received org.apache.hadoop.io.Text

The code:

package casoTaxis;

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;

public class Eje1{

    public static class MapperJob1 extends Mapper<Object, Text, Text, IntWritable> {
        // The map method receives a key-value pair, processes it, and writes the result to the context.
        public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
            Text hackLicense; IntWritable totalAmount; // outputs
            StringTokenizer itr = new StringTokenizer(value.toString(), ",");
            itr.nextToken();
            hackLicense = new Text(itr.nextToken());
            for(int i=2; i<itr.countTokens(); i++) itr.nextToken();
            totalAmount = new IntWritable( Integer.parseInt(itr.nextToken()) );
            context.write(hackLicense, totalAmount);
        }
    }

    public static class ReducerJob1 extends Reducer<Text, IntWritable, Text, IntWritable> { // I couldn't find an InputFormat class for <Text, IntWritable>
        public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable val : values) {
                sum += val.get();
            }
            context.write(key, new IntWritable(sum));
        }
    }

    public static class MapperJob2 extends Mapper<Text, IntWritable, Text, IntWritable> {
        // The map method receives a key-value pair, processes it, and writes the result to the context.
        public void map(Text key, IntWritable value, Context context) throws IOException, InterruptedException {
            context.write(key, value);
        }
    }

    public static class ReducerJob2 extends Reducer<Text, IntWritable, Text, Text> {
        public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
            int max = 0;
            for (IntWritable val : values) {
                int maxVal = val.get();
                if( maxVal>max ) max = maxVal;
            }
            String licencia = "Conductor con licencia = " + key;
            String recaudacion = "Recaudacion = " + max;
            context.write(new Text(licencia), new Text(recaudacion));
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf1 = new Configuration();
        Configuration conf2 = new Configuration();
        //conf2.set("mapreduce.input.keyvaluelinerecordreader.key.value.separator", " ");
        Job job1 = Job.getInstance(conf1, "Eje1-Job1");
        Job job2 = Job.getInstance(conf2, "Eje1-Job2");
        job1.setJarByClass(Eje1.class);
        job2.setJarByClass(Eje1.class);
        job1.setMapperClass(MapperJob1.class);
        job2.setMapperClass(MapperJob2.class);
        job1.setReducerClass(ReducerJob1.class);
        job2.setReducerClass(ReducerJob2.class);

        job1.setMapOutputKeyClass(Text.class);
        job1.setMapOutputValueClass(IntWritable.class);
        job1.setOutputKeyClass(Text.class);
        job1.setOutputValueClass(IntWritable.class);
        job2.setMapOutputKeyClass(Text.class);
        job2.setMapOutputKeyClass(IntWritable.class);
        job2.setOutputKeyClass(Text.class);
        job2.setOutputValueClass(Text.class);

        job1.setOutputFormatClass(SequenceFileOutputFormat.class);
        job2.setInputFormatClass(SequenceFileInputFormat.class);

        FileInputFormat.addInputPath(job1, new Path(args[0]));
        FileOutputFormat.setOutputPath(job1, pathIntermedio);
        FileInputFormat.addInputPath(job2, pathIntermedio);
        FileOutputFormat.setOutputPath(job2, new Path(args[1]));

        job1.waitForCompletion(true);
        System.exit(job2.waitForCompletion(true) ? 0 : 1);
    }

    private static final Path pathIntermedio = new Path("intermediate_output");

}

Why am I getting this error message? Is there a better way to achieve this?

Best Answer

The error lies in these lines:

job2.setMapOutputKeyClass(Text.class);
job2.setMapOutputKeyClass(IntWritable.class);

The second one should be:

job2.setMapOutputValueClass(IntWritable.class);
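A minimal, Hadoop-free sketch of why the duplicated setter goes unnoticed: Hadoop's `Job` setters each store the class name under a fixed configuration key (for the map output key class this is `mapreduce.map.output.key.class`), so calling `setMapOutputKeyClass` twice silently overwrites the first value rather than failing. Simulated here with a plain `HashMap` standing in for the `Configuration` object:

```java
import java.util.HashMap;
import java.util.Map;

public class SetterOverwriteDemo {
    public static void main(String[] args) {
        // Stand-in for Hadoop's Configuration: one string value per property key.
        Map<String, String> conf = new HashMap<>();

        // Intended: key class = Text, value class = IntWritable.
        conf.put("mapreduce.map.output.key.class", "Text");
        // The typo writes to the *key* property again, clobbering "Text":
        conf.put("mapreduce.map.output.key.class", "IntWritable");

        System.out.println(conf.get("mapreduce.map.output.key.class"));   // now IntWritable
        System.out.println(conf.get("mapreduce.map.output.value.class")); // never set -> null
    }
}
```

This matches the stack trace exactly: the framework ends up expecting `IntWritable` as the map output key, but `MapperJob2` actually writes a `Text` key ("expected org.apache.hadoop.io.IntWritable, received org.apache.hadoop.io.Text"). The value class, never having been set, falls back to Hadoop's default for the job.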

Regarding java - How do I set up a reducer to emit <Text, IntWritable> and a mapper to receive <Text, IntWritable>?, we found a similar question on Stack Overflow: https://stackoverflow.com/questions/40221115/
