hadoop - Map-Reduce not reducing complex keys and values as expected

Tags: hadoop mapreduce hadoop2

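No matter how simple I make the comparison for my complex key, I don't get the expected results. The one exception is when I use a single identical key for every record: then everything is correctly reduced to one record. I have also observed that the problem only occurs when I process the full load. If I pull out some of the records that failed to reduce and run them at a smaller scale, those records are combined as expected.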

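The total across the output records is correct, but at the record level there are duplicates where I expect items to be grouped together. Where I expect 500 records totaling 5,000, I end up with 1,232 records totaling 5,000, including obvious records that should have been reduced into one.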

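I have read about the issues with object references and complex keys and values, but I don't see anywhere in my code where that could be the problem. Because of that, you will find places where I create new objects I probably don't need; I'm trying everything and will dial it back once it works.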

I don't know what to try next, or where and how to fix this. Please help!

public static class Map extends
        Mapper<LongWritable, Text, IMSTranOut, IMSTranSums> {

    //private SimpleDateFormat dtFormat = new SimpleDateFormat("yyyyddd");

    @Override
    public void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String line = value.toString();
        SimpleDateFormat dtFormat = new SimpleDateFormat("yyyyddd");

        IMSTranOut dbKey = new IMSTranOut();
        IMSTranSums sumVals = new IMSTranSums();

        String[] tokens = line.split(",", -1);

        dbKey.setLoadKey(-99);
        dbKey.setTranClassKey(-99);

        dbKey.setTransactionCode(tokens[0]);
        dbKey.setTransactionType(tokens[1]);
        dbKey.setNpaNxx(getNPA(dbKey.getTransactionCode()));

        try {
            dbKey.setTranDate(new Date(dtFormat.parse(tokens[2]).getTime()));
        } catch (ParseException e) {

        }// 2

        dbKey.setTranHour(getTranHour(tokens[3]));

        try {
            dbKey.setStartDate(new Date(dtFormat.parse(tokens[4]).getTime()));
        } catch (ParseException e) {
        }// 4

        dbKey.setStartHour(getTranHour(tokens[5]));

        try {
            dbKey.setStopDate(new Date(dtFormat.parse(tokens[6]).getTime()));
        } catch (ParseException e) {
        }// 6

        dbKey.setStopHour(getTranHour(tokens[7]));

        sumVals.setTranCount(1);
        sumVals.setInputQTime(Double.parseDouble(tokens[8]));
        sumVals.setElapsedTime(Double.parseDouble(tokens[9]));
        sumVals.setCpuTime(Double.parseDouble(tokens[10]));

        context.write(dbKey, sumVals);
    }

}

public static class Reduce extends
        Reducer<IMSTranOut, IMSTranSums, IMSTranOut, IMSTranSums> {

    @Override
    public void reduce(IMSTranOut key, Iterable<IMSTranSums> values,
            Context context) throws IOException, InterruptedException {

        int tranCount = 0;
        double inputQ = 0;
        double elapsed = 0;
        double cpu = 0;

        for (IMSTranSums val : values) {
            tranCount += val.getTranCount();
            inputQ += val.getInputQTime();
            elapsed += val.getElapsedTime();
            cpu += val.getCpuTime();
        }

        IMSTranSums sumVals=new IMSTranSums();
        IMSTranOut dbKey=new IMSTranOut();

        sumVals.setCpuTime(cpu);
        sumVals.setElapsedTime(elapsed);
        sumVals.setInputQTime(inputQ);
        sumVals.setTranCount(tranCount);

        dbKey.setLoadKey(key.getLoadKey());
        dbKey.setTranClassKey(key.getTranClassKey());
        dbKey.setNpaNxx(key.getNpaNxx());
        dbKey.setTransactionCode(key.getTransactionCode());
        dbKey.setTransactionType(key.getTransactionType());
        dbKey.setTranDate(key.getTranDate());
        dbKey.setTranHour(key.getTranHour());
        dbKey.setStartDate(key.getStartDate());
        dbKey.setStartHour(key.getStartHour());
        dbKey.setStopDate(key.getStopDate());
        dbKey.setStopHour(key.getStopHour());

        dbKey.setInputQTime(inputQ);
        dbKey.setElapsedTime(elapsed);
        dbKey.setCpuTime(cpu);
        dbKey.setTranCount(tranCount);

        context.write(dbKey, sumVals);
    }
}

Here is the key class, which implements both DBWritable and WritableComparable:
public class IMSTranOut implements DBWritable,
    WritableComparable<IMSTranOut> {

private int loadKey;
private int tranClassKey;
private String npaNxx;
private String transactionCode;
private String transactionType;
private Date tranDate;
private double tranHour;
private Date startDate;
private double startHour;
private Date stopDate;
private double stopHour;
private double inputQTime;
private double elapsedTime;
private double cpuTime;
private int tranCount;

public void readFields(ResultSet rs) throws SQLException {
    setLoadKey(rs.getInt("LOAD_KEY"));
    setTranClassKey(rs.getInt("TRAN_CLASS_KEY"));
    setNpaNxx(rs.getString("NPA_NXX"));
    setTransactionCode(rs.getString("TRANSACTION_CODE"));
    setTransactionType(rs.getString("TRANSACTION_TYPE"));
    setTranDate(rs.getDate("TRAN_DATE"));
    setTranHour(rs.getInt("TRAN_HOUR"));
    setStartDate(rs.getDate("START_DATE"));
    setStartHour(rs.getInt("START_HOUR"));
    setStopDate(rs.getDate("STOP_DATE"));
    setStopHour(rs.getInt("STOP_HOUR"));
    setInputQTime(rs.getInt("INPUT_Q_TIME"));
    setElapsedTime(rs.getInt("ELAPSED_TIME"));
    setCpuTime(rs.getInt("CPU_TIME"));
    setTranCount(rs.getInt("TRAN_COUNT"));
}

public void write(PreparedStatement ps) throws SQLException {
    ps.setInt(1, loadKey);
    ps.setInt(2, tranClassKey);
    ps.setString(3, npaNxx);
    ps.setString(4, transactionCode);
    ps.setString(5, transactionType);
    ps.setDate(6, tranDate);
    ps.setDouble(7, tranHour);
    ps.setDate(8, startDate);
    ps.setDouble(9, startHour);
    ps.setDate(10, stopDate);
    ps.setDouble(11, stopHour);
    ps.setDouble(12, inputQTime);
    ps.setDouble(13, elapsedTime);
    ps.setDouble(14, cpuTime);
    ps.setInt(15, tranCount);
}

public int getLoadKey() {
    return loadKey;
}

public void setLoadKey(int loadKey) {
    this.loadKey = loadKey;
}

public int getTranClassKey() {
    return tranClassKey;
}

public void setTranClassKey(int tranClassKey) {
    this.tranClassKey = tranClassKey;
}

public String getNpaNxx() {
    return npaNxx;
}

public void setNpaNxx(String npaNxx) {
    this.npaNxx = new String(npaNxx);
}

public String getTransactionCode() {
    return transactionCode;
}

public void setTransactionCode(String transactionCode) {
    this.transactionCode = new String(transactionCode);
}

public String getTransactionType() {
    return transactionType;
}

public void setTransactionType(String transactionType) {
    this.transactionType = new String(transactionType);
}

public Date getTranDate() {
    return tranDate;
}

public void setTranDate(Date tranDate) {
    this.tranDate = new Date(tranDate.getTime());
}

public double getTranHour() {
    return tranHour;
}

public void setTranHour(double tranHour) {
    this.tranHour = tranHour;
}

public Date getStartDate() {
    return startDate;
}

public void setStartDate(Date startDate) {
    this.startDate = new Date(startDate.getTime());
}

public double getStartHour() {
    return startHour;
}

public void setStartHour(double startHour) {
    this.startHour = startHour;
}

public Date getStopDate() {
    return stopDate;
}

public void setStopDate(Date stopDate) {
    this.stopDate = new Date(stopDate.getTime());
}

public double getStopHour() {
    return stopHour;
}

public void setStopHour(double stopHour) {
    this.stopHour = stopHour;
}

public double getInputQTime() {
    return inputQTime;
}

public void setInputQTime(double inputQTime) {
    this.inputQTime = inputQTime;
}

public double getElapsedTime() {
    return elapsedTime;
}

public void setElapsedTime(double elapsedTime) {
    this.elapsedTime = elapsedTime;
}

public double getCpuTime() {
    return cpuTime;
}

public void setCpuTime(double cpuTime) {
    this.cpuTime = cpuTime;
}

public int getTranCount() {
    return tranCount;
}

public void setTranCount(int tranCount) {
    this.tranCount = tranCount;
}

public void readFields(DataInput input) throws IOException {
    setNpaNxx(input.readUTF());
    setTransactionCode(input.readUTF());
    setTransactionType(input.readUTF());
    setTranDate(new Date(input.readLong()));
    setStartDate(new Date(input.readLong()));
    setStopDate(new Date(input.readLong()));
    setLoadKey(input.readInt());
    setTranClassKey(input.readInt());
    setTranHour(input.readDouble());
    setStartHour(input.readDouble());
    setStopHour(input.readDouble());
    setInputQTime(input.readDouble());
    setElapsedTime(input.readDouble());
    setCpuTime(input.readDouble());
    setTranCount(input.readInt());
}

public void write(DataOutput output) throws IOException {
    output.writeUTF(npaNxx);
    output.writeUTF(transactionCode);
    output.writeUTF(transactionType);
    output.writeLong(tranDate.getTime());
    output.writeLong(startDate.getTime());
    output.writeLong(stopDate.getTime());
    output.writeInt(loadKey);
    output.writeInt(tranClassKey);
    output.writeDouble(tranHour);
    output.writeDouble(startHour);
    output.writeDouble(stopHour);
    output.writeDouble(inputQTime);
    output.writeDouble(elapsedTime);
    output.writeDouble(cpuTime);
    output.writeInt(tranCount);
}

public int compareTo(IMSTranOut o) {

    return (Integer.compare(loadKey, o.getLoadKey()) == 0
            && Integer.compare(tranClassKey, o.getTranClassKey()) == 0
            && npaNxx.compareTo(o.getNpaNxx()) == 0
            && transactionCode.compareTo(o.getTransactionCode()) == 0
            && (transactionType.compareTo(o.getTransactionType()) == 0)
            && tranDate.compareTo(o.getTranDate()) == 0
            && Double.compare(tranHour, o.getTranHour()) == 0
            && startDate.compareTo(o.getStartDate()) == 0
            && Double.compare(startHour, o.getStartHour()) == 0
            && stopDate.compareTo(o.getStopDate()) == 0 
            && Double.compare(stopHour, o.getStopHour()) == 0) ? 0 : 1;

}
}

And here is the Writable class used for the complex value:
public class IMSTranSums implements Writable {

        private double inputQTime;
        private double elapsedTime;
        private double cpuTime;
        private int tranCount;

        public double getInputQTime() {
            return inputQTime;
        }
        public void setInputQTime(double inputQTime) {
            this.inputQTime = inputQTime;
        }
        public double getElapsedTime() {
            return elapsedTime;
        }
        public void setElapsedTime(double elapsedTime) {
            this.elapsedTime = elapsedTime;
        }
        public double getCpuTime() {
            return cpuTime;
        }
        public void setCpuTime(double cpuTime) {
            this.cpuTime = cpuTime;
        }
        public int getTranCount() {
            return tranCount;
        }
        public void setTranCount(int tranCount) {
            this.tranCount = tranCount;
        }

        public void write(DataOutput output) throws IOException {
            output.writeDouble(inputQTime);
            output.writeDouble(elapsedTime);
            output.writeDouble(cpuTime);
            output.writeInt(tranCount);
        }
        public void readFields(DataInput input) throws IOException {
            inputQTime=input.readDouble();
            elapsedTime=input.readDouble();
            cpuTime=input.readDouble();
            tranCount=input.readInt();

        }
}
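Map output values travel to the reducers in serialized form, so write and readFields must handle the fields in exactly the same order. That round trip can be checked without a cluster using plain java.io streams. The sketch below is a hypothetical standalone copy of IMSTranSums (SumsRoundTrip and Sums are illustrative names); it deliberately does not implement org.apache.hadoop.io.Writable so it compiles without Hadoop on the classpath.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;

public class SumsRoundTrip {
    // Standalone copy of IMSTranSums' fields and Writable-style methods.
    static final class Sums {
        double inputQTime;
        double elapsedTime;
        double cpuTime;
        int tranCount;

        void write(DataOutput out) throws IOException {
            out.writeDouble(inputQTime);
            out.writeDouble(elapsedTime);
            out.writeDouble(cpuTime);
            out.writeInt(tranCount);
        }

        // Reads the fields back in exactly the order write() emitted them.
        void readFields(DataInput in) throws IOException {
            inputQTime = in.readDouble();
            elapsedTime = in.readDouble();
            cpuTime = in.readDouble();
            tranCount = in.readInt();
        }
    }

    // Serializes a Sums to bytes, then deserializes the bytes into a fresh instance.
    static Sums roundTrip(Sums src) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            src.write(out);
        }
        Sums copy = new Sums();
        try (DataInputStream in =
                new DataInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
            copy.readFields(in);
        }
        return copy;
    }

    public static void main(String[] args) throws IOException {
        Sums s = new Sums();
        s.inputQTime = 1.5;
        s.elapsedTime = 2.25;
        s.cpuTime = 0.75;
        s.tranCount = 3;
        Sums c = roundTrip(s);
        // prints "1.5 2.25 0.75 3"
        System.out.println(c.inputQTime + " " + c.elapsedTime + " " + c.cpuTime + " " + c.tranCount);
    }
}
```

The same kind of check applies to the key class: when no raw comparator is registered, Hadoop deserializes keys before calling compareTo, so a mismatch between write and readFields would also corrupt the sort.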

Best answer

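Your compareTo is flawed, and it will completely defeat the sorting algorithm. It returns 1 for any two keys that differ, so a.compareTo(b) and b.compareTo(a) are both 1. That breaks the antisymmetry and transitivity a sort relies on. Hadoop forms the input to each reduce() call by sorting the keys and grouping adjacent keys that compare equal, so when equal keys end up separated in the sort order, they are reduced separately and you get duplicates.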
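A simplified model makes the failure concrete. The sketch below assumes a key with the same "return 0 if equal, otherwise 1" pattern, two runs of map output that are each "sorted" under it (as separate spills or map outputs would be), and a reduce-side style two-way merge followed by grouping of adjacent equal keys. The class and method names are hypothetical, and Hadoop's real merge is considerably more involved; this only illustrates the mechanism.

```java
import java.util.ArrayList;
import java.util.List;

public class BrokenCompareDemo {
    // Stand-in for IMSTranOut with a single field and the same
    // "0 if equal, otherwise 1" compareTo pattern as the original.
    static final class BrokenKey implements Comparable<BrokenKey> {
        final String code;

        BrokenKey(String code) {
            this.code = code;
        }

        @Override
        public int compareTo(BrokenKey o) {
            return code.equals(o.code) ? 0 : 1;
        }

        @Override
        public String toString() {
            return code;
        }
    }

    // Two-way merge of runs assumed to be sorted: take from the left run
    // unless its head compares greater than the right run's head.
    static List<BrokenKey> merge(List<BrokenKey> left, List<BrokenKey> right) {
        List<BrokenKey> out = new ArrayList<>();
        int i = 0;
        int j = 0;
        while (i < left.size() && j < right.size()) {
            if (left.get(i).compareTo(right.get(j)) <= 0) {
                out.add(left.get(i++));
            } else {
                out.add(right.get(j++));
            }
        }
        while (i < left.size()) out.add(left.get(i++));
        while (j < right.size()) out.add(right.get(j++));
        return out;
    }

    // One group per run of adjacent keys that compare equal,
    // mirroring how reduce() calls are formed from sorted input.
    static List<String> groups(List<BrokenKey> sorted) {
        List<String> result = new ArrayList<>();
        for (int k = 0; k < sorted.size(); k++) {
            if (k == 0 || sorted.get(k - 1).compareTo(sorted.get(k)) != 0) {
                result.add(sorted.get(k).code);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        BrokenKey x = new BrokenKey("X");
        BrokenKey y = new BrokenKey("Y");
        // Both directions claim "greater than": antisymmetry is violated.
        System.out.println(x.compareTo(y) + " " + y.compareTo(x)); // prints "1 1"

        // Under this comparator any order counts as sorted, so both runs are "valid".
        List<BrokenKey> run1 = List.of(new BrokenKey("X"), new BrokenKey("Y"));
        List<BrokenKey> run2 = List.of(new BrokenKey("Y"), new BrokenKey("X"));
        List<BrokenKey> merged = merge(run1, run2);
        System.out.println(merged);         // prints "[Y, X, X, Y]"
        System.out.println(groups(merged)); // prints "[Y, X, Y]": key Y is reduced twice
    }
}
```

With a single small run, equal keys may happen to stay adjacent; once many runs are merged, they can be split apart. That is consistent with the duplicates appearing only under the full load.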

I suggest using CompareToBuilder from Apache Commons Lang or ComparisonChain from Guava to make your comparison more readable (and correct!).
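CompareToBuilder and ComparisonChain both need an extra dependency; on Java 8 and later the JDK's own Comparator combinators give the same field-by-field chaining. The sketch below is a hypothetical standalone stand-in for the grouping fields of IMSTranOut (TranKey is an illustrative name; it does not implement WritableComparable so it compiles without Hadoop) and assumes none of the String or Date fields are null. It also overrides equals and hashCode consistently with compareTo: Hadoop's default HashPartitioner chooses the reducer from key.hashCode(), and the posted class inherits Object's identity-based hashCode, so logically equal keys could otherwise be sent to different reducers.

```java
import java.sql.Date;
import java.util.Comparator;
import java.util.Objects;

// Hypothetical standalone stand-in for the grouping fields of IMSTranOut.
public class TranKey implements Comparable<TranKey> {
    int loadKey;
    int tranClassKey;
    String npaNxx;
    String transactionCode;
    String transactionType;
    Date tranDate;
    double tranHour;
    Date startDate;
    double startHour;
    Date stopDate;
    double stopHour;

    // Compare field by field; the first field that differs decides the result,
    // which yields a proper total order (antisymmetric and transitive).
    // Assumes no String or Date field is null.
    private static final Comparator<TranKey> ORDER =
            Comparator.comparingInt((TranKey k) -> k.loadKey)
                    .thenComparingInt(k -> k.tranClassKey)
                    .thenComparing(k -> k.npaNxx)
                    .thenComparing(k -> k.transactionCode)
                    .thenComparing(k -> k.transactionType)
                    .thenComparing(k -> k.tranDate)
                    .thenComparingDouble(k -> k.tranHour)
                    .thenComparing(k -> k.startDate)
                    .thenComparingDouble(k -> k.startHour)
                    .thenComparing(k -> k.stopDate)
                    .thenComparingDouble(k -> k.stopHour);

    @Override
    public int compareTo(TranKey o) {
        return ORDER.compare(this, o);
    }

    // Equal exactly when compareTo reports 0.
    @Override
    public boolean equals(Object o) {
        return o instanceof TranKey && compareTo((TranKey) o) == 0;
    }

    // Uses the same fields as compareTo, so keys that compare equal hash equally
    // and therefore land in the same partition under a hash-based partitioner.
    @Override
    public int hashCode() {
        return Objects.hash(loadKey, tranClassKey, npaNxx, transactionCode,
                transactionType, tranDate, tranHour, startDate, startHour,
                stopDate, stopHour);
    }
}
```

In the real key class, the comparator chain would become the body of compareTo(IMSTranOut o), with equals and hashCode added next to the existing write and readFields methods. The summed fields (inputQTime, elapsedTime, cpuTime, tranCount) stay out of all three methods, just as they are absent from the original compareTo.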

A similar question, "hadoop - Map-Reduce not reducing complex keys and values as expected", can be found on Stack Overflow: https://stackoverflow.com/questions/26003807/
