我对 Hadoop 和 MapReduce 完全陌生,正在尝试解决它。 我正在尝试用 python 开发一个 mapreduce 应用程序,其中使用 2 个 .CSV 文件中的数据。我只是在映射器中读取两个文件,然后将文件中的键值对打印到 sys.stdout
当我在单机上使用该程序时,该程序运行良好,但使用 Hadoop Streaming 时,我收到错误。我认为我在 Hadoop 上的映射器中读取文件时犯了一些错误。请帮助我编写代码,并告诉我如何在 Hadoop Streaming 中使用文件处理。 mapper.py代码如下。 (可以从注释中了解代码):
#!/usr/bin/env python
import sys
from numpy import genfromtxt
def read_input(inVal):
for line in inVal:
# split the line into words
yield line.strip()
def main(separator='\t'):
# input comes from STDIN (standard input)
labels=[]
data=[]
incoming = read_input(sys.stdin)
for vals in incoming:
# write the results to STDOUT (standard output);
# what we output here will be the input for the
# Reduce step, i.e. the input for reducer.py
#
# tab-delimited;
if len(vals) > 10:
data.append(vals)
else:
labels.append(vals)
for i in range(0,len(labels)):
print "%s%s%s\n" % (labels[i], separator, data[i])
if __name__ == "__main__":
main()
有 60000 条记录从两个 .csv 文件输入到此映射器,如下所示(在单机上,而不是 hadoop 集群):
cat mnist_train_labels.csv mnist_train_data.csv | ./mapper.py
最佳答案
我花了三天的时间寻找解决方案后才解决了这个问题。
问题出在较新版本的 Hadoop(我的例子中是 2.2.0)。当从文件中读取值时,映射器代码在某个时刻给出非零的退出代码(可能是因为它一次读取一个巨大的值列表(784))。 Hadoop 2.2.0 中有一个设置,它告诉 Hadoop 系统给出一般错误(子进程失败,代码为 1)。该设置默认设置为 True。我只需将此属性的值设置为 False,它就使我的代码运行时没有任何错误。
设置为:stream.non.zero.exit.is.failure。流式传输时只需将其设置为 false 即可。所以流命令有点像:
**hadoop jar ... -D stream.non.zero.exit.is.failure=false ...**
希望它能帮助别人,并节省 3 天的时间...;)
关于python - 通过 Python 使用 Hadoop Streaming 中的文件,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/23038720/