hadoop - 为什么 Spark 将分区设置为以字节为单位的文件大小?

标签 hadoop apache-spark

我有一个非常简单的 pyspark 程序,它应该从 S3 读取 CSV 文件:

r = sc.textFile('s3a://some-bucket/some-file.csv')
  .map(etc... you know the drill...)

运行本地 Spark 节点时失败(在 EMR 中有效)。我收到 OOM 错误和 GC 崩溃。进一步检查后,我意识到分区的数量高得离谱。在这种特殊情况下,r.getNumPartitions() 将返回 2358041

我意识到这正是我的文件大小(以字节为单位)。这当然会让 Spark 崩溃得很惨。

我尝试了不同的配置,比如更改 mapred.min.split.size:

conf = SparkConf()
conf.setAppName('iRank {}'.format(datetime.now()))
conf.set("mapred.min.split.size", "536870912")
conf.set("mapred.max.split.size", "536870912")
conf.set("mapreduce.input.fileinputformat.split.minsize", "536870912")

我也尝试过使用 repartition 或更改将分区参数传递给 textFile,但无济于事。

我很想知道是什么让 Spark 认为从文件大小推导出分区数是个好主意。

最佳答案

一般情况下不会。正如 eliasah 很好地解释的那样在 his answerSpark RDD default number of partitions它使用 maxminPartitions(如果未提供则为 2)并根据 Hadoop 输入格式计算拆分。

只有在配置指示的情况下,后者才会高得离谱。这表明某些配置文件干扰了您的程序。

您的代码可能存在的问题是您使用了错误的配置。 Hadoop 选项应该使用 hadoopConfiguration 而不是 Spark 配置来设置。看起来您使用的是 Python,因此您必须使用私有(private) JavaSparkContext 实例:

sc = ...  # type: SparkContext

sc._jsc.hadoopConfiguration().setInt("mapred.min.split.size", min_value)
sc._jsc.hadoopConfiguration().setInt("mapred.max.split.size", max_value)

关于hadoop - 为什么 Spark 将分区设置为以字节为单位的文件大小?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/47318260/

相关文章:

hadoop webhdfs 创建。我无法传输文件

hadoop - 在Windows中通过cmd运行Hadoop

apache-spark - 收集 Spark 作业运行统计信息并将其保存到数据库的最佳方法是什么

apache-spark - Spark 中的沿袭是什么?

python - pyspark 每列上有不同的计数

bash - 在安装hadoop时,当我运行start-dfs.sh命令时,它显示 'no such file or directory found'

hadoop - 如何正确控制YARN容器分配增量?

hadoop - Riemann Context for Hadoop 使用 metrics2 接口(interface)向 Riemann 发送指标

scala - Apache-Spark : What is map(_. _2) 的简写?

python - Spark : More Efficient Aggregation to join strings from different rows