java - Chaining Jobs in Hadoop: Type Mismatch

Tags: java hadoop mapreduce

I want to map -> reduce -> map -> reduce.

Here is what I am trying to do:

I have this input TSV file:

1   2
2   1
2   3
3   2
4   2
4   3

After my first map/reduce job, I have this:

1   0
2  -1
3  -1
4   2

After my second map/reduce job, I should get this (the output file):

2   1
-1  2
0   1

My code compiles, but the second job fails with this error:

Error: java.io.IOException: Type mismatch in value from map: expected org.apache.hadoop.io.IntWritable, received org.apache.hadoop.io.Text

I don't understand why, since I am not sending a Text value to my second job.
Here is my complete code:

import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.util.*;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class problem {

 public static class DiffMapper extends Mapper<Object, Text, Text, IntWritable> {

    Text key = new Text();
    private final static IntWritable one = new IntWritable(1);
    private final static IntWritable minus = new IntWritable(-1);

    public void map(Object offset, Text value, Context context)  throws IOException, InterruptedException {
        StringTokenizer itr = new StringTokenizer(value.toString(), "\t");

        while (itr.hasMoreTokens()) {
            if(itr.countTokens() % 2 == 0) {
                key.set(itr.nextElement().toString());
                context.write(key, one);
            }

            else {
                key.set(itr.nextElement().toString());
                context.write(key, minus);
            }
        }
    }
 }

  public static class DiffReducer extends Reducer<Text,IntWritable,Text,IntWritable> {
    private IntWritable result = new IntWritable();

    public void reduce(Text key, Iterable<IntWritable> values, Context context)  throws IOException, InterruptedException {

      int sum = 0;
      for (IntWritable val : values) {
        sum += val.get();
      }
      result.set(sum);
      context.write(key, result);

    }

  }

 public static class CountMapper extends Mapper<Text, IntWritable, IntWritable, IntWritable> {

    IntWritable key2 = new IntWritable();
    private final static IntWritable one = new IntWritable(1);

    public void mapCount(Text offset, Text value, Context context)  throws IOException, InterruptedException {
        StringTokenizer itr = new StringTokenizer(value.toString(), "\t");

        while (itr.hasMoreElements()) {

            String node = itr.nextElement().toString();
            Integer diff = Integer.parseInt(itr.nextElement().toString());

            key2.set(diff);

            context.write(key2, one);

        }
    }
 }



  public static class CountReducer extends Reducer<IntWritable,IntWritable,LongWritable,IntWritable> {
    private IntWritable result = new IntWritable();

    public void reduceCount(LongWritable key, Iterable<IntWritable> values, Context context)  throws IOException, InterruptedException {

      int sum = 0;
      for (IntWritable val : values) {
        sum += val.get();
      }
      result.set(sum);
      context.write(key, result);

    }

  }



  public static void main(String[] args) throws Exception {
    Configuration conf1 = new Configuration();
    Job job = Job.getInstance(conf1, "problem");
    job.setJarByClass(problem.class);
    job.setMapperClass(DiffMapper.class);
    job.setReducerClass(DiffReducer.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);
    Path outputPath = new Path("Diff");

    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, outputPath);
    outputPath.getFileSystem(conf1).delete(outputPath);
    job.waitForCompletion(true);
    //System.exit(job.waitForCompletion(true) ? 0 : 1);



    Configuration conf2 = new Configuration();
    Job job2 = Job.getInstance(conf2, "problem");
    job2.setJarByClass(problem.class);
    job2.setMapperClass(CountMapper.class);
    job2.setReducerClass(CountReducer.class);
    job2.setOutputKeyClass(LongWritable.class);
    job2.setOutputValueClass(IntWritable.class);
    Path outputPath2 = new Path(args[1]);
    FileInputFormat.addInputPath(job2, outputPath);
    FileOutputFormat.setOutputPath(job2, new Path(args[1]));
    outputPath2.getFileSystem(conf2).delete(outputPath2, true);

    System.exit(job2.waitForCompletion(true) ? 0 : 1);
  }
}

Best Answer

By default, every mapper uses TextInputFormat, so the input key is a LongWritable and the input value is a Text.
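
For reference, here is a minimal sketch of the signature every mapper gets under TextInputFormat (the class name LineEchoMapper is just illustrative):

import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

// Under TextInputFormat the framework calls map() once per input line:
//   key   = LongWritable (the byte offset of the line within the split)
//   value = Text         (the content of the line itself)
public class LineEchoMapper extends Mapper<LongWritable, Text, LongWritable, Text> {
    @Override
    public void map(LongWritable offset, Text line, Context context)
            throws IOException, InterruptedException {
        context.write(offset, line); // pass the pair through unchanged
    }
}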

Your error happens because you declared IntWritable as the mapper's input value type; and since mapCount never overrides map(), the inherited identity map() runs instead, forwarding the Text value to a job that expects an IntWritable.

Your second mapper is no different from the first one, so both mapper definitions need to be extends Mapper<LongWritable, Text, ...>.

Also, the method names mapCount and reduceCount mean nothing to MapReduce. The methods must be named map and reduce, and you should add an @Override annotation so the compiler can check that each method really overrides the one in the Mapper class. Along with that, the parameters Text offset, Text value need to be LongWritable offset, Text value. Make sure the Reducer has the correct method parameter types as well.
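
Putting those fixes together, the corrected second mapper could look roughly like this (a sketch meant as a drop-in replacement for the CountMapper above, using the same imports as the full code; the tab-splitting logic is kept as-is):

public static class CountMapper extends Mapper<LongWritable, Text, IntWritable, IntWritable> {

    private final IntWritable key2 = new IntWritable();
    private final static IntWritable one = new IntWritable(1);

    @Override // now actually overrides Mapper.map(), so the framework will call it
    public void map(LongWritable offset, Text value, Context context)
            throws IOException, InterruptedException {
        StringTokenizer itr = new StringTokenizer(value.toString(), "\t");
        while (itr.hasMoreTokens()) {
            String node = itr.nextToken();                // first column: the node id (unused here)
            int diff = Integer.parseInt(itr.nextToken()); // second column: the diff from job 1
            key2.set(diff);
            context.write(key2, one);                     // count one occurrence of this diff
        }
    }
}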

You are already parsing the lines back into integers inside the method body with Integer diff.

It is also worth pointing out that your reducers are exactly the same, so you only need a single class for both MapReduce stages.
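
For instance, one generic summing reducer (a sketch; the name IntSumReducer and the type parameter KEY are illustrative) could serve both jobs:

public static class IntSumReducer<KEY> extends Reducer<KEY, IntWritable, KEY, IntWritable> {
    private final IntWritable result = new IntWritable();

    @Override
    public void reduce(KEY key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        int sum = 0;
        for (IntWritable val : values) {
            sum += val.get(); // accumulate the 1/-1 counts for this key
        }
        result.set(sum);
        context.write(key, result);
    }
}

Both drivers would then register the same class with job.setReducerClass(IntSumReducer.class); note that job2's setOutputKeyClass would have to match the mapper's key type (IntWritable rather than LongWritable) for the types to line up.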

About java - Chaining Jobs in Hadoop: Type Mismatch, we found a similar question on Stack Overflow: https://stackoverflow.com/questions/47117847/
