由于 Out of Memory Errors
的原因,我检查了始终出现问题的 Spark 作业的输出 Parquet 文件。
我在 Spark 1.6.0
上使用 Cloudera 5.13.1
我注意到 Parquet 行组大小不均匀。 第一行和最后一行组很大。剩下的真的很小......
parquet-tools RC = row count
、 TS = total size
的缩短输出:
row group 1: RC:5740100 TS:566954562 OFFSET:4
row group 2: RC:33769 TS:2904145 OFFSET:117971092
row group 3: RC:31822 TS:2772650 OFFSET:118905225
row group 4: RC:29854 TS:2704127 OFFSET:119793188
row group 5: RC:28050 TS:2356729 OFFSET:120660675
row group 6: RC:26507 TS:2111983 OFFSET:121406541
row group 7: RC:25143 TS:1967731 OFFSET:122069351
row group 8: RC:23876 TS:1991238 OFFSET:122682160
row group 9: RC:22584 TS:2069463 OFFSET:123303246
row group 10: RC:21225 TS:1955748 OFFSET:123960700
row group 11: RC:19960 TS:1931889 OFFSET:124575333
row group 12: RC:18806 TS:1725871 OFFSET:125132862
row group 13: RC:17719 TS:1653309 OFFSET:125668057
row group 14: RC:1617743 TS:157973949 OFFSET:134217728
这是一个已知的错误吗?如何在 Spark 中设置 Parquet block 大小(行组大小)?
编辑:
Spark 应用程序的作用是:它读取一个大 AVRO 文件,然后按两个分区键分配行(在选择中使用 distribute by <part_keys>
),然后使用以下方法为每个分区写入一个 parquet 文件:
ojit_代码
最佳答案
您的 RDD 分区可能不均匀。每个 block 中的行数与 RDD 的不同分区的大小有关。
创建 RDD 时,每个分区包含大致相同的数据量(由于 HashPartitioner )。 Spark 作业处理完毕后,一个分区可能比另一个分区包含更多的数据,可能过滤器转换从一个分区删除的行数多于从另一个分区删除的行数。可以调用 repartition 重新平衡分区在写入 Parquet 文件之前。
编辑:如果问题与分区无关,也许减小行组的大小可能会有所帮助:
sc.hadoopConfiguration.setInt( "parquet.block.size", blockSize )
关于hadoop - Spark Parquet block 尺寸不均匀,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/49323557/