python - 减少 MapReduce 结果的有效方法?

标签 python optimization hadoop mapreduce hadoop-streaming

我编写了一个 MapReduce 作业,该作业对数据集进行 ngram 计数。结果是一百个 300MB 的文件,格式为 <ngram>\t<count> 。我想将这些组合成一个结果,但我几次组合尝试都失败了(“任务跟踪器已经消失”)。我的超时时间是 8 小时,这次崩溃发生在 8.5 小时左右,所以可能是相关的。我有#reducers=5(与节点数相同)。也许我只是需要留出更多时间,尽管错误似乎并未表明这一点。我怀疑我的节点重载并且变得无响应。我的理论是我的 reducer 可以使用一些优化。

我正在使用cat对于我的映射器,以及以下用于我的 reducer 的 python 脚本:

#!/usr/bin/env python
import sys

counts = {}
for line in sys.stdin:
    line = line.strip()
    key, count = line.split('\t', 1)

    try:
        count = int(count)
    except ValueError:
        continue

    if key not in counts:
        counts[key] = 0
    counts[key] += count

for key in sorted(counts.keys()):
    print '%s\t%s'% (key, counts[key])

更新: 正如我在一篇评论中暗示的那样,我对 Hadoop 自动进行的排序感到困惑。在 Web UI 中,reducer 状态显示了几个不同的阶段,其中包括“sort”和“reduce”。由此,我假设 Hadoop 在将映射器输出发送到减少之前对其进行排序,但不清楚的是是否对发送到减少器的所有数据进行排序,或者在减少之前对每个文件进行排序。换句话说,我的映射器采用 100 个字段,将其分成 400 个输出,每个输出只是 cat -将它们发送给reducer,然后reducer(总共5个)每个都会接收这80个流。 sort 是合并所有 80,还是排序 1,减少它; ETC?根据这些图表,这显然不能表明实际行为,排序过程发生在任何归约之前。如果排序确实对所有输入文件进行了排序,那么我可以简化我的化简器,使其不存储所有计数的字典,并在键更改后打印出 key-totalCount 对。

关于组合器的使用,我认为这对我的情况没有好处,因为我要减少的数据已经在我尝试组合的 100 个文件中减少了。由于我的 #nodes = #reducers (5 & 5),因此没有任何可组合的 reducer 尚未执行的操作。

最佳答案

问题是我对 MapReduce 工作原理的误解。所有进入Reducer 的数据都经过排序。我上面的代码完全没有优化。相反,我只是跟踪当前 key ,然后在新 key 出现时打印出上一个当前 key 。

#!/usr/bin/env python
import sys

cur_key = None
cur_key_count = 0
for line in sys.stdin:
    line = line.strip()
    key, count = line.split('\t', 1)

    try:
        count = int(count)
    except ValueError:
        continue

    # if new key, reset count, note current key, and output lastk key's result
    if key != cur_key:
        if cur_key is not None:
            print '%s\t%s'% (cur_key, cur_key_count)
        cur_key = key
        cur_key_count = 0
    cur_key_count += count
# printing out final key if set
if cur_key:
    print '%s\t%s'% (cur_key, cur_key_count)

关于python - 减少 MapReduce 结果的有效方法?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/8022146/

相关文章:

python - 如何让线程使用下一个尚未使用的对象?

python - memit 和 timeit 同时

mysql - 如何使用具有多个 GROUP BY、子查询和 WHERE IN 的查询优化查询?

hadoop - 使用 spark/scala,我使用 saveAsTextFile() 到 HDFS,但是 hiveql("select count(*) from...) return 0

java - Cloudera Quickstart VM illegalArguementException : Wrong FS: hdfs: expected: file:

java - 使用Hadoop和Eclipse的Ubuntu上的磁盘泄漏

python - 使用不带DatetimeIndex但已知频率的statsmodels.seasonal_decompose()

python - 将一个值列表而不是多个参数传递给函数?

C++:使用模板变量进行优化

c++ - 在 SPOJ AP3(AP - 完成系列)中得到错误答案?