dataframe - pyspark以减少/压缩的小文件数量写入配置单元表

标签 dataframe hadoop pyspark hive hdfs

我有一个数据框记录,每次流程运行时都会更新,这意味着每次流程完成时,我都会有一个数据行,一行4列。
然后,我将使用数据帧写入和 Parquet 格式将其插入配置单元表。
由于一次只有一条记录,因此我在hfds的table文件夹中看到了很多小文件。

当我将数据写入配置单元表时,请让我知道如何减少并将其写入同一文件( Parquet 文件)?

hdfs location: user_id/employe_db/market_table/
from:
part-04498-f33fc4b5-47d9-4d14-b37e-8f670cb2c53c-c000.snappy.parquet
part-04497-f33fc4b5-47d9-4d14-b37e-8f670cb2c53c-c000.snappy.parquet
part-04496-f33fc4b5-47d9-4d14-b37e-8f670cb2c53c-c000.snappy.parquet
part-04496-f33fc4b5-47d9-4d14-b37e-8f670cb2c53c-c000.snappy.parquet
part-04450-f33fc4b5-47d9-4d14-b37e-8f670cb2c53c-c000.snappy.parquet
part-04449-f33fc4b5-47d9-4d14-b37e-8f670cb2c53c-c000.snappy.parquet

to:
part-03049-f33fc4b5-47d9-4d14-b37e-8f670cb2c53c-c000.snappy.parquet

如何将 Parquet 文件的数量减少到固定为更少的数量,并将新数据加载/写入现有文件中?
part-04499-f33fc4b5-47d9-4d14-b37e-8f670cb2c53c-c000.snappy.parquet

最佳答案

在写入 HDFS 之前,您可以 repartition(1) ,这样每次执行时将创建1个文件。

df.repartition(1).write.parquet("<directory>")

Merging files:

Using Hive:

如果在user_id/employe_db/market_table/目录的顶部已经有配置单元表,则通过选择同一表来运行插入覆盖。
spark.sql("insert overwrite table <db>.<tab_market> select * from <db>.<tab_market>")

-仅创建一个文件,然后使用order by
spark.sql("insert overwrite table <db>.<tab_market> select * from <db>.<tab_market> order by <column>")

您也可以像在Hive中一样运行插入语句。

(要么)

Using Spark:

作为后期提取过程,您可以再次从目录中读取 Parquet 文件,然后再次进行重新分区并写入目录。
df_src=spark.read.parquet("<directory>")
df_src.repartition(<number>).write.mode("overwrite").parquet("<directory>")

NOTE
  • overwrite首先会删除目录,以防万一作业之间的失败可能会导致数据丢失。
  • 最佳做法是将数据备份到tmp目录中,然后仅覆盖
  • 关于dataframe - pyspark以减少/压缩的小文件数量写入配置单元表,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/62476181/

    相关文章:

    python - 在 Python 中分箱后返回范围的下限或上限

    apache-spark - pyspark ml 推荐 - Als 推荐

    python - Pandas : default columns names

    python - 重组DataFrame并写入SQL数据库

    java - NoClassDefFoundError : WordCount with hadoop-2. 2.0 在 ubuntu-12.04

    python - hadoop fs -ls 仅存储文件的路径

    hadoop - HDFS IO错误org.apache.hadoop.ipc.RemoteException:服务器IPC版本9无法与客户端版本4 i通信

    apache-spark - AWS Glue - 无法设置 spark.yarn.executor.memoryOverhead

    apache-spark - 是否可以将经过训练的 Spark ML 模型或交叉验证器保存到 postgresql 数据库?

    r - 修改数据框中变量的值