python - How do I use a file in a Hadoop streaming job with Python?

Tags: python hadoop hadoop-streaming

I want to read a list of IDs from a file in my Hadoop streaming job. Here is my simple mapper.py:

#!/usr/bin/env python

import sys
import json

def read_file():
    id_list = []
    #read ids from a file
    f = open('../user_ids','r')
    for line in f:
        line = line.strip()
        id_list.append(line)
    return id_list

if __name__ == '__main__':
    id_list = set(read_file())
    # input comes from STDIN (standard input)
    for line in sys.stdin:
        # remove leading and trailing whitespace
        line = line.strip()
        line = json.loads(line)
        user_id = line['user']['id']
        if str(user_id) in id_list:
            print '%s\t%s' % (user_id, line)

And here is my reducer.py:

#!/usr/bin/env python

from operator import itemgetter
import sys

current_id = None
current_list = []
id = None

# input comes from STDIN
for line in sys.stdin:
    # remove leading and trailing whitespace
    line = line.strip()

    # parse the input we got from mapper.py
    id, line = line.split('\t', 1)

    # this IF-switch only works because Hadoop sorts map output
    # by key (here: the user id) before it is passed to the reducer
    if current_id == id:
        current_list.append(line)
    else:
        if current_id:
            # write result to STDOUT
            print '%s\t%s' % (current_id, current_list)
        current_id = id
        current_list = [line]

# do not forget to output the last id if needed!
if current_id == id:
    print '%s\t%s' % (current_id, current_list)

To run it, I use:

hadoop jar contrib/streaming/hadoop-streaming-1.1.1.jar -file ./mapper.py \
    -mapper ./mapper.py -file ./reducer.py -reducer ./reducer.py \
    -input test/input.txt  -output test/output -file '../user_ids' 

The job starts running:

13/11/07 05:04:52 INFO streaming.StreamJob:  map 0%  reduce 0%
13/11/07 05:05:21 INFO streaming.StreamJob:  map 100%  reduce 100%
13/11/07 05:05:21 INFO streaming.StreamJob: To kill this job, run:

Then I get this error:

job not successful. Error: # of failed Map Tasks exceeded allowed limit. FailedCount: 1.         LastFailedTask: task_201309172143_1390_m_000001
13/11/07 05:05:21 INFO streaming.StreamJob: killJob...

When I do not read the IDs from the file ../user_ids, the job does not give me any errors. I think the problem is that it cannot find my ../user_ids file. I also tried using a location in HDFS, but that still did not work. Thanks for your help.
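One way to confirm that the file really is the cause: Hadoop streaming only reports the failed map task count, but anything the script writes to stderr ends up in the per-task logs, so wrapping the open() call exposes the actual exception. A minimal sketch of that check (not from the original post, shown here for diagnosis only):

import sys

try:
    f = open('../user_ids', 'r')
except IOError as e:
    # this message shows up in the failed map task's stderr log
    sys.stderr.write('cannot open ../user_ids: %s\n' % e)
    raise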

Best Answer

hadoop jar contrib/streaming/hadoop-streaming-1.1.1.jar -file ./mapper.py \
  -mapper ./mapper.py -file ./reducer.py -reducer ./reducer.py \
  -input test/input.txt  -output test/output -file '../user_ids'

Does ../user_ids exist at that local path on the machine where you execute the job? If it does, then you need to modify your mapper code to account for the fact that, at runtime, the file will be available in the mapper's local working directory:

f = open('user_ids','r')
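Putting that together, a minimal sketch of the adjusted read_file() (assuming the job is still submitted with -file '../user_ids', which ships the file into each task's working directory under its basename user_ids):

def read_file():
    id_list = []
    # -file '../user_ids' copies the file into the task's current working
    # directory, so at runtime it is opened by its basename
    f = open('user_ids', 'r')
    for line in f:
        id_list.append(line.strip())
    f.close()
    return id_list

You can also check the plumbing locally before submitting, for example with cat test/input.txt | ./mapper.py | sort | ./reducer.py, run from a directory that contains a copy of user_ids.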

Regarding "python - How do I use a file in a Hadoop streaming job with Python?", a similar question can be found on Stack Overflow: https://stackoverflow.com/questions/19833722/
