apache-spark - 计算Spark数据帧的大小-SizeEstimator提供意外结果

标签 apache-spark spark-dataframe

我正在尝试找到一种可靠的方式来以编程方式计算Spark数据帧的大小(以字节为单位)。

原因是我希望有一种方法来计算“最佳”数量的分区(“最佳”在这里可能表示不同的意思:在写入Parquet表时可能表示having an optimal partition sizeresulting in an optimal file size,但都可以假定两者都是数据框大小的一些线性函数)。换句话说,我想在数据帧上调用coalesce(n)repartition(n),其中n不是固定数字,而是数据帧大小的函数。

SO上的其他主题建议使用SizeEstimator.estimate中的org.apache.spark.util来获取数据帧的字节大小,但是我得到的结果不一致。

首先,我将数据帧保存到内存中:

df.cache().count 

Spark UI在“存储”选项卡中显示为4.8GB。然后,我运行以下命令从SizeEstimator获取大小:
import org.apache.spark.util.SizeEstimator
SizeEstimator.estimate(df)

结果为115'715'808字节=〜116MB。但是,将SizeEstimator应用于不同的对象会导致非常不同的结果。例如,我尝试分别计算数据帧中每一行的大小并将其求和:
df.map(row => SizeEstimator.estimate(row.asInstanceOf[ AnyRef ])).reduce(_+_)

这导致大小为12'084'698'256字节=〜12GB。或者,我可以尝试将SizeEstimator应用于每个分区:
df.mapPartitions(
    iterator => Seq(SizeEstimator.estimate(
        iterator.toList.map(row => row.asInstanceOf[ AnyRef ]))).toIterator
).reduce(_+_)

这再次导致10'792'965'376字节=〜10.8GB的不同大小。

我了解其中涉及内存优化/内存开销,但是在执行这些测试之后,我看不到SizeEstimator如何可用于获得足够好的数据帧大小(进而是分区大小或生成的Parquet文件大小)的估计值。

为了获得对数据帧大小或其分区的良好估计,应用SizeEstimator的适当方法是什么(如果有)?如果没有,这里建议的方法是什么?

最佳答案

不幸的是,我无法从SizeEstimator获得可靠的估计,但是我可以找到另一种策略-如果数据帧已缓存,我们可以从queryExecution中提取其大小,如下所示:

df.cache.foreach(_=>_)
val catalyst_plan = df.queryExecution.logical
val df_size_in_bytes = spark.sessionState.executePlan(
    catalyst_plan).optimizedPlan.stats.sizeInBytes

对于示例数据帧,这恰好提供了4.8GB(这也对应于写入未压缩的Parquet表时的文件大小)。

这样做的缺点是需要缓存数据帧,但就我而言,这不是问题。

关于apache-spark - 计算Spark数据帧的大小-SizeEstimator提供意外结果,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/49492463/

相关文章:

scala - Spark UDAF 与 ArrayType 作为 bufferSchema 性能问题

azure - 如何在 ADF 中运行 Spark 作业?

scala - 更改Spark数据框中的列的可为空属性

scala - 如何使用 saveAsTextFile 在 spark 数据框中进行自定义分区

apache-spark - 禁用 Spark 催化剂优化器

apache-spark - 无法在 Spark 应用程序中创建新的 native 线程

amazon-ec2 - 如何在 Spark Streaming EC2 集群应用程序中从 S3 读取输入

scala - 使用约束将 Apache Spark (Scala) 数据框中的 bool 列转换为数值列?

scala - 在spark Scala的新行中添加两个日期之间的所有日期(周)

python - 如何使用 PySpark 加载 IPython shell