我有一个非常简单的 pyspark 程序,它应该从 S3 读取 CSV 文件:
r = sc.textFile('s3a://some-bucket/some-file.csv')
.map(etc... you know the drill...)
运行本地 Spark 节点时失败(在 EMR 中有效)。我收到 OOM 错误和 GC 崩溃。进一步检查后,我意识到分区的数量高得离谱。在这种特殊情况下,r.getNumPartitions()
将返回 2358041
。
我意识到这正是我的文件大小(以字节为单位)。这当然会让 Spark 崩溃得很惨。
我尝试了不同的配置,比如更改 mapred.min.split.size
:
conf = SparkConf()
conf.setAppName('iRank {}'.format(datetime.now()))
conf.set("mapred.min.split.size", "536870912")
conf.set("mapred.max.split.size", "536870912")
conf.set("mapreduce.input.fileinputformat.split.minsize", "536870912")
我也尝试过使用 repartition
或更改将分区参数传递给 textFile
,但无济于事。
我很想知道是什么让 Spark 认为从文件大小推导出分区数是个好主意。
最佳答案
一般情况下不会。正如 eliasah 很好地解释的那样在 his answer至 Spark RDD default number of partitions它使用 max
的 minPartitions
(如果未提供则为 2)并根据 Hadoop 输入格式计算拆分。
只有在配置指示的情况下,后者才会高得离谱。这表明某些配置文件干扰了您的程序。
您的代码可能存在的问题是您使用了错误的配置。 Hadoop 选项应该使用 hadoopConfiguration
而不是 Spark 配置来设置。看起来您使用的是 Python,因此您必须使用私有(private) JavaSparkContext
实例:
sc = ... # type: SparkContext
sc._jsc.hadoopConfiguration().setInt("mapred.min.split.size", min_value)
sc._jsc.hadoopConfiguration().setInt("mapred.max.split.size", max_value)
关于hadoop - 为什么 Spark 将分区设置为以字节为单位的文件大小?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/47318260/