java - Why is toString() called instead of write() when writing to the context in Reducer.reduce?

Tags: java mapreduce hadoop2

I am writing a map-reduce batch job that consists of 3-4 chained jobs. In the second job, I use a custom class as the output value class when writing to the context via context.write(). While studying the behavior of the code, I noticed that the toString method of this custom class is called rather than its write method. Why does this happen, given that the class implements the Writable interface and I implemented the write method?

Code for the custom class:

import org.apache.hadoop.io.Writable;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;


public class WritableLongPair implements Writable {

    private long l1;
    private long l2;

    public WritableLongPair() {
        l1 = 0;
        l2 = 0;
    }

    public WritableLongPair(long l1, long l2) {
        this.l1 = l1;
        this.l2 = l2;
    }

    @Override
    public void write(DataOutput dataOutput) throws IOException {
        dataOutput.writeLong(l1);
        dataOutput.writeLong(l2);
    }

    @Override
    public void readFields(DataInput dataInput) throws IOException {
        l1 = dataInput.readLong();
        l2 = dataInput.readLong();
    }

    @Override
    public String toString() {
        return l1 + " " + l2;
    }
}

Code for the second job:

import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class Phase2 {

    private static final int ASCII_OFFSET = 97;

    public static class Mapper2
            extends Mapper<Object, Text, Text, LongWritable> {

        @Override
        public void map(Object key, Text value, Context context
        ) throws IOException, InterruptedException {
            String[] valueAsStrings = value.toString().split("\t");
            String actualKey = valueAsStrings[0];
            LongWritable actualValue = new LongWritable(Long.parseLong(valueAsStrings[1]));
            String[] components = actualKey.toString().split("[$]");
            if (!components[1].equals("*")) {
                context.write(new Text(components[1] + "$" + components[0]), actualValue);
                context.write(new Text(components[1] + "$*"), actualValue);
            }
            context.write(new Text(actualKey), actualValue);
        }
    }

    public static class Partitioner2 extends Partitioner<Text, LongWritable> {

        @Override
        public int getPartition(Text text, LongWritable longWritable, int i) {
            return (int) (text.toString().charAt(0)) - ASCII_OFFSET;
        }
    }

    public static class Reducer2
            extends Reducer<Text, LongWritable, Text, WritableLongPair> {

        private Text currentKey;
        private long sum;

        @Override
        public void setup(Context context) {
            currentKey = new Text();
            currentKey.set("");
            sum = 0l;
        }

        private String textContent(String w1, String w2) {
            if (w2.equals("*"))
                return w1 + "$*";
            if (w1.compareTo(w2) < 0)
                return w1 + "$" + w2;
            else
                return w2 + "$" + w1;
        }

        public void reduce(Text key, Iterable<LongWritable> counts,
                           Context context
        ) throws IOException, InterruptedException {
            long sumPair = 0l;
            String[] components = key.toString().split("[$]");
            for (LongWritable count : counts) {
                if (currentKey.equals(components[0])) {
                    if (components[1].equals("*"))
                        sum += count.get();
                    else
                        sumPair += count.get();
                } else {
                    sum = count.get();
                    currentKey.set(components[0]);
                }
            }
            if (!components[1].equals("*"))
                context.write(new Text(textContent(components[0], components[1])), new WritableLongPair(sumPair, sum));
        }
    }

    public static class Comparator2 extends WritableComparator {

        @Override
        public int compare(WritableComparable o1, WritableComparable o2) {
            String[] components1 = o1.toString().split("[$]");
            String[] components2 = o2.toString().split("[$]");
            if (components1[1].equals("*") && components2[1].equals("*"))
                return components1[0].compareTo(components2[0]);
            if (components1[1].equals("*")) {
                if (components1[0].equals(components2[0]))
                    return -1;
                else
                    return components1[0].compareTo(components2[0]);
            }
            if (components2[1].equals("*")) {
                if (components1[0].equals(components2[0]))
                    return 1;
                else
                    return components1[0].compareTo(components2[0]);
            }
            return components1[0].compareTo(components2[0]);
        }
    }
}

...and here is how I define my jobs:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class Manager {

    public static void main(String[] args) throws Exception {
        Configuration conf1 = new Configuration();
        if (args.length != 2) {
            System.err.println("Usage: Manager <in> <out>");
            System.exit(1);
        }
        Job job1 = Job.getInstance(conf1, "Phase 1");
        job1.setJarByClass(Phase1.class);
        job1.setMapperClass(Phase1.Mapper1.class);
        job1.setPartitionerClass(Phase1.Partitioner1.class);
        // job1.setCombinerClass(Phase1.Combiner1.class);
        job1.setReducerClass(Phase1.Reducer1.class);
        job1.setInputFormatClass(SequenceFileInputFormat.class);
        // job1.setOutputFormatClass(FileOutputFormat.class);
        job1.setOutputKeyClass(Text.class);
        job1.setOutputValueClass(LongWritable.class);
        job1.setNumReduceTasks(12);
        FileInputFormat.addInputPath(job1, new Path(args[0]));
        Path output1 = new Path(args[1]);
        FileOutputFormat.setOutputPath(job1, output1);
        boolean result = job1.waitForCompletion(true);
        Counter counter = job1.getCounters().findCounter("org.apache.hadoop.mapreduce.TaskCounter", "REDUCE_INPUT_RECORDS");
        System.out.println("Num of pairs sent to reducers in phase 1: " + counter.getValue());

        Configuration conf2 = new Configuration();
        Job job2 = Job.getInstance(conf2, "Phase 2");
        job2.setJarByClass(Phase2.class);
        job2.setMapperClass(Phase2.Mapper2.class);
        job2.setPartitionerClass(Phase2.Partitioner2.class);
        // job2.setCombinerClass(Phase2.Combiner2.class);
        job2.setReducerClass(Phase2.Reducer2.class);
        job2.setMapOutputKeyClass(Text.class);
        job2.setMapOutputValueClass(LongWritable.class);
        job2.setOutputKeyClass(Text.class);
        job2.setOutputValueClass(WritableLongPair.class);
        job2.setNumReduceTasks(26);
        // job2.setGroupingComparatorClass(Phase2.Comparator2.class);
        FileInputFormat.addInputPath(job2, output1);
        Path output2 = new Path(args[1] + "2");
        FileOutputFormat.setOutputPath(job2, output2);
        result = job2.waitForCompletion(true);
        counter = job2.getCounters().findCounter("org.apache.hadoop.mapreduce.TaskCounter", "REDUCE_INPUT_RECORDS");
        System.out.println("Num of pairs sent to reducers in phase 2: " + counter.getValue());

        // System.exit(job1.waitForCompletion(true) ? 0 : 1);
    }
}

Accepted answer

If you use the default output format (TextOutputFormat), Hadoop calls the object's toString() method when it writes the object to disk. This is the expected behavior. context.write() is indeed being invoked; it is the job's output format that controls how the data is rendered on disk.
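To make that concrete, here is a minimal sketch (a simplification for illustration, not the actual Hadoop source) of what TextOutputFormat effectively does with each pair the reducer emits: it renders key.toString(), a tab separator, and value.toString() as one output line, so the value's write() method is never involved at this point.

Text outKey = new Text("a$b");
WritableLongPair outValue = new WritableLongPair(3, 7);
// TextOutputFormat's record writer effectively produces one text line per record:
String line = outKey.toString() + "\t" + outValue.toString();  // "a$b\t3 7"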

If you are chaining jobs together, you would typically use SequenceFileInputFormat and SequenceFileOutputFormat for all of the jobs, because that makes it simple to read one job's output as the input of the next.
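As a sketch of how the Manager class above could be adjusted along those lines (assuming the rest of the job setup stays the same, and adding an import for SequenceFileOutputFormat):

import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;

// Job 1 writes its output as a sequence file, so the Writables are serialized
// with write()/readFields() instead of being rendered via toString().
job1.setOutputFormatClass(SequenceFileOutputFormat.class);

// Job 2 must then read job 1's output back as a sequence file ...
job2.setInputFormatClass(SequenceFileInputFormat.class);
// ... and, if a later job consumes job 2's output, that can stay binary as well.
// Only the last job in the chain needs the human-readable TextOutputFormat (the default).
job2.setOutputFormatClass(SequenceFileOutputFormat.class);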

This question (why toString is called instead of write when writing to the context in Reducer.reduce) is based on a similar question on Stack Overflow: https://stackoverflow.com/questions/37749490/
