python - Declaring an mrjob mapper without ignoring the key

Tags: python hadoop mapreduce mrjob

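I want to declare a mapper function with mrjob. My mapper needs to reference some constants for its calculations, so I decided to pass those constants in through the mapper's key (is there another way?). I read the mrjob tutorial on this site, but all of the examples ignore the key. For example: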

from mrjob.job import MRJob

class MRWordFrequencyCount(MRJob):

    def mapper(self, _, line):
        # The key is ignored here (named "_" by convention).
        yield "chars", len(line)
        yield "words", len(line.split())
        yield "lines", 1

    def reducer(self, key, values):
        yield key, sum(values)

Basically, I want something like this:

def mapper(self, (constant1,constant2,constant3,constant4,constant5), line):
    # my calculation goes here

Please suggest how I can do this. Thanks.

Best answer

You can set the constants in __init__:

from mrjob.job import MRJob

class MRWordFrequencyCount(MRJob):

    def mapper(self, key, line):
        yield "chars", len(line)
        yield "words", len(line.split())
        yield "lines", 1
        yield "Constant", self.constant

    def reducer(self, key, values):
        yield key, sum(values)

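    def __init__(self, *args, **kwargs):
        # Let MRJob do its own setup first, then attach the constant
        # so that every mapper call can read it from self.
        super(MRWordFrequencyCount, self).__init__(*args, **kwargs)
        self.constant = 10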


if __name__ == '__main__':
    MRWordFrequencyCount.run()

Output:

"Constant"  10
"chars" 12
"lines" 1
"words" 2
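
One thing worth noting about the output above: the mapper yields "Constant" once per input line, and the reducer sums every key, so the value stays at 10 only when the input has a single line. Below is a minimal plain-Python sketch of that map, group, and sum flow. It does not use mrjob at all; `CONSTANT`, `run_job`, and the sample input lines are hypothetical stand-ins chosen for illustration.

```python
from collections import defaultdict

CONSTANT = 10  # hypothetical stand-in for self.constant in the answer


def mapper(line):
    # Same pairs the answer's mapper yields for one input line.
    yield "chars", len(line)
    yield "words", len(line.split())
    yield "lines", 1
    yield "Constant", CONSTANT


def run_job(lines):
    # Group mapper output by key, then sum each group (the reducer step).
    grouped = defaultdict(list)
    for line in lines:
        for key, value in mapper(line):
            grouped[key].append(value)
    return {key: sum(values) for key, values in sorted(grouped.items())}


one_line = run_job(["hello world!"])
three_lines = run_job(["hello world!", "foo bar", "baz"])
print(one_line)
print(three_lines)
```

With three input lines the "Constant" entry becomes 30 rather than 10. If the constant is only needed inside the calculation, it is probably simpler to use `self.constant` in the mapper's arithmetic and not yield it as a separate pair.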

Alternatively, you can use RawProtocol:

from mrjob.job import MRJob
from mrjob.protocol import RawProtocol


class MRWordFrequencyCount(MRJob):
    # Read each input line as a tab-separated key and value.
    INPUT_PROTOCOL = RawProtocol

    def mapper(self, key, line):
        yield "constant", key
        yield "chars", len(line)
        yield "words", len(line.split())
        yield "lines", 1

    def reducer(self, key, values):
        if str(key) != "constant":
            yield key, sum(values)
        else:
            # The constants are strings, so collect them instead of summing.
            yield "constant", list(values)


if __name__ == '__main__':
    MRWordFrequencyCount.run()

If the input is (with the key and the line separated by a tab):

constant1,constant2,constant3   The quick brown fox jumps over the lazy dog

Output:

"chars" 43
"constant"  ["constant1,constant2,constant3"]
"lines" 1
"words" 9
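
To get from the key string to individual constants, the mapper still has to split it. Here is a minimal plain-Python sketch of that step, assuming the key and the text are separated by the first tab (which is how RawProtocol divides a line) and that the constants are comma-separated as in the sample input. `split_key_value` and `parse_constants` are hypothetical helper names, not part of mrjob.

```python
def split_key_value(raw_line):
    # Split on the first tab only, so tabs inside the text are preserved.
    key, _, value = raw_line.partition("\t")
    return key, value


def parse_constants(key):
    # The sample input packs the constants into the key, comma-separated.
    return key.split(",")


raw = "constant1,constant2,constant3\tThe quick brown fox jumps over the lazy dog"
key, line = split_key_value(raw)
constants = parse_constants(key)

print(constants)
print(len(line), len(line.split()))
```

Inside the real mapper only the `parse_constants(key)` part would be needed, since the protocol has already done the tab split by the time `mapper(self, key, line)` is called. Numeric constants would also need an explicit conversion such as `float(c)`.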

This Q&A on declaring an mrjob mapper without ignoring the key is based on a similar question found on Stack Overflow: https://stackoverflow.com/questions/33745977/