我有一个相当简单的工作,将日志文件转换为 Parquet 。它正在处理 1.1TB 的数据(分成 64MB - 128MB 的文件 - 我们的块大小是 128MB),大约有 12000 个文件。
工作如下:
val events = spark.sparkContext
.textFile(s"$stream/$sourcetype")
.map(_.split(" \\|\\| ").toList)
.collect{case List(date, y, "Event") => MyEvent(date, y, "Event")}
.toDF()
df.write.mode(SaveMode.Append).partitionBy("date").parquet(s"$path")
它使用通用模式收集事件,转换为 DataFrame,然后作为 parquet 写出。
我遇到的问题是这会在 HDFS 集群上造成一些 IO 爆炸,因为它试图创建如此多的小文件。
理想情况下,我只想在分区“日期”内创建少数 Parquet 文件。
控制这种情况的最佳方法是什么?是通过使用“coalesce()”吗?
这将如何影响在给定分区中创建的文件数量?它是否取决于我在 Spark 中工作的执行者数量? (目前设置为 100)。
最佳答案
你必须重新分配你的 DataFrame
匹配 DataFrameWriter
的分区
尝试这个:
df
.repartition($"date")
.write.mode(SaveMode.Append)
.partitionBy("date")
.parquet(s"$path")
关于scala - Spark dataframe写方法写很多小文件,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/44459355/