python - MRJob 的多输入

标签 python mapreduce mrjob

我正在尝试学习将 Yelp 的 Python API 用于 MapReduce、MRJob。他们的简单单词计数器示例很有意义,但我很好奇人们将如何处理涉及多个输入的应用程序。例如,不是简单地计算文档中的单词,而是将向量乘以矩阵。我想出了这个解决方案,它起作用了,但感觉很傻:

class MatrixVectMultiplyTast(MRJob):
    def multiply(self,key,line):
            line = map(float,line.split(" "))
            v,col = line[-1],line[:-1]

            for i in xrange(len(col)):
                    yield i,col[i]*v

    def sum(self,i,occurrences):
            yield i,sum(occurrences)

    def steps(self):
            return [self.mr (self.multiply,self.sum),]

if __name__=="__main__":
    MatrixVectMultiplyTast.run()

此代码运行./matrix.py < input.txt它起作用的原因是矩阵按列存储在 input.txt 中,相应的向量值位于行尾。

所以,下面的矩阵和向量:

enter image description here

表示为 input.txt 为:

enter image description here

简而言之,我将如何更自然地将矩阵和向量存储在单独的文件中并将它们都传递到 MRJob 中?

最佳答案

如果您需要针对另一个(或相同的 row_i、row_j)数据集处理原始数据,您可以:

1) 创建一个 S3 存储桶来存储您的数据副本。将此副本的位置传递给您的任务类,例如下面代码中的 self.options.bucket 和 self.options.my_datafile_copy_location。警告:不幸的是,似乎整个文件必须在处理之前“下载”到任务机器。如果连接不稳定或加载时间过长,则此作业可能会失败。这是执行此操作的一些 Python/MRJob 代码。

将其放入您的映射器函数中:

d1 = line1.split('\t', 1)
v1, col1 = d1[0], d1[1]
conn = boto.connect_s3(aws_access_key_id=<AWS_ACCESS_KEY_ID>, aws_secret_access_key=<AWS_SECRET_ACCESS_KEY>)
bucket = conn.get_bucket(self.options.bucket)  # bucket = conn.get_bucket(MY_UNIQUE_BUCKET_NAME_AS_STRING)
data_copy = bucket.get_key(self.options.my_datafile_copy_location).get_contents_as_string().rstrip()
### CAVEAT: Needs to get the whole file before processing the rest.
for line2 in data_copy.split('\n'):
    d2 = line2.split('\t', 1)
    v2, col2 = d2[0], d2[1]
    ## Now, insert code to do any operations between v1 and v2 (or c1 and c2) here:
    yield <your output key, value pairs>
conn.close()

2) 创建一个 SimpleDB 域,并将所有数据存储在其中。 在 boto 和 SimpleDB 上阅读这里: http://code.google.com/p/boto/wiki/SimpleDbIntro

您的映射器代码如下所示:

dline = dline.strip()
d0 = dline.split('\t', 1)
v1, c1 = d0[0], d0[1]
sdb = boto.connect_sdb(aws_access_key_id=<AWS_ACCESS_KEY>, aws_secret_access_key=<AWS_SECRET_ACCESS_KEY>)
domain = sdb.get_domain(MY_DOMAIN_STRING_NAME)
for item in domain:
    v2, c2 = item.name, item['column']
    ## Now, insert code to do any operations between v1 and v2 (or c1 and c2) here:
    yield <your output key, value pairs>
sdb.close()

如果您有大量数据,第二个选项可能会执行得更好,因为它可以对每一行数据而不是一次请求全部数据。请记住,SimpleDB 值最多只能有 1024 个字符长,因此如果您的数据值超过此长度,您可能需要通过某种方法进行压缩/解压缩。

关于python - MRJob 的多输入,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/9302580/

相关文章:

python - tkinter PhotoImage 不存在?

python - 如何在 Flask 中按下按钮时获取 HTML <tr> 标签?

hadoop负载均衡

azure - HDInsight字数映射减少卡住映射器的程序的程序100%和缩减器0%

使用 mrjob subprocess.CalledProcessError 的 Python hadoop mapreduce 作业

java - 每个映射器的多个输入文件 'type'

javascript - Jinja2 : call function on click

Python Mechanize Browser.links() - 奇怪的行为

java - 包装类型在Hadoop中如何工作?

python - 有没有办法在 Hadoop 管理 Web 界面中指定 mrjob 的作业标题?