Python从HDFS读取文件作为流

标签 python hadoop subprocess hdfs

这是我的问题:我在 HDFS 中有一个文件,它可能很大(=不足以容纳所有内存)

我想做的是避免将此文件缓存在内存中,而只像处理常规文件一样逐行处理它:

for line in open("myfile", "r"):
    # do some processing

我正在寻找是否有一种简单的方法可以在不使用外部库的情况下正确完成这项工作。我可能可以使它与 libpyhdfs 一起工作或 python-hdfs但如果可能的话,我希望避免在系统中引入新的依赖项和未经测试的库,特别是因为这两者似乎都没有得到大量维护,并且声明它们不应该在生产中使用。

我正在考虑使用标准的“hadoop”命令行工具使用 Python subprocess 模块来执行此操作,但由于没有命令,我似乎无法执行我需要的操作可以执行我的处理的线工具,我想以流方式为每条线执行 Python 函数。

有没有办法使用 subprocess 模块将 Python 函数应用为管道的正确操作数?或者更好的是,像文件一样打开它作为生成器,这样我就可以轻松处理每一行?

cat = subprocess.Popen(["hadoop", "fs", "-cat", "/path/to/myfile"], stdout=subprocess.PIPE)

如果有另一种方法可以在不使用外部库的情况下实现我上面描述的内容,我也很开放。

感谢您的帮助!

最佳答案

你想要xreadlines ,它从文件中读取行而不将整个文件加载到内存中。

编辑:

现在我看到了你的问题,你只需要从 Popen 对象中获取标准输出管道:

cat = subprocess.Popen(["hadoop", "fs", "-cat", "/path/to/myfile"], stdout=subprocess.PIPE)
for line in cat.stdout:
    print line

关于Python从HDFS读取文件作为流,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/12485718/

相关文章:

python - 零经验者的爱丽丝 vs Python

python - 为什么使用 python F 字符串插值用引号引起来?

java - HTable(config,tablename) 类型已弃用。代替有什么用?

python - 使用 python 获取正在运行的 Windows 应用程序列表

Python:响应命令行提示

python - 当我使用 django 运行syncdb 时,它是否指定 mysql 的排序规则设置?

python - Python 生成器中的反向 next()

memory - Hadoop Namenode 元数据 - fsimage 和编辑日志

scala - 下面的热烫预处理和后处理将在哪个hadoop节点上运行?

python - 测试 Popen 进程是否正在等待输入