python - 如何使用 s3 对象名称作为 MRJob 映射器的输入,而不是 s3 对象本身?

标签 python mapreduce boto elastic-map-reduce mrjob

关于 Yelp 的 mrjob job,我遗漏了一些明显的东西图书馆。设置 MRJob 类非常简单。在 file 上运行它或标准输入也如此。但是,如何将作业的输入从本地或 s3 中的文件更改为 s3 存储桶中的键?

是这样的。假设我想计算 S3 存储桶中以字符串“foo”开头的所有对象:

import re

class MRCountS3Objects(MRJob):

    define mapper(self, _, botoS3Key):
        if re.match('^foo', botoS3Key.name):
            yield 'foo', 1

    define reduce(self, name, occurrences):
        yield name, sum(occurrences)

这是一个非常人为的例子,但你可能明白我的意思。我如何告诉 MRJob 对 s3 对象流进行操作,而忽略对象的内容?我看到了 S3Filesystem.get_s3_keys() method ,这让我得到了我需要的流,但我不确定从那里去哪里。

最佳答案

想出了至少一种方法来实现这一目标。您的 MRJob 有一个 stdin 属性,可以分配给任何迭代器,然后您可以以编程方式运行该作业。例如,这段代码应该处理 my-bucket 的键名:

from mrjob.job import MRJob
from mrjob.emr import EMRJobRunner

class MRS3KeyProcessor(MRJob):
    # Do some MRJob stuff.
    ...

def s3_name_generator(bucket):
    """Generator that returns boto.s3.Key names.
    """
    # Could also use raw boto here.
    emr = EMRJobRunner()
    key_stream = emr.fs.get_s3_keys(bucket)
    for key in key_stream:
        yield key.name

def main():
    # The '-' argument signifies that we use stdin.
    mr_job = MRCountS3Objects(['--runner', 'inline', '-'])
    stdin = s3_name_generator('my-bucket')
    mr_job.stdin = stdin
    results = []
    with mr_job.make_runner() as runner:
        runner.run()
        for line in runner.stream_output():
            key, value = mr_job.parse_output_line(line)
            results.append((key, value))
    print(results)

if __name__ == '__main__':
    main()

关于python - 如何使用 s3 对象名称作为 MRJob 映射器的输入,而不是 s3 对象本身?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/16598346/

相关文章:

python - 如何使用 Python 创建签名的云端 URL?

python - 需要一个条件语句来填充基于字符串的列

python - python-markdown 在不受信任的输入上安全吗?

hadoop - Map-reduce JobConf - 添加 FileInputFormat 时出错

hadoop - 使用outputcollector映射WordCount示例

python-2.7 - 如何使用 boto 使用 SPOT Block 启动 EMR?

python - AWS DynamoDB - 使用 JSON 文件作为输入使用 Boto3 加载数据

Python Pandas - Groupby 和 Mean,但保留列名称

python - 在 matplotlib 中设置轴

Hadoop - 当有 Spark 作业正在运行时,Sqoop 作业卡在已接受状态