python - Python 中经典 Hadoop 字数统计示例中的数据流

标签 python hadoop mapreduce word-count

我正在尝试理解 Python 中的 Hadoop 字数统计示例
http://www.michael-noll.com/tutorials/writing-an-hadoop-mapreduce-program-in-python/

作者从朴素版本的 mapper 和 reducer 开始。这是 reducer (为简洁起见,我删除了一些评论)

#!/usr/bin/env python

from operator import itemgetter
import sys

current_word = None
current_count = 0
word = None

# input comes from STDIN
for line in sys.stdin:
    line = line.strip()

    word, count = line.split('\t', 1)

    try:
        count = int(count)
    except ValueError:
        continue

    if current_word == word:
        current_count += count
    else:
        if current_word:
            # write result to STDOUT
            print '%s\t%s' % (current_word, current_count)
        current_count = count
        current_word = word

# do not forget to output the last word if needed!
if current_word == word:
    print '%s\t%s' % (current_word, current_count)

作者使用以下方法测试程序:
echo "foo foo quux labs foo bar quux" | /home/hduser/mapper.py | sort -k1,1 | /home/hduser/reducer.py

所以reducer的写法就好像reducer作业的输入数据是这样的:
aa 1
aa 1
bb 1
cc 1
cc 1
cc 1

我最初对 reducer 的理解是,给定 reducer 的输入数据将包含一个唯一键。所以在前面的例子中,需要 3 个 reducers 作业。我的理解不正确吗?

然后作者介绍了mapper和reducer的改进版本。这是 reducer :
#!/usr/bin/env python
"""A more advanced Reducer, using Python iterators and generators."""

from itertools import groupby
from operator import itemgetter
import sys

def read_mapper_output(file, separator='\t'):
    for line in file:
        yield line.rstrip().split(separator, 1)

def main(separator='\t'):
    # input comes from STDIN (standard input)
    data = read_mapper_output(sys.stdin, separator=separator)

    for current_word, group in groupby(data, itemgetter(0)):
        try:
            total_count = sum(int(count) for current_word, count in group)
            print "%s%s%d" % (current_word, separator, total_count)
        except ValueError:
            # count was not a number, so silently discard this item
            pass

if __name__ == "__main__":
    main()

作者添加了以下警告:

Note: The following Map and Reduce scripts will only work “correctly” when being run in the Hadoop context, i.e. as Mapper and Reducer in a MapReduce job. This means that running the naive test command “cat DATA | ./mapper.py | sort -k1,1 | ./reducer.py” will not work correctly anymore because some functionality is intentionally outsourced to Hadoop.



我不明白为什么天真的测试命令不适用于新版本。我以为使用sort -k1,1将为 reducer 的两个版本产生相同的输入。我错过了什么?

最佳答案

关于您的第一个问题:“我对 reducer 的最初理解是,给定 reducer 的输入数据将包含一个唯一键。因此在前面的示例中,需要 3 个 reducer 作业。我的理解不正确吗?”

MapReduce 抽象与 Hadoop 对该抽象的实现之间存在差异。在抽象中,reducer 与唯一键相关联。另一方面,Hadoop 实现将多个键分配给同一个 reducer(以避免关闭进程和启动新进程的成本)。特别是,在 Hadoop 流中,reducer 接收与一定数量的键(可能是零、一个或多个键)对应的键值对,但您可以保证与某个键关联的键值对将彼此接连而来。

例如,让我们以输入“foo foo quux labs foo bar quux”为例进行字数统计。然后可能是一个reducer 接收输入“bar 1\nfoo 1\nfoo 1\nfoo1”,另一个reducer 接收“labs 1\nquux 1\nquux 1”。实际运行的 reducer 进程的数量由您使用选项 mapred.reduce.tasks 决定。例如要使用 2 个 reducer ,您可以这样做

 $ bin/hadoop jar contrib/streaming/hadoop-*streaming*.jar -D mapred.reduce.tasks=2 -mapper ....

关于你的第二个问题,我同意你的看法 sort -k1,1会成功的,所以我也看不到问题。

关于python - Python 中经典 Hadoop 字数统计示例中的数据流,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/18293916/

相关文章:

javascript - Pycurl 脚本

python - 在 Python 中为具有 ANSI 颜色代码的字符串获取正确的字符串长度

python - 导出 Scikit Learn 随机森林以在 Hadoop 平台上使用

apache-spark - Apache Spark与MapReduce

hadoop - MapReduce 中 1 个任务的 reducer 数量

hadoop - 如何在hadoop reducer中写入int值

python - 在给定一组坐标的情况下计算曲线下的面积,而不知道函数

python - Dynaconf 使用 .secrets.toml 覆盖 settings.toml 中的部分

java - Hadoop Mapreduce错误NativeMethodAccessor

PIG 中 Elephant-Bird UDF 中的 JSON 数组字段处理