python - 为什么我在 pyspark 中收集它们时我的 `binaryFiles` 是空的?

标签 python hadoop zip pyspark binaryfiles

我在同一个文件夹中的 hdfs 上有两个 zip 文件:/user/path-to-folder-with-zips/

我将其传递给 pyspark 中的“二进制文件”:

zips = sc.binaryFiles('/user/path-to-folder-with-zips/')

我正在尝试解压缩 zip 文件并对其中的文本文件执行一些操作,因此我试图只查看当我尝试处理 RDD 时的内容。我是这样做的:

zips_collected = zips.collect()

但是,当我这样做时,它给出了一个空列表:

>> zips_collected
[]

我知道 zips 不是空的——它们有文本文件。文档 here

每个文件都作为单个记录读取并以键值对的形式返回,其中键是每个文件的路径,值是每个文件的内容。

我在这里做错了什么?我知道我无法查看文件的内容,因为它是压缩的,因此是二进制的。但是,我至少应该能够看到一些东西。为什么它不返回任何东西?

每个 zip 文件可以有多个文件,但内容总是这样的:

rownum|data|data|data|data|data
rownum|data|data|data|data|data
rownum|data|data|data|data|data

最佳答案

我假设每个 zip 文件都包含一个文本文件(多个文本文件的代码很容易更改)。在逐行处理之前,您需要先通过 io.BytesIO 读取 zip 文件的内容。解决方案大致基于 https://stackoverflow.com/a/36511190/234233 .

import io
import gzip

def zip_extract(x):
    """Extract *.gz file in memory for Spark"""
    file_obj = gzip.GzipFile(fileobj=io.BytesIO(x[1]), mode="r")
    return file_obj.read()

zip_data = sc.binaryFiles('/user/path-to-folder-with-zips/*.zip')
results = zip_data.map(zip_extract) \
                  .flatMap(lambda zip_file: zip_file.split("\n")) \
                  .map(lambda line: parse_line(line))
                  .collect()

关于python - 为什么我在 pyspark 中收集它们时我的 `binaryFiles` 是空的?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/38256631/

相关文章:

windows - 如何创建 .BAT 文件以下载和解压缩 zip 文件?

Java - 从网站压缩文件?

python - 是否可以将比较运算符作为参数传递给函数?

python - 返回或中断作为 python 模拟中的副作用?

python - 如何避免从 os.walk 获取重复路径

python - libnet 创建带有无效校验和的 UDP 数据包

scala - 如何使用 Scala 计算 Hbase 表上的所有行

hadoop - Pig Latin 中不区分大小写的搜索

sql - “Hive” 多列的最大列值

angularjs - 将 zip 文件从 Node api 流式传输到 Angular