python - 如何让 Reducer 根据键类型发出

标签 python hadoop mapreduce reduce

作为 this 的后续行动问题,我有一个 Mapper,它正在处理大量数据并将 ID 号作为值为 1 的键发出。每个键都有两个部分,由竖线分隔符分隔,例如:

映射器发出:
a|abc 1
b|efg 1
a|cba 1
a|abc 1
b|dhh 1
b|dhh 1

我想做的是让 Reducer 解析键,对于类型为“a”的每个键,即“a|abc”,我希望 Reducer 只发出重复项,但对于所有其他类型(例如键入 'b',即 'b|abc'),我希望 Reducer 发出所有内容,即使值仅为 1。

所以上面的数据会产生:
a|abc 2
b|efg 1
b|dhh 2

在这种情况下,不会发出 'a|cba 1',因为它是 'a' 类型的键并且没有重复项。下面是我尝试过的代码,它几乎按预期工作,除了我得到 92 个额外的发射,其中键的类型为“a”且计数为 1。注意:根据我的 MapReduce 日志,92 是 Reduce 任务的数量.

由于我只想要键类型“a”的重复项,我该如何修复 Reducer,这样我就不会获得值为 1 的键类型“a”的额外 92 次发射?

import sys
import codecs

sys.stdout = codecs.getwriter('utf-8')(sys.stdout)
inData = codecs.getreader('utf-8')(sys.stdin)

(last_key, tot_cnt) = (None, 0)
for line in inData:
    (key, val) = line.strip().split("\t")
    if last_key != key:
        k = key.split('|')
        v_id = k[0]
        if v_id == 'a':
            if tot_cnt > 1:
                sys.stdout.write("%s\t%s\n" % (last_key,tot_cnt))
        else:
            sys.stdout.write("%s\t%s\n" % (last_key,tot_cnt))

        (last_key, tot_cnt) = (key, int(val))
    else:
        (last_key, tot_cnt) = (key, tot_cnt + int(val))
if last_key:
    if v_id == 'a':
        if tot_cnt > 1:
            sys.stdout.write("%s\t%s\n" % (last_key, tot_cnt))
    else:
        sys.stdout.write("%s\t%s\n" % (last_key, tot_cnt))

最佳答案

以下是您的代码中的错误:

  1. 在全局级别声明v_id,以便它随处可见。

    改变这一行:

    (last_key, tot_cnt) = (None, 0)
    

    收件人:

    (last_key, tot_cnt, v_id) = (None, 0, None)
    
  2. 后续拆分应该在 last_key 而不是当前 key 上。当当前键是“b|dhh”而最后一个键是“a|abc”时,你应该得到“a|abc”的v_id

    更改此代码:

    if last_key != key:
        k = key.split('|')
        v_id = k[0]
    if v_id == 'a':
        if tot_cnt > 1:
            sys.stdout.write("%s\t%s\n" % (last_key,tot_cnt))
    else:
        sys.stdout.write("%s\t%s\n" % (last_key,tot_cnt))
    

    收件人:

    if last_key != key:
        if last_key != None:
            k = last_key.split('|')
            v_id = k[0]
    
            if v_id == 'a':
                if tot_cnt > 1:
                    sys.stdout.write("%s\t%s\n" % (last_key,tot_cnt))
            else:
                sys.stdout.write("%s\t%s\n" % (last_key,tot_cnt))
    

因此,修改后的 reducer 代码如下所示:

import sys
import codecs

sys.stdout = codecs.getwriter('utf-8')(sys.stdout)
inData = codecs.getreader('utf-8')(sys.stdin)

(last_key, tot_cnt, v_id) = (None, 0, None)

for line in inData:
    (key, val) = line.strip().split("\t")
    if last_key != key:
        if last_key != None:
            k = last_key.split('|')
            v_id = k[0]

            if v_id == 'a':
                if tot_cnt > 1:
                    sys.stdout.write("%s\t%s\n" % (last_key,tot_cnt))
            else:
                sys.stdout.write("%s\t%s\n" % (last_key,tot_cnt))

        (last_key, tot_cnt) = (key, int(val))
    else:
        (last_key, tot_cnt) = (key, tot_cnt + int(val))

if last_key:
    if v_id == 'a':
        if tot_cnt > 1:
            sys.stdout.write("%s\t%s\n" % (last_key, tot_cnt)) 
    else:
        sys.stdout.write("%s\t%s\n" % (last_key, tot_cnt))

当我运行它时,我得到了输出:

a|abc   2
b|dhh   2
b|efg   1

注意:我不是 Python 专家。我觉得,你可以优化这段代码。因此,请检查脚本中是否存在任何极端情况和冗余检查。

关于python - 如何让 Reducer 根据键类型发出,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/34275074/

相关文章:

hadoop - 在 Apache Pig 中并行下载文件列表

python - 如果将每个 View 类移动到单独的 .py 文件,是否会影响性能?

Python 请求错误 400 浏览器发送无效请求

java - 从 Hadoop 中的 HDFS 读取时的 I/O 时间

java - Hadoop mapreduce wordcount程序失败,退出代码为9009

python - 访问来自MRjob的hdfs的流输出

python - 如何使算法更快?

python - 如何使用 python/matplotlib 为 3d 绘图设置 "camera position"?

azure - 如何为 Premium HDInsight 创建反向 DNS 查找区域?

azure - HBase 到 Delta 表