sorting - 我可以为reducer中的记录分配序列号吗

标签 sorting hadoop mapreduce

我希望根据事件发生的时间为来自 Mapper 类的事件分配一个序列号。

例如,我有 100 个事件,其中有时间。我希望根据时间对它们进行排序,然后在 reducer 阶段为它们分配一个序列号。此外,如果它们是重复的,则在 reducer 阶段删除重复的记录(同时发生相同的事件)。

映射器方法:

public class EventMapper extends Mapper<LongWritable, Text, Text, Event> {

public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
    String line = value.toString();
    Text newKey;
    Event e = new Event();
    e.setAllValues(line);
    newKey = new Text(e.getKey());
    context.write(newKey, e);
}
}

reducer 方法(我想要的东西):
public class EventReducer extends Reducer<Text, Event, Text, Text> {

public void reduce(Text key, Iterator<Event> itrtr, Context context) throws IOException, InterruptedException {
    Event e;
    List<Event> l = new ArrayList<Event>();
    while(itrtr.hasNext()){
        e = itrtr.next();
         l.add(e);
    }
    Collections.sort(l);
    long i = 1;
    for (Event event : l) {
        event.setId(++i);
        context.write(key, new Text(event.toString()));
    }
}
}

我把所有的 id 都设为 0。我怎样才能做到这一点?我是否采用了错误的方法。

这是事件类:
public class Event implements Writable, WritableComparable<Event> {
//Some variables and getter + setters
 @Override
public String toString() {
    String delimiter1 = "|";
    return this.date + delimiter1
            + this.evName + delimiter1
            + this.evType + delimiter1
            + this.evValue + delimiter1
            + this.name + delimiter1
            + this.id;
}

@Override
public void readFields(DataInput in) throws IOException {
    try {
        this.date = converStringToDate((WritableUtils.readString(in)).toString(), dateFormat);
    } catch (ParseException ex) {
        System.out.println("Wront date . Pe");
    }
    this.evName = WritableUtils.readString(in);
    this.evType = WritableUtils.readString(in);
    this.evValue = WritableUtils.readString(in);
    this.name = WritableUtils.readString(in);
    this.id = WritableUtils.readVLong(in);
}

@Override
public void write(DataOutput out) throws IOException {
    // TODO Auto-generated method stub
    WritableUtils.writeString(out, this.convertDateToString(date));
    WritableUtils.writeString(out, evName);
    WritableUtils.writeString(out, evType);
    WritableUtils.writeString(out, evValue);
    WritableUtils.writeString(out, name);
    WritableUtils.writeVLong(out, id);
}

public int compareTo(Event o) {
    long value = this.getDate().getTime() - o.getDate().getTime();
    if (value == 0) {
        return 0;
    } else if (value > 1) {
        return -1;
    } else {
        return 1;
    }
    }
public void setAllValues(String input) {
    String[] arrValues = input.split(delimiter);
    System.out.println("No of Values = " + arrValues.length);
    try {
        this.date = converStringToDate(arrValues[0], dateFormat);
    } catch (ParseException pe) {
        System.out.println("pe> Error in date");
    }
    if (arrValues.length >= 2) {
        this.evName = arrValues[1];
    }
    if (arrValues.length >= 3) {
        this.evType = arrValues[2];
    }
    if (arrValues.length >= 4) {
        this.evValue = arrValues[3];
    }
    if (arrValues.length >= 5) {
        this.name = arrValues[4];
    }
}

public String getKey() {
    //return convertDateToString(this.date) + this.evName + this.evType;
    return this.evName;
}
}

最佳答案

几个建议:

  • 更改 getKey() 以返回 date.getTime()。这是一个长值,并且比字符串更快。将您的内部键类型更改为 LongWritable。
  • 您正在利用 hadoop 行为,即在传递给 reducer 之前按键值对记录进行排序。这是一种排序方式,但您必须确保在作业配置中将 numberOfReducers 设置为 1。否则,您将有多个 reducer 从 1 开始在它们自己的分区上分配等级。
  • 您可以使用多个 reducer,但您必须在这项工作之后完成一项工作,以合并所有内部排名的数据分区。
  • 请记住,您的 reducer 将为每个键值调用一次,即使该键有多个记录(例如同时多个事件)。如果您想忽略这些重复事件,那么无论 Iterable 有多少条记录,reducer 都应该只将一条记录写入上下文。值有。
  • 为了正确分配等级(id),你需要在你的reducer中有一个long类型的实例变量(称之为counter)。您需要在 setup() 中对其进行初始化方法并在 reduce() 中增加它方法。
  • 关于sorting - 我可以为reducer中的记录分配序列号吗,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/13585434/

    相关文章:

    hadoop - 如何在Apache Crunch中进行Map端全外部联接(MapsideJoinStrategy不支持的联接类型FULL_OUTER_JOIN)

    Hadoop MapReduce : Two values as key in Mapper-Reducer

    javascript - 仅第一个值发生更改时如何使用 Javascript 数组

    arrays - Googlefinance - 返回高价和日期

    java - 使用数字字段对二维数组字符串进行排序

    hadoop - Flume每个文件批处理十行

    hadoop - 使用 YARN/Hadoop 调度,我可以只抢占某些队列吗?

    sorting - Haskell 列表输出中缺少第一个元素

    serialization - Hadoop中自定义Writable的实现?

    sql - 在多个列中的每个列上选择不重复