我有一个数据框记录,每次流程运行时都会更新,这意味着每次流程完成时,我都会有一个数据行,一行4列。
然后,我将使用数据帧写入和 Parquet 格式将其插入配置单元表。
由于一次只有一条记录,因此我在hfds的table文件夹中看到了很多小文件。
当我将数据写入配置单元表时,请让我知道如何减少并将其写入同一文件( Parquet 文件)?
hdfs location: user_id/employe_db/market_table/
from:
part-04498-f33fc4b5-47d9-4d14-b37e-8f670cb2c53c-c000.snappy.parquet
part-04497-f33fc4b5-47d9-4d14-b37e-8f670cb2c53c-c000.snappy.parquet
part-04496-f33fc4b5-47d9-4d14-b37e-8f670cb2c53c-c000.snappy.parquet
part-04496-f33fc4b5-47d9-4d14-b37e-8f670cb2c53c-c000.snappy.parquet
part-04450-f33fc4b5-47d9-4d14-b37e-8f670cb2c53c-c000.snappy.parquet
part-04449-f33fc4b5-47d9-4d14-b37e-8f670cb2c53c-c000.snappy.parquet
to:
part-03049-f33fc4b5-47d9-4d14-b37e-8f670cb2c53c-c000.snappy.parquet
如何将 Parquet 文件的数量减少到固定为更少的数量,并将新数据加载/写入现有文件中?
part-04499-f33fc4b5-47d9-4d14-b37e-8f670cb2c53c-c000.snappy.parquet
最佳答案
在写入 HDFS
之前,您可以 repartition(1)
,这样每次执行时将创建1个文件。
df.repartition(1).write.parquet("<directory>")
Merging files:
Using Hive:
如果在
user_id/employe_db/market_table/
目录的顶部已经有配置单元表,则通过选择同一表来运行插入覆盖。spark.sql("insert overwrite table <db>.<tab_market> select * from <db>.<tab_market>")
-仅创建一个文件,然后使用order by
spark.sql("insert overwrite table <db>.<tab_market> select * from <db>.<tab_market> order by <column>")
您也可以像在Hive中一样运行插入语句。
(要么)
Using Spark:
作为后期提取过程,您可以再次从目录中读取 Parquet 文件,然后再次进行重新分区并写入目录。
df_src=spark.read.parquet("<directory>")
df_src.repartition(<number>).write.mode("overwrite").parquet("<directory>")
NOTE
关于dataframe - pyspark以减少/压缩的小文件数量写入配置单元表,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/62476181/