python - 如何通过识别python Hadoop中的键来处理Mapreduce

标签 python hadoop mapreduce reducers

我有两个来自 map 功能的关键值:NY 和 Others。所以,我的 key 的输出是:NY 1,或 Other 1。只有这两种情况。

我的 map 功能:

    #!/usr/bin/env python
    import sys
    import csv
    import string

    reader = csv.reader(sys.stdin, delimiter=',')
    for entry in reader:
        if len(entry) == 22:
            registration_state=entry[16]
            print('{0}\t{1}'.format(registration_state,int(1)))

现在我需要使用 reducer 来处理 map 输出。我的减少:
#!/usr/bin/env python
import sys
import string


currentkey = None
ny = 0
other = 0
# input comes from STDIN (stream data that goes to the program)
for line in sys.stdin:

    #Remove leading and trailing whitespace
    line = line.strip()

    #Get key/value 
    key, values = line.split('\t', 1)  
    values = int(values)
#If we are still on the same key...
    if key == 'NY':
        ny = ny + 1
    #Otherwise, if this is a new key...
    else:
        #If this is a new key and not the first key we've seen
        other = other + 1


#Compute/output result for the last key 
print('{0}\t{1}'.format('NY',ny))
print('{0}\t{1}'.format('Other',other))

根据这些,mapreduce 将给出两个输出结果文件,每个都包含 NY 和 Others 输出。即一个包含:NY 1248,Others 4677;另一个:NY 0,Others 1000。这是因为两个reduce split 从map 输出,所以生成了两个结果,通过组合(merge)最终输出将是结果。

但是,我想更改我的 reduce 或 map 函数,使每个 reduce 进程仅在一个键上,即一个 reduce 仅处理 NY 作为键值,而另一个在 Other 上工作。我希望得到像一个包含的结果:
NY 1258, Others 0; Another: NY 0, Others 5677. 

如何调整我的功能以达到我期望的结果?

最佳答案

可能您需要使用 Python 迭代器和生成器。
link 给出了一个很好的例子.我试过用相同的方式重写你的代码(未经测试)

映射器:

#!/usr/bin/env python
"""A more advanced Mapper, using Python iterators and generators."""

import sys

def main(separator='\t'):
    reader = csv.reader(sys.stdin, delimiter=',')
    for entry in reader:
    if len(entry) == 22:
        registration_state=entry[16]
        print '%s%s%d' % (registration_state, separator, 1)

if __name__ == "__main__":
    main()

reducer :
!/usr/bin/env python
"""A more advanced Reducer, using Python iterators and generators."""

from itertools import groupby
from operator import itemgetter
import sys

def read_mapper_output(file, separator='\t'):
    for line in file:
        yield line.rstrip().split(separator, 1)

def main(separator='\t'):
    for current_word, group in groupby(data, itemgetter(0)):
        try:
            total_count = sum(int(count) for current_word, count in group)
            print "%s%s%d" % (current_word, separator, total_count)
        except ValueError:
            # count was not a number, so silently discard this item
            pass

if __name__ == "__main__":
    main()

关于python - 如何通过识别python Hadoop中的键来处理Mapreduce,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/49085928/

相关文章:

Hadoop全分布式模式

python - xtensor 类型的性能与 NumPy 的简单归约

python - 扩展和合并 Pandas 数据框

python - 基于堆栈的修改前序树遍历

python - Django - 同一查询中的 objects.values() 和 prefetch_related()

mysql - 无法运行 PIG

hadoop - pig 中的xml解析

hadoop - Kibana、Logstash 大数据环境

hadoop - 如何使用 HIVE 对表进行分区?

hadoop - 如何将许多参数写入 reducer 的输出文件?