scala - Spark dataframe写方法写很多小文件

标签 scala apache-spark

我有一个相当简单的工作,将日志文件转换为 Parquet 。它正在处理 1.1TB 的数据(分成 64MB - 128MB 的文件 - 我们的块大小是 128MB),大约有 12000 个文件。

工作如下:

 val events = spark.sparkContext
  .textFile(s"$stream/$sourcetype")
  .map(_.split(" \\|\\| ").toList)
  .collect{case List(date, y, "Event") => MyEvent(date, y, "Event")}
  .toDF()

df.write.mode(SaveMode.Append).partitionBy("date").parquet(s"$path")

它使用通用模式收集事件,转换为 DataFrame,然后作为 parquet 写出。

我遇到的问题是这会在 HDFS 集群上造成一些 IO 爆炸,因为它试图创建如此多的小文件。

理想情况下,我只想在分区“日期”内创建少数 Parquet 文件。

控制这种情况的最佳方法是什么?是通过使用“coalesce()”吗?

这将如何影响在给定分区中创建的文件数量?它是否取决于我在 Spark 中工作的执行者数量? (目前设置为 100)。

最佳答案

你必须重新分配你的 DataFrame匹配 DataFrameWriter 的分区

尝试这个:

df
.repartition($"date")
.write.mode(SaveMode.Append)
.partitionBy("date")
.parquet(s"$path")

关于scala - Spark dataframe写方法写很多小文件,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/44459355/

相关文章:

Int 和 Long 的 Scala 中值函数

java - org.slf4j.helpers.SubstituteLogger 无法转换为 ch.qos.logback.classic.Logger

scala - 连接一组对象中包含的集合,在 Scala 中的功能

java - Windows 环境上的 Apache Spark : spark. eventLog.dir

azure - Spark 读/写 Azure blob 存储 - IOException : No FileSystem for scheme: wasbs

scala - 如何匹配但不消耗 Akka HTTP 中的路径前缀?

scala - akka 调度器的延迟如何?

scala - 根据包含列表元素的列值创建 bool 标志

java - "Failed to find data source: parquet"用maven做一个fat jar

xml - 使用 Spark/Scala 从 XML 记录中提取元素