scala - Spark- 写入 128 MB 大小的 Parquet 文件

标签 scala dataframe apache-spark apache-spark-sql

我有一个超过 10 亿行的 DataFrame (df)

df.coalesce(5)
.write
.partitionBy("Country", "Date")
.mode("append")
.parquet(datalake_output_path)

从上面的命令我了解到我的 100 个工作节点集群 (spark 2.4.5) 中只有 5 个工作节点将执行所有任务。使用 coalesce(5) 需要 7 个小时才能完成。

我应该尝试 repartition 而不是 coalesce 吗?

是否有更快/更有效的方法来写出 128 MB 大小的 Parquet 文件,或者我是否需要先计算数据帧的大小以确定需要多少个分区。

例如,如果我的数据帧大小为 1 GB 并且 spark.sql.files.maxPartitionBytes = 128MB,我应该首先计算 否。所需的分区数为 1 GB/128 MB = 大约(8) 然后执行 repartition(8) 或 coalesce(8)?

想法是在编写时最大化输出中 parquet 文件的大小,并且能够快速(更快)完成此操作。

最佳答案

您可以获取数据帧 df 的大小 (dfSizeDiskMB),方法是持久化数据帧,然后检查 Web UI 上的“存储”选项卡,如 answer 所示。 .有了这些信息和对预期 Parquet 压缩率的估计,您就可以估计达到所需输出文件分区大小所需的分区数,例如

val targetOutputPartitionSizeMB = 128
val parquetCompressionRation = 0.1
val numOutputPartitions = dfSizeDiskMB * parquetCompressionRatio / targetOutputPartitionSizeMB
df.coalesce(numOutputPartitions).write.parquet(path)

请注意 spark.files.maxPartitionBytes在这里不相关,因为它是:

The maximum number of bytes to pack into a single partition when reading files.

(除非 df 是读取输入数据源的直接结果,没有创建中间数据帧。更有可能的是 df 的分区数由 决定spark.sql.shuffle.partitions,是 Spark 用于从连接和聚合创建的数据帧的分区数。

Should I try repartition instead of coalesce?

coalesce 通常更好,因为它可以避免与 repartition 相关的洗牌,但请注意 docs 中的警告根据您的用例,可能会在上游阶段失去并行性。

关于scala - Spark- 写入 128 MB 大小的 Parquet 文件,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/67404137/

相关文章:

scala - 带彩色复制品的 Spark 壳

scala - Scala模式与集合匹配

python-3.x - 将数据框“扩展”为矩阵索引

scala - Spark::KMeans 调用 takeSample() 两次?

json - 使用 Circe 在 Scala 中将 null 值映射到 None

scala - 玩!从单独的 mongo 集合中选择项目的表单

pandas - 我如何使用替换然后在 Pandas 中创建新行?

python - Pandas 熔化函数使用列索引位置而不是列名称

apache-spark - 了解 Spark RandomForest 特征重要性结果

apache-spark - 使用 sparkmeasure 时无法调用 Java 对象