scala - 如何合并SPARK数据框创建的文件夹中的所有零件文件并重命名为scala中的文件夹名称

标签 scala apache-spark hdfs apache-spark-sql hadoop2

您好,我有 Spark 数据框的输出,它创建文件夹结构并创建可能的部分文件。 现在我必须合并文件夹内的所有部分文件,并将该文件重命名为文件夹路径名。

这就是我进行分区的方式

df.write.partitionBy("DataPartition","PartitionYear")
  .format("csv")
  .option("nullValue", "")
  .option("header", "true")/
  .option("codec", "gzip")
  .save("hdfs:///user/zeppelin/FinancialLineItem/output")

它创建这样的文件夹结构

hdfs:///user/zeppelin/FinancialLineItem/output/DataPartition=Japan/PartitionYear=1971/part-00001-87a61115-92c9-4926-a803-b46315e55a08.c000.csv.gz
hdfs:///user/zeppelin/FinancialLineItem/output/DataPartition=Japan/PartitionYear=1971/part-00002-87a61115-92c9-4926-a803-b46315e55a08.c001.csv.gz

我必须创建这样的最终文件

hdfs:///user/zeppelin/FinancialLineItem/output/Japan.1971.currenttime.csv.gz

此处没有零件文件,001 和 002 已合并为二。

我的数据大小非常大,300 GB gzip 和 35 GB 压缩,因此合并(1)和重新分区变得非常慢。

我在这里看到了一种解决方案 Write single CSV file using spark-csv但我无法实现它,请帮助我。

重新分区抛出错误

error: value repartition is not a member of org.apache.spark.sql.DataFrameWriter[org.apache.spark.sql.Row]
       dfMainOutputFinalWithoutNull.write.repartition("DataPartition","StatementTypeCode")

最佳答案

从 Spark 外部的头节点尝试此操作...

hdfs dfs -getmerge <src> <localdst>

https://hadoop.apache.org/docs/r1.2.1/file_system_shell.html#getmerge

“将源目录和目标文件作为输入,并将 src 中的文件连接到目标本地文件中。可以选择设置 addnl 以在每个文件末尾添加换行符。”

关于scala - 如何合并SPARK数据框创建的文件夹中的所有零件文件并重命名为scala中的文件夹名称,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/46812388/

相关文章:

Scala 2.13 迁移

scala - 是否可以在intellij上调试Camel Scala DSL

Scala Set[_] 与 Set[Any]

scala - Spark collect()/count() 永远不会完成,而 show() 运行速度很快

linux - Apache Spark 使用 Homebrew 软件从 1.5.2 升级到 1.6.0 导致执行期间出现权限被拒绝错误

hadoop - 在 Hadoop 的上下文中,压缩编解码器的可拆分性是什么意思?

hadoop - 关键 : HDFS-HAWQ - Migration to New Hardware

list - 以功能风格将 elem 与下一个连接起来

scala - 尝试将 Spark DF 写入 Hive 表时出现错误 "Invalid call to qualifier on unresolved object"

python - Hadoop Streaming Python 简单示例不起作用