hadoop - 按两个值分组的 Reducer

标签 hadoop mapreduce

我有一个案例,其中 Mapper 发出属于一个子组的数据,并且该子组属于一个组。

我需要将子组中的所有值相加,并为每个组找到该组的所有子组之间的最小值。

所以,我有一个 Mapper 的输出,看起来像这样

Group 1

group,subgroupId,value
Group1,1,2
Group1,1,3
Group1,1,4
Group1,2,1
Group1,2,2
Group1,3,1
Group1,3,2
Group1,3,5

Group 2

group,subgroupId,value
Group2,4,2
Group2,4,3
Group2,4,4
Group2,5,1
Group2,5,2
Group2,6,1
Group2,6,2

我的输出应该是

Group1, 1, (2+3+4)
Group1, 2, (1+2)
Group1, 3, (1+2+5)

Group1 min = min((2+3+4),(1+2),(1+2+5))

第 2 组相同。

所以我实际上需要分组两次,首先按 GROUP 分组,然后在其中按 SUBGROUPID 分组。

所以我应该从一个组中发出最小总和,在给定的例子中我的化简器应该发出 (2,3),因为最小总和是 3,它来自 id 为 2 的元素。

因此,似乎最好使用两次 reduce 来解决这个问题,第一次 reduce 会获取按 id 分组的元素,然后将其传递给按组 id 分组的第二个 Reducer。

这是否有意义以及如何实现?我见过 ChainedMapper 和 ChainedReducer,但它们不适合这个目的。

谢谢

最佳答案

如果所有数据都可以放入一台机器的内存中,您只需使用一个 reducer (job.setNumReducers(1);) 和两个临时变量,就可以在一个作业中完成所有这些工作.输出在 reducer 的清理阶段发出。这是伪代码,如果您使用新的 Hadoop API(支持 cleanup() 方法):

int tempKey;
int tempMin;    

setup() {
    tempMin = Integer.MAX_VALUE;
}

reduce(key, values) {
    int sum = 0;
    while (values.hasNext()) {
        sum += values.next();
    }
    if (sum < tempMin) {
        tempMin = sum;
        tempKey = key;
    }
}

cleanup() { //only in the new API
    emit(tempKey, tempMin);
}

关于hadoop - 按两个值分组的 Reducer,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/28238001/

相关文章:

hadoop - 更改 Hadoop 中现有文件的 block 大小

java - Hadoop:java.lang.Exception:java.lang.NoClassDefFoundError:org/apache/xerces/parsers/AbstractSAXParser

hadoop - 如何使用配置单元读取自定义的 hdfs 文件

ruby-on-rails - 复杂算法应该在哪里进行计算

java - Hive Java API 注册 jar

java - 将多前缀行过滤器设置为扫描仪 hbase java

java - 具有多个输出的 FileAlreadyExistsException

java - 从 HIVE UDF 读取 HDFS 文件 - 执行错误,返回代码 101 FunctionTask。无法初始化类

hadoop - 使用 MapReduce 解析 Freebase RDF 转储

java - Hadoop 查找任务尝试的主机名