hadoop - Spark Parquet block 尺寸不均匀

标签 hadoop apache-spark apache-spark-sql hadoop2 parquet

由于 Out of Memory Errors 的原因,我检查了始终出现问题的 Spark 作业的输出 Parquet 文件。 我在 Spark 1.6.0 上使用 Cloudera 5.13.1

我注意到 Parquet 行组大小不均匀。 第一行和最后一行组很大。剩下的真的很小......

parquet-tools RC = row countTS = total size 的缩短输出:

row group 1:                RC:5740100 TS:566954562 OFFSET:4  
row group 2:                RC:33769 TS:2904145 OFFSET:117971092  
row group 3:                RC:31822 TS:2772650 OFFSET:118905225  
row group 4:                RC:29854 TS:2704127 OFFSET:119793188  
row group 5:                RC:28050 TS:2356729 OFFSET:120660675  
row group 6:                RC:26507 TS:2111983 OFFSET:121406541  
row group 7:                RC:25143 TS:1967731 OFFSET:122069351  
row group 8:                RC:23876 TS:1991238 OFFSET:122682160  
row group 9:                RC:22584 TS:2069463 OFFSET:123303246  
row group 10:               RC:21225 TS:1955748 OFFSET:123960700  
row group 11:               RC:19960 TS:1931889 OFFSET:124575333  
row group 12:               RC:18806 TS:1725871 OFFSET:125132862  
row group 13:               RC:17719 TS:1653309 OFFSET:125668057  
row group 14:               RC:1617743 TS:157973949 OFFSET:134217728

这是一个已知的错误吗?如何在 Spark 中设置 Parquet block 大小(行组大小)?

编辑:
Spark 应用程序的作用是:它读取一个大 AVRO 文件,然后按两个分区键分配行(在选择中使用 distribute by <part_keys>),然后使用以下方法为每个分区写入一个 parquet 文件:
ojit_代码

最佳答案

您的 RDD 分区可能不均匀。每个 block 中的行数与 RDD 的不同分区的大小有关。

创建 RDD 时,每个分区包含大致相同的数据量(由于 HashPartitioner )。 Spark 作业处理完毕后,一个分区可能比另一个分区包含更多的数据,可能过滤器转换从一个分区删除的行数多于从另一个分区删除的行数。可以调用 repartition 重新平衡分区在写入 Parquet 文件之前。

编辑:如果问题与分区无关,也许减小行组的大小可能会有所帮助:

sc.hadoopConfiguration.setInt( "parquet.block.size", blockSize ) 

关于hadoop - Spark Parquet block 尺寸不均匀,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/49323557/

相关文章:

php - Hadoop起点

arrays - 将列与多行数组合并

python - 如何将其他参数传递给pyspark中用户定义的方法以进行过滤方法?

apache-spark - Spark 行编码器 : empty metadata

apache-spark - Pyspark 根据列值复制行

sql-server - 通过 Spark 将 csv 文件加载到现有的 HIVE 表

hadoop - 无法在 hadoop 中格式化 Namenode

hadoop - 从 "reduce input records"到 "reduce input groups"

scala - 关联规则与频繁模式挖掘

scala - 如何使用spark-submit运行具有多个主要方法的jar?