apache-spark - Spark : save ordered data to parquet

标签 apache-spark pyspark sql-order-by parquet partition-by

我有 30TB 的数据按日期和小时划分,每小时分成 300 个文件。我进行了一些数据转换,然后希望对数据进行排序并按排序顺序保存,以便 C++ 程序轻松摄取。我知道当你序列化时,顺序只在文件中是正确的。我希望通过更好地划分数据来避免这种情况。

我想同时按 sessionID 和时间戳排序。我不希望 sessionIDs 在不同的文件之间分割。如果我在 SessionID 上分区,我将拥有太多,所以我对 N 取模以生成 N 个桶,旨在获得 1 个桶的数据大约 100-200MB:

df = df.withColumn("bucket", F.abs(F.col("sessionId")) % F.lit(50))

然后我在排序之前按日期、小时和桶遣返

df = df.repartition(50,"dt","hr","bucket")
df = df.sortWithinPartitions("sessionId","timestamp")
df.write.option("compression","gzip").partitionBy("dt","hr","bucket").parquet(SAVE_PATH)

这会将数据保存到 dt/hr/bucket,每个 bucket 中有 1 个文件,但顺序丢失了。如果我不创建存储桶和重新分区,那么我最终会得到 200 个文件,数据是有序的,但 sessionId 被拆分到多个文件中。

编辑: 问题似乎出在使用 partitionBy("dt","hr","bucket") 保存时,它会随机重新分区数据,因此不再排序。如果我在没有 partitionBy 的情况下保存,那么我得到的正是我所期望的 - N 个文件用于 N 个存储桶/分区和 sessionIds 跨单个文件,所有文件都正确排序。所以我有一个 non-spark hack 手动迭代所有日期 + 小时目录

如果您按列分区、排序,然后使用 partitionBy 写入同一列,那么您希望直接转储已排序的分区,而不是对数据进行一些随机重新洗牌,这似乎是一个错误。

最佳答案

将分区列放在已排序的列列表中可能会成功。

完整描述在这里 - https://stackoverflow.com/a/59161488/3061686

关于apache-spark - Spark : save ordered data to parquet,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/58881553/

相关文章:

scala - 在 Spark 中对 RDD 中的邻居元素进行操作

json - 在 Apache Spark 中读取多行 JSON

python - Hadoop Spark 1.4.1 - 对多个 CSV 文件进行排序并将排序后的结果保存在 1 个输出文件中

python - key 错误 : SPARK_HOME during SparkConf initialization

postgresql - Postgres 以错误的顺序返回记录

python - 缩放(规范化)SPARK Dataframe 中的一列 - Pyspark

azure - 如何使用pyspark以表格形式打印StringType()的 "dictionary"

python - 按元素乘以稀疏向量

Mysql 按两列排序,主要的和次要的

c# - LINQ 中的 OrderBy 和 Top 性能良好