我正在处理类似于规范 MapReduce 示例的内容 - 字数统计,但有一点不同,我希望只获得 Top N 结果。
假设我在 HDFS 中有一组非常大的文本数据。有大量示例展示了如何构建 Hadoop MapReduce 作业,该作业将为您提供该文本中每个单词的字数统计。例如,如果我的语料库是:
"This is a test of test data and a good one to test this"
标准 MapReduce 字数统计作业的结果集将是:
test:3, a:2, this:2, is: 1, etc..
但是,如果我仅想要获得整个数据集中使用的前 3 个词怎么办?
我仍然可以运行完全相同的标准 MapReduce 字数统计作业,然后在它准备就绪后只取前 3 个结果,并吐出每个字的计数,但这似乎有点低效,因为很多在洗牌阶段需要移动数据。
我在想的是,如果这个样本足够大,并且数据在 HDFS 中随机分布良好,那么每个 Mapper 都不需要将其所有字数发送给 Reducers,而是,只有一些顶级数据。所以如果一个映射器有这个:
a:8234, the: 5422, man: 4352, ...... many more words ... , rareword: 1, weirdword: 1, etc.
然后我想做的就是只将每个 Mapper 中的前 100 个左右的词发送到 Reducer 阶段——因为当一切都说完时,“稀有词”突然进入前 3 名的可能性很小并做了。这似乎可以节省带宽和 Reducer 处理时间。
这可以在 Combiner 阶段完成吗?这种在 shuffle 阶段之前的优化通常会完成吗?
最佳答案
这是一个非常好的问题,因为你已经击中了Hadoop的word count example的低效率。
优化问题的技巧如下:
在你的本地 map 阶段做一个基于 HashMap
的分组,你也可以为此使用一个组合器。这看起来像这样,我正在使用 Guava 的 HashMultiSet
,它提供了一个很好的计数机制。
public static class WordFrequencyMapper extends
Mapper<LongWritable, Text, Text, LongWritable> {
private final HashMultiset<String> wordCountSet = HashMultiset.create();
@Override
protected void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
String[] tokens = value.toString().split("\\s+");
for (String token : tokens) {
wordCountSet.add(token);
}
}
然后您在清理阶段发出结果:
@Override
protected void cleanup(Context context) throws IOException,
InterruptedException {
Text key = new Text();
LongWritable value = new LongWritable();
for (Entry<String> entry : wordCountSet.entrySet()) {
key.set(entry.getElement());
value.set(entry.getCount());
context.write(key, value);
}
}
因此,您已将单词分组到一个本地工作 block 中,从而通过使用一些 RAM 来减少网络使用。您也可以使用 Combiner
执行相同的操作,但它是按组排序 - 因此这会比使用 HashMultiset
慢(尤其是对于字符串!)。
要获得 Top N,您只需将本地 HashMultiset
中的 Top N 写入输出收集器,并在 reduce 端以正常方式聚合结果。
这也为您节省了大量网络带宽,唯一的缺点是您需要在清理方法中对字数元组进行排序。
部分代码可能如下所示:
Set<String> elementSet = wordCountSet.elementSet();
String[] array = elementSet.toArray(new String[elementSet.size()]);
Arrays.sort(array, new Comparator<String>() {
@Override
public int compare(String o1, String o2) {
// sort descending
return Long.compare(wordCountSet.count(o2), wordCountSet.count(o1));
}
});
Text key = new Text();
LongWritable value = new LongWritable();
// just emit the first n records
for(int i = 0; i < N, i++){
key.set(array[i]);
value.set(wordCountSet.count(array[i]));
context.write(key, value);
}
希望您了解在本地执行尽可能多的单词的要点,然后只聚合前 N 个中的前 N 个 ;)
关于algorithm - Hadoop/MapReduce - 优化 "Top N"Word Count MapReduce 作业,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/13609366/