java - Hadoop: processing different files with different Mappers and combining the results in the Reducer with a custom Writable

Tags: java hadoop mapreduce

I am learning Hadoop.
I have two Mappers, each processing a different file, and one Reducer that combines the output of both Mappers.

Input:
File 1:

1,Abc
2,Mno
3,Xyz

File 2:
1,CS
2,EE
3,CS

Expected output:
1   1,Abc,CS
2   2,Mno,EE
3   3,Xyz,CS

Actual output:
1   1,,CS
2   2,Mno,
3   3,Xyz,

My code:

Mapper 1:
public class NameMapper extends MapReduceBase implements
        Mapper<LongWritable, Text, LongWritable, UserWritable> {

    @Override
    public void map(LongWritable key, Text value,
            OutputCollector<LongWritable, UserWritable> output, Reporter reporter)
            throws IOException {

        String val[] = value.toString().split(",");

        LongWritable id = new LongWritable(Long.parseLong(val[0]));
        Text name = new Text(val[1]);

        output.collect(id, new UserWritable(id, name, new Text("")));
    }
}

Mapper 2:
public class DepartmentMapper extends MapReduceBase implements
        Mapper<LongWritable, Text, LongWritable, UserWritable> {

    @Override
    public void map(LongWritable key, Text value,
            OutputCollector<LongWritable, UserWritable> output, Reporter reporter)
            throws IOException {

        String val[] = value.toString().split(",");

        LongWritable id = new LongWritable(Long.parseLong(val[0]));
        Text department = new Text(val[1]);

        output.collect(id, new UserWritable(id, new Text(""), department));
    }
}

Reducer:
public class JoinReducer extends MapReduceBase implements
        Reducer<LongWritable, UserWritable, LongWritable, UserWritable> {

    @Override
    public void reduce(LongWritable key, Iterator<UserWritable> values,
            OutputCollector<LongWritable, UserWritable> output,
            Reporter reporter) throws IOException {

        UserWritable user = new UserWritable();

        while (values.hasNext()) {

            UserWritable u = values.next();

            user.setId(u.getId());

            if (!(u.getName().equals(""))) {
                user.setName(u.getName());
            }

            if (!(u.getDepartment().equals(""))) {
                user.setDepartment(u.getDepartment());
            }
        }
        output.collect(user.getId(), user);
    }
}

Driver:
public class Driver extends Configured implements Tool {

    public int run(String[] args) throws Exception {

        JobConf conf = new JobConf(getConf(), Driver.class);
        conf.setJobName("File Join");

        conf.setOutputKeyClass(LongWritable.class);
        conf.setOutputValueClass(UserWritable.class);

        conf.setReducerClass(JoinReducer.class);

        MultipleInputs.addInputPath(conf, new Path("/user/hadoop/join/f1"),
                TextInputFormat.class, NameMapper.class);

        MultipleInputs.addInputPath(conf, new Path("/user/hadoop/join/f2"),
                TextInputFormat.class, DepartmentMapper.class);

        Path output = new Path("/user/hadoop/join/output");
        FileSystem.get(new URI(output.toString()), conf).delete(output);

        FileOutputFormat.setOutputPath(conf, output);

        JobClient.runJob(conf);

        return 0;
    }

    public static void main(String[] args) throws Exception {
        int result = ToolRunner.run(new Configuration(), new Driver(), args);
        System.exit(result);
    }
}

UserWritable:
public class UserWritable implements Writable {

    private LongWritable id;
    private Text name;
    private Text department;

    public UserWritable() {
    }

    public UserWritable(LongWritable id, Text name, Text department) {
        super();
        this.id = id;
        this.name = name;
        this.department = department;
    }

    public LongWritable getId() {
        return id;
    }

    public void setId(LongWritable id) {
        this.id = id;
    }

    public Text getName() {
        return name;
    }

    public void setName(Text name) {
        this.name = name;
    }

    public Text getDepartment() {
        return department;
    }

    public void setDepartment(Text department) {
        this.department = department;
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        id = new LongWritable(in.readLong());
        name = new Text(in.readUTF());
        department = new Text(in.readUTF());
    }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeLong(id.get());
        out.writeUTF(name.toString());
        out.writeUTF(department.toString());
    }

    @Override
    public String toString() {
        return id.get() + "," + name.toString() + "," + department.toString();
    }
}
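
To sanity-check that write() and readFields() mirror each other, the Writable can be exercised outside of a job. A minimal sketch, assuming only hadoop-common on the classpath and UserWritable in the same package (class name and setup are mine):

import org.apache.hadoop.io.DataInputBuffer;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;

public class UserWritableRoundTrip {

    public static void main(String[] args) throws Exception {
        UserWritable original = new UserWritable(new LongWritable(1),
                new Text("Abc"), new Text("CS"));

        // Serialize with write(), then deserialize with readFields(),
        // the same way the framework moves records between map and reduce.
        DataOutputBuffer out = new DataOutputBuffer();
        original.write(out);

        DataInputBuffer in = new DataInputBuffer();
        in.reset(out.getData(), out.getLength());

        UserWritable copy = new UserWritable();
        copy.readFields(in);

        System.out.println(copy); // expected: 1,Abc,CS
    }
}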

The Reducer should receive two UserWritable objects for each user id: the first with the id and name, the second with the id and department. For key 1, for example, the values iterator should contain (1, Abc, "") from NameMapper and (1, "", CS) from DepartmentMapper, in no guaranteed order.
Can anyone explain where I went wrong?

Best Answer

I found the problem in my code.

u.getName()

returns a Text object, not a String, so u.getName().equals("") is always false (a Text never equals a String). Both if blocks therefore ran for every value, and whichever record arrived last for a key overwrote both fields. Comparing the extracted String instead,

u.getName().toString()

solved the problem.
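
With that change applied, the working reducer looks like this (the only edits relative to the version in the question are the two toString() calls):

import java.io.IOException;
import java.util.Iterator;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;

public class JoinReducer extends MapReduceBase implements
        Reducer<LongWritable, UserWritable, LongWritable, UserWritable> {

    @Override
    public void reduce(LongWritable key, Iterator<UserWritable> values,
            OutputCollector<LongWritable, UserWritable> output,
            Reporter reporter) throws IOException {

        UserWritable user = new UserWritable();

        while (values.hasNext()) {

            UserWritable u = values.next();

            user.setId(u.getId());

            // Compare the String contents, not the Text wrapper.
            if (!(u.getName().toString().equals(""))) {
                user.setName(u.getName());
            }

            if (!(u.getDepartment().toString().equals(""))) {
                user.setDepartment(u.getDepartment());
            }
        }
        output.collect(user.getId(), user);
    }
}

Note that storing references to u's fields is safe here only because this UserWritable.readFields() allocates fresh Text objects on every call; if it reused them, the reducer would need to make defensive copies.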

This question about java - Hadoop: processing different files with different Mappers and combining the results in the Reducer with a custom Writable comes from Stack Overflow: https://stackoverflow.com/questions/28448173/
