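I am trying to do a secondary sort in MapReduce using a composite key that consists of:
String natural key = program name
Long sort key = time in milliseconds since 1970
The problem is that after sorting I get many reduce groups, one per distinct composite key, instead of one per program name.
By debugging, I have verified that the hashCode and compare functions are correct. In the debug log, each block comes from a different reduce call, which suggests that the grouping or the partitioning did not work. From the debug log: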
14/12/14 00:55:12 INFO popularitweet.EtanReducer: key=the voice
14/12/14 00:55:12 INFO popularitweet.EtanReducer: the voice: Thu Dec 11 17:51:03 +0000 2014
14/12/14 00:55:12 INFO popularitweet.EtanReducer: the voice: Thu Dec 11 17:51:03 +0000 2014
14/12/14 00:55:12 INFO popularitweet.EtanReducer: key the voice ended
14/12/14 00:55:12 INFO popularitweet.EtanReducer: key=top gear
14/12/14 00:55:12 INFO popularitweet.EtanReducer: top gear: Thu Dec 11 17:51:04 +0000 2014
14/12/14 00:55:12 INFO popularitweet.EtanReducer: key top gear ended
14/12/14 00:55:12 INFO popularitweet.EtanReducer: key=american horror story
14/12/14 00:55:12 INFO popularitweet.EtanReducer: american horror story: Thu Dec 11 17:51:04 +0000 2014
14/12/14 00:55:12 INFO popularitweet.EtanReducer: key american horror story ended
14/12/14 00:55:12 INFO popularitweet.EtanReducer: key=the voice
14/12/14 00:55:12 INFO popularitweet.EtanReducer: the voice: Thu Dec 11 17:51:04 +0000 2014
14/12/14 00:55:12 INFO popularitweet.EtanReducer: key the voice ended
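As you can see, "the voice" is sent to two different reduce calls, each with a different timestamp. Any help would be appreciated. The composite key is the following class: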
public class ProgramKey implements WritableComparable<ProgramKey> {
    private String program;
    private Long timestamp;

    public ProgramKey() {
    }

    public ProgramKey(String program, Long timestamp) {
        this.program = program;
        this.timestamp = timestamp;
    }

    @Override
    public int compareTo(ProgramKey o) {
        int result = program.compareTo(o.program);
        if (result == 0) {
            result = timestamp.compareTo(o.timestamp);
        }
        return result;
    }

    @Override
    public void write(DataOutput dataOutput) throws IOException {
        WritableUtils.writeString(dataOutput, program);
        dataOutput.writeLong(timestamp);
    }

    @Override
    public void readFields(DataInput dataInput) throws IOException {
        program = WritableUtils.readString(dataInput);
        timestamp = dataInput.readLong();
    }

    // getProgram() and getTimestamp(), used by the classes below, are not shown in the post
}
The Partitioner, GroupingComparator and SortingComparator I implemented are these:
public class ProgramKeyPartitioner extends Partitioner<ProgramKey, TweetObject> {
    @Override
    public int getPartition(ProgramKey programKey, TweetObject tweetObject, int numPartitions) {
        int hash = programKey.getProgram().hashCode();
        int partition = hash % numPartitions;
        return partition;
    }
}

public class ProgramKeyGroupingComparator extends WritableComparator {
    protected ProgramKeyGroupingComparator() {
        super(ProgramKey.class, true);
    }

    @Override
    public int compare(WritableComparable a, WritableComparable b) {
        ProgramKey k1 = (ProgramKey) a;
        ProgramKey k2 = (ProgramKey) b;
        return k1.getProgram().compareTo(k2.getProgram());
    }
}

public class TimeStampComparator extends WritableComparator {
    protected TimeStampComparator() {
        super(ProgramKey.class, true);
    }

    @Override
    public int compare(WritableComparable a, WritableComparable b) {
        ProgramKey ts1 = (ProgramKey)a;
        ProgramKey ts2 = (ProgramKey)a;
        int result = ts1.getProgram().compareTo(ts2.getProgram());
        if (result == 0) {
            result = ts1.getTimestamp().compareTo(ts2.getTimestamp());
        }
        return result;
    }
}
public static void main(String[] args) throws IOException,
        InterruptedException, ClassNotFoundException {
    // Create configuration
    Configuration conf = new Configuration();
    // Create job
    Job job = new Job(conf, "test1");
    job.setJarByClass(EtanMapReduce.class);
    // Set partitioner keyComparator and groupComparator
    job.setPartitionerClass(ProgramKeyPartitioner.class);
    job.setGroupingComparatorClass(ProgramKeyGroupingComparator.class);
    job.setSortComparatorClass(TimeStampComparator.class);
    // Setup MapReduce
    job.setMapperClass(EtanMapper.class);
    job.setMapOutputKeyClass(ProgramKey.class);
    job.setMapOutputValueClass(TweetObject.class);
    job.setReducerClass(EtanReducer.class);
    // Specify key / value
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(TweetObject.class);
    // Input
    FileInputFormat.addInputPath(job, inputPath);
    job.setInputFormatClass(TextInputFormat.class);
    // Output
    FileOutputFormat.setOutputPath(job, outputDir);
    job.setOutputFormatClass(TextOutputFormat.class);
    // Delete output if exists
    FileSystem hdfs = FileSystem.get(conf);
    if (hdfs.exists(outputDir))
        hdfs.delete(outputDir, true);
    // Execute job
    logger.info("starting job");
    int code = job.waitForCompletion(true) ? 0 : 1;
    System.exit(code);
}
Best answer
EDIT...
There appears to be a typo in your TimeStampComparator: you assign ts2 from a when it should be assigned from b:
ProgramKey ts1 = (ProgramKey)a;
ProgramKey ts2 = (ProgramKey)a;
when it should be:
ProgramKey ts1 = (ProgramKey)a;
ProgramKey ts2 = (ProgramKey)b;
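With the typo, every key is compared against itself, so the comparator always returns 0 and the key/value pairs are not sorted correctly. That invalidates the assumption made by the grouping comparator that the key/value pairs arrive sorted: it only merges adjacent keys, so the same program name can show up in several separate reduce calls.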
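To make the effect concrete, here is a minimal sketch that runs without Hadoop. `Key`, `BUGGY`, `FIXED`, `countGroups` and the sample timestamps are hypothetical stand-ins, not the classes from the question, and `List.sort` only approximates the framework's sort. It sorts four keys like the ones in the log with each comparator, then counts runs of adjacent equal program names, which is roughly how the grouping comparator decides where a new reduce call starts.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

public class SortComparatorSketch {
    // Hypothetical stand-in for ProgramKey, so the sketch needs no Hadoop classes.
    static final class Key {
        final String program;
        final long timestamp;

        Key(String program, long timestamp) {
            this.program = program;
            this.timestamp = timestamp;
        }
    }

    // Mirrors the typo: both sides come from a, so every comparison returns 0.
    static final Comparator<Key> BUGGY = (a, b) -> {
        Key k1 = a;
        Key k2 = a; // should be b
        int r = k1.program.compareTo(k2.program);
        return r != 0 ? r : Long.compare(k1.timestamp, k2.timestamp);
    };

    // Corrected version: program name first, then timestamp.
    static final Comparator<Key> FIXED = (a, b) -> {
        int r = a.program.compareTo(b.program);
        return r != 0 ? r : Long.compare(a.timestamp, b.timestamp);
    };

    // Counts runs of adjacent equal program names, i.e. the number of
    // reduce calls a grouping comparator on program would produce.
    static int countGroups(List<Key> sorted) {
        int groups = 0;
        String previous = null;
        for (Key k : sorted) {
            if (!k.program.equals(previous)) {
                groups++;
                previous = k.program;
            }
        }
        return groups;
    }

    // Sorts a small sample (made-up timestamps) and counts the resulting groups.
    static int groupsAfterSort(Comparator<Key> comparator) {
        List<Key> keys = new ArrayList<>();
        keys.add(new Key("the voice", 1000L));
        keys.add(new Key("top gear", 2000L));
        keys.add(new Key("american horror story", 2000L));
        keys.add(new Key("the voice", 2000L));
        keys.sort(comparator);
        return countGroups(keys);
    }

    public static void main(String[] args) {
        System.out.println("buggy: " + groupsAfterSort(BUGGY));
        System.out.println("fixed: " + groupsAfterSort(FIXED));
    }
}
```

With the buggy comparator the input order is left untouched, so "the voice" ends up in two separate groups (4 groups in total); with the corrected comparator the two "the voice" keys become adjacent and there are 3 groups.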
Also check that the original program names are in UTF-8, since that is what WritableUtils assumes. Is your system's default code page UTF-8 as well?
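As a rough illustration of why the encoding matters for the natural key, the sketch below decodes the same bytes with two charsets. The class name `Utf8KeySketch`, the helper `decode` and the program name "café" are made up for the example. The assumption is that the mapper turns raw input bytes into a String somewhere, and that using the wrong charset at that point yields a different program name, hence a different key.

```java
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;

public class Utf8KeySketch {
    // Decodes raw input bytes into the program name used as the natural key.
    static String decode(byte[] raw, Charset charset) {
        return new String(raw, charset);
    }

    public static void main(String[] args) {
        // Hypothetical program name with one non-ASCII character, stored as UTF-8 bytes.
        byte[] raw = "caf\u00e9".getBytes(StandardCharsets.UTF_8);

        String asUtf8 = decode(raw, StandardCharsets.UTF_8);
        String asLatin1 = decode(raw, StandardCharsets.ISO_8859_1);

        // The two decodings are different strings, so they would be different keys.
        System.out.println(asUtf8.equals(asLatin1));                   // false
        System.out.println(asUtf8.length() + " vs " + asLatin1.length()); // 4 vs 5
    }
}
```

Passing an explicit charset when decoding input, rather than relying on the platform default, avoids this class of mismatch.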
Source: "java - mapreduce secondary sort doesn't work" on Stack Overflow: https://stackoverflow.com/questions/27464542/