python - 使用 Spark 压缩文件

标签 python gzip apache-spark pyspark

我有一个 Spark 作业,它将数千个文件作为输入并从 Amazon S3 下载它们并在映射阶段处理它们,其中每个映射步骤都会返回一个字符串。我想将输出压缩到 .tar.gz 文件,然后将其上传到 S3。一种方法是

outputs = sc.map(filenames).collect()
for output in outputs:
    with tempfile.NamedTemporaryFile() as tar_temp:
        tar = tarfile.open(tar_temp.name, "w:gz")
        for output in outputs:
            with tempfile.NamedTemporaryFile() as output_temp:
                output_temp.write(output)
                tar.add(output_temp.name)
        tar.close()

问题是输出不适合内存(但它们适合磁盘)。有没有办法在映射阶段将输出保存到主文件系统?或者使用循环 for output in outputs 作为生成器,这样我就不必将所有内容加载到内存中?

最佳答案

在 Spark 1.3.0 中,您将能够在 Python 中使用相同的 Java/Scala 方法 toLocalIterator

拉取请求已合并:https://github.com/apache/spark/pull/4237

这是指定的文档:

    """
    Return an iterator that contains all of the elements in this RDD.
    The iterator will consume as much memory as the largest partition in this RDD.
    >>> rdd = sc.parallelize(range(10))
    >>> [x for x in rdd.toLocalIterator()]
    [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
    """

总而言之,它将允许您迭代输出,而无需将所有内容收集到驱动程序。

问候,

关于python - 使用 Spark 压缩文件,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/27767942/

相关文章:

apache-spark - 如何使用支持通用 ID 类型(int 和 long)的 Spark ML ALS 实现

apache-spark - Apache Spark 可以用作数据库替代品吗? (例如替换Mysql)

在命令行上运行Python脚本找不到站点包

python - 随机获取 PyTorch 张量中最大值之一的索引

python - 为什么 python openCV 没有按照我期望的方式改变颜色?

Java从json属性解压缩HTTP GZIP内容

python - Django 管理编码错误

javascript - 如何在 javascript 或 jquery 中转换图像字符串?

c# - 为什么我的 GZipStream 不可写?

python - Pyspark S3 错误 : java. lang.NoClassDefFoundError: com/amazonaws/services/s3/model/MultiObjectDeleteException