我的数据原则上是一个表,除了其他“数据”之外,它还包含一列 ID
和一列 GROUP_ID
。
在第一步中,我将 CSV 读入 Spark,进行一些处理以准备第二步的数据,并将数据写入 Parquet 。
第二步做了很多 groupBy('GROUP_ID')
和 Window.partitionBy('GROUP_ID').orderBy('ID')
。
现在的目标是——为了避免在第二步中改组——在第一步中有效地加载数据,因为这是一个一次性的。
问题第 1 部分: AFAIK,Spark 在从 parquet 加载时保留分区(这实际上是要进行的任何“优化写入考虑”的基础)- 对吗?
我想出了三种可能:
df.orderBy('ID').write.partitionBy('TRIP_ID').parquet('/path/to/parquet')
df.orderBy('ID').repartition(n, 'TRIP_ID').write.parquet('/path/to/parquet')
df.repartition(n, 'TRIP_ID').sortWithinPartitions('ID').write.parquet('/path/to/parquet')
我会设置
n
使得单个 Parquet 文件大约为 100MB。问题第 2 部分: 三个选项在目标方面产生“相同”/相似的结果是否正确(避免在第二步中洗牌)?如果不是,有什么区别?哪个“更好”?
问题第 3 部分: 三个选项中哪一个在步骤 1 中表现更好?
感谢您分享您的知识!
编辑 2017-07-24
经过一些测试(写入和读取 parquet)后,Spark 似乎无法在第二步默认恢复
partitionBy
和 orderBy
信息。分区数(从 df.rdd.getNumPartitions()
获得的似乎由内核数和/或 spark.default.parallelism
(如果设置)决定,而不是由 Parquet 分区数决定。所以 问题 1 的答案将是 _0x104560795 _0x104560795问题 2 和问题 3 无关紧要。所以事实证明 真正的问题 是:有没有办法告诉 Spark,数据已经按列 X 分区并按列 _0x109104040565 排序?
最佳答案
您可能会对 Spark 中的分桶支持感兴趣。
在此处查看详细信息
https://jaceklaskowski.gitbooks.io/mastering-spark-sql/spark-sql-bucketing.html
large.write
.bucketBy(4, "id")
.sortBy("id")
.mode(SaveMode.Overwrite)
.saveAsTable(bucketedTableName)
注意 Spark 2.4 添加了对
bucket pruning
的支持(如 partition pruning
)您正在查看的更直接的功能是 Hive 的分桶排序表
https://cwiki.apache.org/confluence/display/Hive/LanguageManual+DDL#LanguageManualDDL-BucketedSortedTables
这在 Spark 中尚不可用(请参阅下面的 PS 部分)
还要注意,排序信息不会由 Spark 自动加载,但是由于数据已经排序了..对它的排序操作实际上会快得多,因为没有太多工作要做 - 例如一次传递数据只是为了确认它已经排序。
附注。
Spark 和 Hive 分桶略有不同。
这是在 Spark 中为在 Hive 中创建的分桶表提供兼容性的伞票 -
https://issues.apache.org/jira/browse/SPARK-19256
关于apache-spark - Spark : Most efficient way to sort and partition data to be written as parquet,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/45224820/