我有一个超过 10 亿行的 DataFrame (df)
df.coalesce(5)
.write
.partitionBy("Country", "Date")
.mode("append")
.parquet(datalake_output_path)
从上面的命令我了解到我的 100 个工作节点集群 (spark 2.4.5) 中只有 5 个工作节点将执行所有任务。使用 coalesce(5) 需要 7 个小时才能完成。
我应该尝试 repartition
而不是 coalesce
吗?
是否有更快/更有效的方法来写出 128 MB 大小的 Parquet 文件,或者我是否需要先计算数据帧的大小以确定需要多少个分区。
例如,如果我的数据帧大小为 1 GB 并且 spark.sql.files.maxPartitionBytes = 128MB,我应该首先计算 否。所需的分区数为 1 GB/128 MB = 大约(8)
然后执行 repartition(8) 或 coalesce(8)?
想法是在编写时最大化输出中 parquet 文件的大小,并且能够快速(更快)完成此操作。
最佳答案
您可以获取数据帧 df
的大小 (dfSizeDiskMB
),方法是持久化数据帧,然后检查 Web UI 上的“存储”选项卡,如 answer 所示。 .有了这些信息和对预期 Parquet 压缩率的估计,您就可以估计达到所需输出文件分区大小所需的分区数,例如
val targetOutputPartitionSizeMB = 128
val parquetCompressionRation = 0.1
val numOutputPartitions = dfSizeDiskMB * parquetCompressionRatio / targetOutputPartitionSizeMB
df.coalesce(numOutputPartitions).write.parquet(path)
请注意 spark.files.maxPartitionBytes
在这里不相关,因为它是:
The maximum number of bytes to pack into a single partition when reading files.
(除非 df
是读取输入数据源的直接结果,没有创建中间数据帧。更有可能的是 df
的分区数由 决定spark.sql.shuffle.partitions
,是 Spark 用于从连接和聚合创建的数据帧的分区数。
Should I try repartition instead of coalesce?
coalesce
通常更好,因为它可以避免与 repartition
相关的洗牌,但请注意 docs 中的警告根据您的用例,可能会在上游阶段失去并行性。
关于scala - Spark- 写入 128 MB 大小的 Parquet 文件,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/67404137/