apache-spark - 使用 saveAsTextFile 时,为什么在 Google Dataproc 中运行的 Spark 将临时文件存储在外部存储 (GCS) 而不是本地磁盘或 HDFS 上?

标签 apache-spark pyspark google-cloud-dataproc

我运行了以下 PySpark 代码:

from pyspark import SparkContext

sc = SparkContext()

data = sc.textFile('gs://bucket-name/input_blob_path')
sorted_data = data.sortBy(lambda x: sort_criteria(x))
sorted_data.saveAsTextFile(
    'gs://bucket-name/output_blob_path',
    compressionCodecClass="org.apache.hadoop.io.compress.GzipCodec"
)

作业成功完成。但是,在作业执行期间,Spark 在以下路径 gs://bucket-name/output_blob_path/_temporary/0/中创建了许多临时 blob。我意识到最后删除所有这些临时 blob 需要一半的作业执行时间,并且在此期间 CPU 利用率为 1%(资源的巨大浪费)。

有没有办法将临时文件存储在本地驱动器(或 HDFS)而不是 GCP 上?我仍然想将最终结果(排序数据集)保留到 GCP。

我们使用具有 10 个工作节点的 Dataproc Spark 集群(VM 类型 16 核、60GM)。输入数据量为10TB。

最佳答案

您看到的_temporary 文件很可能是FileOutputCommitter 的产物。在引擎盖下使用。重要的是,这些临时 blob 并不是严格意义上的“临时”数据,而实际上是已完成的输出数据,仅在作业完成时“重命名”到最终目的地。通过重命名“提交”这些文件实际上很快,因为源和目标都在 GCS 上;因此,无法将工作流程的这一部分替换为将临时文件放在 HDFS 上,然后“提交”到 GCS,因为提交将需要重新将整个输出数据集从 HDFS 返回到 GCS。具体来说,底层 Hadoop FileOutputFormat 类不支持这种习惯用法。

GCS 本身并不是一个真正的文件系统,而是一个“对象存储”,Dataproc 内部的 GCS 连接器只是尽力模仿 HDFS。一个后果是,删除充满文件的目录实际上需要 GCS 删除幕后的各个对象,而不是真正的文件系统只是取消链接 inode。

实际上,如果您遇到此问题,则可能意味着您的输出无论如何都被拆分为太多文件,因为清理确实一次以大约 1000 个文件的批处理进行。因此,多达数万个输出文件通常不会明显变慢。文件太多也会使以后处理这些文件的速度变慢。最简单的修复通常是尽可能减少输出文件的数量,例如使用 repartition():

from pyspark import SparkContext

sc = SparkContext()

data = sc.textFile('gs://bucket-name/input_blob_path')
sorted_data = data.sortBy(lambda x: sort_criteria(x))
sorted_data.repartition(1000).saveAsTextFile(
    'gs://bucket-name/output_blob_path',
    compressionCodecClass="org.apache.hadoop.io.compress.GzipCodec"
)

关于apache-spark - 使用 saveAsTextFile 时,为什么在 Google Dataproc 中运行的 Spark 将临时文件存储在外部存储 (GCS) 而不是本地磁盘或 HDFS 上?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/41170932/

相关文章:

json - 数据帧Spark Scala爆炸了JSON数组

apache-spark - Pyspark - 如何将 Parquet 文件转换为带有分隔符的文本文件

python - 如何在 Spark 数据框中强制重新分区?

xml - pretty-print - Spark/Scala 中的 XML 单记录

python - 有什么方法可以在 Spark Dataframe 的组数据上运行 stat 函数交叉表?

scala - 如何获取上传文件的路径

google-cloud-dataproc - 无法参数化 placement.managedCluster.config 下的任何值

google-cloud-dataproc - 提供自定义 UUID 以通过 Airflow DataprocSubmitJobOperator 启 Action 业

java - 在 Apache Spark SQL 中对多行进行操作

apache-spark - 具有多个执行程序的Spark独立配置