我在同一个文件夹中的 hdfs 上有两个 zip 文件:/user/path-to-folder-with-zips/
。
我将其传递给 pyspark 中的“二进制文件”:
zips = sc.binaryFiles('/user/path-to-folder-with-zips/')
我正在尝试解压缩 zip 文件并对其中的文本文件执行一些操作,因此我试图只查看当我尝试处理 RDD 时的内容。我是这样做的:
zips_collected = zips.collect()
但是,当我这样做时,它给出了一个空列表:
>> zips_collected
[]
我知道 zips 不是空的——它们有文本文件。文档 here说
每个文件都作为单个记录读取并以键值对的形式返回,其中键是每个文件的路径,值是每个文件的内容。
我在这里做错了什么?我知道我无法查看文件的内容,因为它是压缩的,因此是二进制的。但是,我至少应该能够看到一些东西。为什么它不返回任何东西?
每个 zip 文件可以有多个文件,但内容总是这样的:
rownum|data|data|data|data|data
rownum|data|data|data|data|data
rownum|data|data|data|data|data
最佳答案
我假设每个 zip 文件都包含一个文本文件(多个文本文件的代码很容易更改)。在逐行处理之前,您需要先通过 io.BytesIO
读取 zip 文件的内容。解决方案大致基于 https://stackoverflow.com/a/36511190/234233 .
import io
import gzip
def zip_extract(x):
"""Extract *.gz file in memory for Spark"""
file_obj = gzip.GzipFile(fileobj=io.BytesIO(x[1]), mode="r")
return file_obj.read()
zip_data = sc.binaryFiles('/user/path-to-folder-with-zips/*.zip')
results = zip_data.map(zip_extract) \
.flatMap(lambda zip_file: zip_file.split("\n")) \
.map(lambda line: parse_line(line))
.collect()
关于python - 为什么我在 pyspark 中收集它们时我的 `binaryFiles` 是空的?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/38256631/